consensus_core/
broadcaster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::{max, min},
6    sync::Arc,
7    time::Duration,
8};
9
10use consensus_config::AuthorityIndex;
11use futures::{StreamExt as _, stream::FuturesUnordered};
12use tokio::{
13    sync::broadcast,
14    task::JoinSet,
15    time::{Instant, error::Elapsed, sleep_until, timeout},
16};
17use tracing::{trace, warn};
18
19use crate::{
20    block::{BlockAPI as _, ExtendedBlock, VerifiedBlock},
21    context::Context,
22    core::CoreSignalsReceivers,
23    error::ConsensusResult,
24    network::NetworkClient,
25};
26
/// Upper bound on the number of block sends that may be in flight to a single peer.
/// While this many requests are pending, no new block is taken from the broadcast channel.
const BROADCAST_CONCURRENCY: usize = 10;
29
30/// Broadcaster sends newly created blocks to each peer over the network.
31///
32/// For a peer that lags behind or is disconnected, blocks are buffered and retried until
33/// a limit is reached, then old blocks will get dropped from the buffer.
34pub(crate) struct Broadcaster {
35    // Background tasks listening for new blocks and pushing them to peers.
36    senders: JoinSet<()>,
37}
38
39impl Broadcaster {
40    const LAST_BLOCK_RETRY_INTERVAL: Duration = Duration::from_secs(2);
41    const MIN_SEND_BLOCK_NETWORK_TIMEOUT: Duration = Duration::from_secs(5);
42
43    pub(crate) fn new<C: NetworkClient>(
44        context: Arc<Context>,
45        network_client: Arc<C>,
46        signals_receiver: &CoreSignalsReceivers,
47    ) -> Self {
48        let mut senders = JoinSet::new();
49        for (index, _authority) in context.committee.authorities() {
50            // Skip sending Block to self.
51            if index == context.own_index {
52                continue;
53            }
54            senders.spawn(Self::push_blocks(
55                context.clone(),
56                network_client.clone(),
57                signals_receiver.block_broadcast_receiver(),
58                index,
59            ));
60        }
61
62        Self { senders }
63    }
64
65    pub(crate) fn stop(&mut self) {
66        // Intentionally not waiting for senders to exit, to speed up shutdown.
67        self.senders.abort_all();
68    }
69
70    /// Runs a loop that continously pushes new blocks received from the rx_block_broadcast
71    /// channel to the target peer.
72    ///
73    /// The loop does not exit until the validator is shutting down.
74    async fn push_blocks<C: NetworkClient>(
75        context: Arc<Context>,
76        network_client: Arc<C>,
77        mut rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
78        peer: AuthorityIndex,
79    ) {
80        let peer_hostname = &context.committee.authority(peer).hostname;
81
82        // Record the last block to be broadcasted, to retry in case no new block is produced for awhile.
83        // Even if the peer has acknowledged the last block, the block might have been dropped afterwards
84        // if the peer crashed.
85        let mut last_block: Option<VerifiedBlock> = None;
86
87        // Retry last block with an interval.
88        let mut retry_timer = tokio::time::interval(Self::LAST_BLOCK_RETRY_INTERVAL);
89        retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
90        retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
91
92        // Use a simple exponential-decay RTT estimator to adjust the timeout for each block sent.
93        // The estimation logic will be removed once the underlying transport switches to use
94        // streaming and the streaming implementation can be relied upon for retries.
95        const RTT_ESTIMATE_DECAY: f64 = 0.95;
96        const TIMEOUT_THRESHOLD_MULTIPLIER: f64 = 2.0;
97        const TIMEOUT_RTT_INCREMENT_FACTOR: f64 = 1.6;
98        let mut rtt_estimate = Duration::from_millis(200);
99
100        let mut requests = FuturesUnordered::new();
101
102        async fn send_block<C: NetworkClient>(
103            network_client: Arc<C>,
104            peer: AuthorityIndex,
105            rtt_estimate: Duration,
106            block: VerifiedBlock,
107        ) -> (Result<ConsensusResult<()>, Elapsed>, Instant, VerifiedBlock) {
108            let start = Instant::now();
109            let req_timeout = rtt_estimate.mul_f64(TIMEOUT_THRESHOLD_MULTIPLIER);
110            // Use a minimum timeout of 5s so the receiver does not terminate the request too early.
111            let network_timeout =
112                std::cmp::max(req_timeout, Broadcaster::MIN_SEND_BLOCK_NETWORK_TIMEOUT);
113            let resp = timeout(
114                req_timeout,
115                network_client.send_block(peer, &block, network_timeout),
116            )
117            .await;
118            if matches!(resp, Ok(Err(_))) {
119                // Add a delay before retrying.
120                sleep_until(start + req_timeout).await;
121            }
122            (resp, start, block)
123        }
124
125        loop {
126            tokio::select! {
127                result = rx_block_broadcast.recv(), if requests.len() < BROADCAST_CONCURRENCY => {
128                    let block = match result {
129                        // Other info from ExtendedBlock are ignored, because Broadcaster is not used in production.
130                        Ok(block) => block.block,
131                        Err(broadcast::error::RecvError::Closed) => {
132                            trace!("Sender to {peer} is shutting down!");
133                            return;
134                        }
135                        Err(broadcast::error::RecvError::Lagged(e)) => {
136                            warn!("Sender to {peer} is lagging! {e}");
137                            // Re-run the loop to receive again.
138                            continue;
139                        }
140                    };
141                    requests.push(send_block(network_client.clone(), peer, rtt_estimate, block.clone()));
142                    if last_block.is_none() || last_block.as_ref().unwrap().round() < block.round() {
143                        last_block = Some(block);
144                    }
145                }
146
147                Some((resp, start, block)) = requests.next() => {
148                    match resp {
149                        Ok(Ok(_)) => {
150                            let now = Instant::now();
151                            rtt_estimate = rtt_estimate.mul_f64(RTT_ESTIMATE_DECAY) + (now - start).mul_f64(1.0 - RTT_ESTIMATE_DECAY);
152                            // Avoid immediately retrying a successfully sent block.
153                            // Resetting timer is unnecessary otherwise because there are
154                            // additional inflight requests.
155                            retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
156                        },
157                        Err(Elapsed { .. }) => {
158                            rtt_estimate = rtt_estimate.mul_f64(TIMEOUT_RTT_INCREMENT_FACTOR);
159                            requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
160                        },
161                        Ok(Err(_)) => {
162                            requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
163                        },
164                    };
165                }
166
167                _ = retry_timer.tick() => {
168                    if requests.is_empty()
169                        && let Some(block) = last_block.clone() {
170                            requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
171                        }
172                }
173            };
174
175            // Limit RTT estimate to be between 5ms and 5s.
176            rtt_estimate = min(rtt_estimate, Duration::from_secs(5));
177            rtt_estimate = max(rtt_estimate, Duration::from_millis(5));
178            context
179                .metrics
180                .node_metrics
181                .broadcaster_rtt_estimate_ms
182                .with_label_values(&[peer_hostname])
183                .set(rtt_estimate.as_millis() as i64);
184        }
185    }
186}
187
#[cfg(test)]
mod test {
    use std::{collections::BTreeMap, time::Duration};

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_types::block::{BlockRef, Round};
    use parking_lot::Mutex;
    use tokio::time::sleep;

    use super::*;
    use crate::{
        block::{ExtendedBlock, TestBlock},
        commit::CommitRange,
        core::CoreSignals,
        network::BlockStream,
    };

    /// Network client stub that records the serialized bytes of every block passed to
    /// `send_block`, grouped by destination peer. All other RPCs are unimplemented.
    struct FakeNetworkClient {
        blocks_sent: Mutex<BTreeMap<AuthorityIndex, Vec<Bytes>>>,
    }

    impl FakeNetworkClient {
        fn new() -> Self {
            Self {
                blocks_sent: Mutex::new(BTreeMap::new()),
            }
        }

        /// Returns everything recorded since the previous call and resets the record.
        fn blocks_sent(&self) -> BTreeMap<AuthorityIndex, Vec<Bytes>> {
            // `mem::take` leaves an empty map behind, so no separate `clear()` is needed.
            std::mem::take(&mut *self.blocks_sent.lock())
        }
    }

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        /// Records the block for `peer` and reports success immediately.
        async fn send_block(
            &self,
            peer: AuthorityIndex,
            block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            let mut blocks_sent = self.blocks_sent.lock();
            let blocks = blocks_sent.entry(peer).or_default();
            blocks.push(block.serialized().clone());
            Ok(())
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// A new block is sent once to every peer right away, is not re-sent within half of the
    /// retry interval, and is re-sent once the full retry interval has passed.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_broadcaster() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let network_client = Arc::new(FakeNetworkClient::new());
        let (core_signals, signals_receiver) = CoreSignals::new(context.clone());
        let _broadcaster =
            Broadcaster::new(context.clone(), network_client.clone(), &signals_receiver);

        // Every authority except this node should receive the block.
        let peers: Vec<AuthorityIndex> = context
            .committee
            .authorities()
            .map(|(index, _)| index)
            .filter(|index| *index != context.own_index)
            .collect();

        let block = VerifiedBlock::new_for_test(TestBlock::new(9, 1).build());
        assert!(
            core_signals
                .new_block(ExtendedBlock {
                    block: block.clone(),
                    excluded_ancestors: vec![],
                })
                .is_ok(),
            "No subscriber active to receive the block"
        );

        // block should be broadcasted immediately to all peers.
        sleep(Duration::from_millis(1)).await;
        let blocks_sent = network_client.blocks_sent();
        for peer in &peers {
            assert_eq!(blocks_sent.get(peer).unwrap(), &vec![block.serialized()]);
        }

        // block should not be re-broadcasted ...
        sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
        let blocks_sent = network_client.blocks_sent();
        for peer in &peers {
            assert!(!blocks_sent.contains_key(peer));
        }

        // ... until LAST_BLOCK_RETRY_INTERVAL
        sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
        let blocks_sent = network_client.blocks_sent();
        for peer in &peers {
            assert_eq!(blocks_sent.get(peer).unwrap(), &vec![block.serialized()]);
        }
    }
}