1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{primary::PrimaryMessage, PayloadToken};
use config::{Committee, WorkerId};
use crypto::PublicKey;
use network::{P2pNetwork, UnreliableNetwork};
use storage::CertificateStore;
use store::{Store, StoreError};
use thiserror::Error;
use tokio::{sync::watch, task::JoinHandle};
use tracing::{error, info, instrument};
use types::{
    metered_channel::Receiver, BatchDigest, Certificate, CertificateDigest, ReconfigureNotification,
};

#[cfg(test)]
#[path = "tests/helper_tests.rs"]
mod helper_tests;

/// Errors that can occur while the `Helper` serves a request from another primary.
///
/// These errors are not propagated to the requestor; the handlers only return them
/// so that they get logged.
#[derive(Debug, Error)]
enum HelperError {
    /// A storage operation (e.g. reading certificates) failed.
    #[error("Storage failure: {0}")]
    StoreError(#[from] StoreError),

    /// The request could not be served as received (e.g. it carried no digests)
    /// and has been ignored.
    #[error("Invalid request received: {0}")]
    InvalidRequest(String),
}

/// A task dedicated to helping other authorities by replying to their certificate &
/// payload availability requests.
pub struct Helper {
    /// The node's name (public key); included as the `from` field of batch and
    /// payload-availability responses.
    name: PublicKey,
    /// The current committee, used to resolve a requestor's network key.
    /// Replaced whenever a reconfiguration notification is received.
    committee: Committee,
    /// The certificate persistent storage, read to serve requests.
    certificate_store: CertificateStore,
    /// The payloads (batches) persistent storage, keyed by (batch digest, worker id);
    /// read to check whether a certificate's payload is fully available.
    payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
    /// Watch channel notifying committee reconfigurations (new epoch,
    /// committee update, or shutdown).
    rx_committee: watch::Receiver<ReconfigureNotification>,
    /// Input channel to receive requests from other primaries.
    rx_primaries: Receiver<PrimaryMessage>,
    /// A network sender to reply to the requestors.
    primary_network: P2pNetwork,
}

impl Helper {
    #[must_use]
    pub fn spawn(
        name: PublicKey,
        committee: Committee,
        certificate_store: CertificateStore,
        payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
        rx_committee: watch::Receiver<ReconfigureNotification>,
        rx_primaries: Receiver<PrimaryMessage>,
        primary_network: P2pNetwork,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            Self {
                name,
                committee,
                certificate_store,
                payload_store,
                rx_committee,
                rx_primaries,
                primary_network,
            }
            .run()
            .await;
        })
    }

    async fn run(&mut self) {
        info!(
            "Helper for availability requests on node {} has started successfully.",
            self.name
        );
        loop {
            tokio::select! {
                Some(request) = self.rx_primaries.recv() => match request {
                    // The CertificatesRequest will find any certificates that exist in
                    // the data source (dictated by the digests parameter). The results
                    // will be emitted one by one to the consumer.
                    PrimaryMessage::CertificatesRequest(digests, origin) => {
                        let _ = self.process_certificates(digests, origin, false).await;
                    }
                    // The CertificatesBatchRequest will find any certificates that exist in
                    // the data source (dictated by the digests parameter). The results will
                    // be sent though back to the consumer as a batch - one message.
                    PrimaryMessage::CertificatesBatchRequest {
                        certificate_ids,
                        requestor,
                    } => {
                        let _ = self
                            .process_certificates(certificate_ids, requestor, true)
                            .await;
                    }
                    // A request that another primary sends us to ask whether we
                    // can serve batch data for the provided certificate_ids.
                    PrimaryMessage::PayloadAvailabilityRequest {
                        certificate_ids,
                        requestor,
                    } => {
                        let _ = self
                            .process_payload_availability(certificate_ids, requestor)
                            .await;
                    }
                    _ => {
                        panic!("Received unexpected message!");
                    }
                },

                result = self.rx_committee.changed() => {
                    result.expect("Committee channel dropped");
                    let message = self.rx_committee.borrow().clone();
                    match message {
                        ReconfigureNotification::NewEpoch(new_committee) => {
                            self.primary_network.cleanup(self.committee.network_diff(&new_committee));
                            self.committee = new_committee;
                        },
                        ReconfigureNotification::UpdateCommittee(new_committee) => {
                            self.primary_network.cleanup(self.committee.network_diff(&new_committee));
                            self.committee = new_committee;
                        },
                        ReconfigureNotification::Shutdown => return
                    }
                    tracing::debug!("Committee updated to {}", self.committee);
                }
            }
        }
    }

    /// Processes a payload availability request by checking we have the
    /// certificate & batch data for each certificate digest in digests,
    /// and reports on each fully available item in the request in a
    /// PayloadAvailabilityResponse.
    #[instrument(level="debug", skip_all, fields(origin = ?origin, num_certificate_ids = digests.len()), err)]
    async fn process_payload_availability(
        &mut self,
        digests: Vec<CertificateDigest>,
        origin: PublicKey,
    ) -> Result<(), HelperError> {
        let mut result: Vec<(CertificateDigest, bool)> = Vec::new();

        let certificates = match self.certificate_store.read_all(digests.to_owned()) {
            Ok(certificates) => certificates,
            Err(err) => {
                // just return at this point. Send back to the requestor
                // that we don't have availability - ideally we would like
                // to communicate an error (so they could potentially retry).
                result = digests.into_iter().map(|d| (d, false)).collect();

                let message = PrimaryMessage::PayloadAvailabilityResponse {
                    payload_availability: result,
                    from: self.name.clone(),
                };
                let _ = self
                    .primary_network
                    .unreliable_send(self.committee.network_key(&origin).unwrap(), &message);

                return Err(HelperError::StoreError(err));
            }
        };

        for (id, certificate_option) in digests.into_iter().zip(certificates) {
            // Find the batches only for the certificates that exist
            if let Some(certificate) = certificate_option {
                let payload_available = match self
                    .payload_store
                    .read_all(certificate.header.payload)
                    .await
                {
                    Ok(payload_result) => payload_result.into_iter().all(|x| x.is_some()),
                    Err(err) => {
                        // we'll assume that we don't have available the payloads,
                        // otherwise and error response should be sent back.
                        error!("Error while retrieving payloads: {err}");
                        false
                    }
                };

                result.push((id, payload_available));
            } else {
                // We don't have the certificate available in first place,
                // so we can't even look up for the batches.
                result.push((id, false));
            }
        }

        // now send the result back to the requestor
        let message = PrimaryMessage::PayloadAvailabilityResponse {
            payload_availability: result,
            from: self.name.clone(),
        };
        let _ = self
            .primary_network
            .unreliable_send(self.committee.network_key(&origin).unwrap(), &message);

        Ok(())
    }

    #[instrument(level="debug", skip_all, fields(origin = ?origin, num_certificate_ids = digests.len(), mode = batch_mode), err)]
    async fn process_certificates(
        &mut self,
        digests: Vec<CertificateDigest>,
        origin: PublicKey,
        batch_mode: bool,
    ) -> Result<(), HelperError> {
        if digests.is_empty() {
            return Err(HelperError::InvalidRequest(
                "empty digests received - ignore request".to_string(),
            ));
        }

        // TODO [issue #195]: Do some accounting to prevent bad nodes from monopolizing our resources.
        let certificates = match self.certificate_store.read_all(digests.to_owned()) {
            Ok(certificates) => certificates,
            Err(err) => {
                error!("Error while retrieving certificates: {err}");
                vec![]
            }
        };

        // When batch_mode = true, then the requested certificates will be sent back
        // to the consumer as one message over the network. For the non found
        // certificates only the digest will be sent instead.
        //
        // When batch_mode = false, then the requested certificates will be sent
        // back to the consumer as separate messages one by one. If a certificate
        // has not been found, then no message will be sent.
        if batch_mode {
            let response: Vec<(CertificateDigest, Option<Certificate>)> = if certificates.is_empty()
            {
                digests.into_iter().map(|c| (c, None)).collect()
            } else {
                digests.into_iter().zip(certificates).collect()
            };

            let message = PrimaryMessage::CertificatesBatchResponse {
                certificates: response,
                from: self.name.clone(),
            };

            let _ = self
                .primary_network
                .unreliable_send(self.committee.network_key(&origin).unwrap(), &message);
        } else {
            for certificate in certificates.into_iter().flatten() {
                // TODO: Remove this deserialization-serialization in the critical path.
                let message = PrimaryMessage::Certificate(certificate);
                let _ = self
                    .primary_network
                    .unreliable_send(self.committee.network_key(&origin).unwrap(), &message);
            }
        }

        Ok(())
    }
}