1use std::{
5 cmp::{max, min},
6 sync::Arc,
7 time::Duration,
8};
9
10use consensus_config::AuthorityIndex;
11use futures::{StreamExt as _, stream::FuturesUnordered};
12use tokio::{
13 sync::broadcast,
14 task::JoinSet,
15 time::{Instant, error::Elapsed, sleep_until, timeout},
16};
17use tracing::{trace, warn};
18
19use crate::{
20 block::{BlockAPI as _, ExtendedBlock, VerifiedBlock},
21 context::Context,
22 core::CoreSignalsReceivers,
23 error::ConsensusResult,
24 network::NetworkClient,
25};
26
/// Maximum number of `send_block` requests that a single per-peer sender keeps in flight.
/// While a sender is at this limit it stops pulling new blocks from the broadcast channel.
const BROADCAST_CONCURRENCY: usize = 10;
29
/// Pushes blocks received from the core's block-broadcast channel to every other authority
/// in the committee, using one background sender task per peer.
pub(crate) struct Broadcaster {
    // One `push_blocks` task per peer. Dropping a tokio `JoinSet` aborts all of its tasks.
    senders: JoinSet<()>,
}
38
39impl Broadcaster {
40 const LAST_BLOCK_RETRY_INTERVAL: Duration = Duration::from_secs(2);
41 const MIN_SEND_BLOCK_NETWORK_TIMEOUT: Duration = Duration::from_secs(5);
42
43 pub(crate) fn new<C: NetworkClient>(
44 context: Arc<Context>,
45 network_client: Arc<C>,
46 signals_receiver: &CoreSignalsReceivers,
47 ) -> Self {
48 let mut senders = JoinSet::new();
49 for (index, _authority) in context.committee.authorities() {
50 if index == context.own_index {
52 continue;
53 }
54 senders.spawn(Self::push_blocks(
55 context.clone(),
56 network_client.clone(),
57 signals_receiver.block_broadcast_receiver(),
58 index,
59 ));
60 }
61
62 Self { senders }
63 }
64
65 pub(crate) fn stop(&mut self) {
66 self.senders.abort_all();
68 }
69
70 async fn push_blocks<C: NetworkClient>(
75 context: Arc<Context>,
76 network_client: Arc<C>,
77 mut rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
78 peer: AuthorityIndex,
79 ) {
80 let peer_hostname = &context.committee.authority(peer).hostname;
81
82 let mut last_block: Option<VerifiedBlock> = None;
86
87 let mut retry_timer = tokio::time::interval(Self::LAST_BLOCK_RETRY_INTERVAL);
89 retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
90 retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
91
92 const RTT_ESTIMATE_DECAY: f64 = 0.95;
96 const TIMEOUT_THRESHOLD_MULTIPLIER: f64 = 2.0;
97 const TIMEOUT_RTT_INCREMENT_FACTOR: f64 = 1.6;
98 let mut rtt_estimate = Duration::from_millis(200);
99
100 let mut requests = FuturesUnordered::new();
101
102 async fn send_block<C: NetworkClient>(
103 network_client: Arc<C>,
104 peer: AuthorityIndex,
105 rtt_estimate: Duration,
106 block: VerifiedBlock,
107 ) -> (Result<ConsensusResult<()>, Elapsed>, Instant, VerifiedBlock) {
108 let start = Instant::now();
109 let req_timeout = rtt_estimate.mul_f64(TIMEOUT_THRESHOLD_MULTIPLIER);
110 let network_timeout =
112 std::cmp::max(req_timeout, Broadcaster::MIN_SEND_BLOCK_NETWORK_TIMEOUT);
113 let resp = timeout(
114 req_timeout,
115 network_client.send_block(peer, &block, network_timeout),
116 )
117 .await;
118 if matches!(resp, Ok(Err(_))) {
119 sleep_until(start + req_timeout).await;
121 }
122 (resp, start, block)
123 }
124
125 loop {
126 tokio::select! {
127 result = rx_block_broadcast.recv(), if requests.len() < BROADCAST_CONCURRENCY => {
128 let block = match result {
129 Ok(block) => block.block,
131 Err(broadcast::error::RecvError::Closed) => {
132 trace!("Sender to {peer} is shutting down!");
133 return;
134 }
135 Err(broadcast::error::RecvError::Lagged(e)) => {
136 warn!("Sender to {peer} is lagging! {e}");
137 continue;
139 }
140 };
141 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block.clone()));
142 if last_block.is_none() || last_block.as_ref().unwrap().round() < block.round() {
143 last_block = Some(block);
144 }
145 }
146
147 Some((resp, start, block)) = requests.next() => {
148 match resp {
149 Ok(Ok(_)) => {
150 let now = Instant::now();
151 rtt_estimate = rtt_estimate.mul_f64(RTT_ESTIMATE_DECAY) + (now - start).mul_f64(1.0 - RTT_ESTIMATE_DECAY);
152 retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
156 },
157 Err(Elapsed { .. }) => {
158 rtt_estimate = rtt_estimate.mul_f64(TIMEOUT_RTT_INCREMENT_FACTOR);
159 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
160 },
161 Ok(Err(_)) => {
162 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
163 },
164 };
165 }
166
167 _ = retry_timer.tick() => {
168 if requests.is_empty()
169 && let Some(block) = last_block.clone() {
170 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
171 }
172 }
173 };
174
175 rtt_estimate = min(rtt_estimate, Duration::from_secs(5));
177 rtt_estimate = max(rtt_estimate, Duration::from_millis(5));
178 context
179 .metrics
180 .node_metrics
181 .broadcaster_rtt_estimate_ms
182 .with_label_values(&[peer_hostname])
183 .set(rtt_estimate.as_millis() as i64);
184 }
185 }
186}
187
#[cfg(test)]
mod test {
    use std::{collections::BTreeMap, time::Duration};

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_types::block::{BlockRef, Round};
    use parking_lot::Mutex;
    use tokio::time::sleep;

    use super::*;
    use crate::{
        block::{ExtendedBlock, TestBlock},
        commit::CommitRange,
        core::CoreSignals,
        network::BlockStream,
    };

    /// Network client that records the serialized form of every block passed to
    /// `send_block`, keyed by peer, and always reports success. The other RPCs are not
    /// exercised by these tests and panic if called.
    struct FakeNetworkClient {
        blocks_sent: Mutex<BTreeMap<AuthorityIndex, Vec<Bytes>>>,
    }

    impl FakeNetworkClient {
        fn new() -> Self {
            Self {
                blocks_sent: Mutex::new(BTreeMap::new()),
            }
        }

        /// Returns the blocks recorded since the previous call and resets the record.
        fn blocks_sent(&self) -> BTreeMap<AuthorityIndex, Vec<Bytes>> {
            // `take` leaves an empty map behind, so no explicit `clear` is needed.
            std::mem::take(&mut *self.blocks_sent.lock())
        }
    }

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            peer: AuthorityIndex,
            block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            self.blocks_sent
                .lock()
                .entry(peer)
                .or_default()
                .push(block.serialized().clone());
            Ok(())
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// A broadcast block reaches every peer promptly, is not re-sent before
    /// `LAST_BLOCK_RETRY_INTERVAL`, and is re-sent once that interval elapses.
    // The clock starts paused, so the sleeps below advance virtual time only.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_broadcaster() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let network_client = Arc::new(FakeNetworkClient::new());
        let (core_signals, signals_receiver) = CoreSignals::new(context.clone());
        let _broadcaster =
            Broadcaster::new(context.clone(), network_client.clone(), &signals_receiver);

        let block = VerifiedBlock::new_for_test(TestBlock::new(9, 1).build());
        assert!(
            core_signals
                .new_block(ExtendedBlock {
                    block: block.clone(),
                    excluded_ancestors: vec![],
                })
                .is_ok(),
            "No subscriber active to receive the block"
        );

        // Let the sender tasks run: every peer except ourselves receives the block once.
        sleep(Duration::from_millis(1)).await;
        let blocks_sent = network_client.blocks_sent();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            assert_eq!(blocks_sent.get(&index).unwrap(), &vec![block.serialized()]);
        }

        // Before the retry interval elapses, nothing is re-sent.
        sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
        let blocks_sent = network_client.blocks_sent();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            assert!(!blocks_sent.contains_key(&index));
        }

        // Once the retry interval has elapsed with no request in flight, the last block is
        // re-sent to every peer.
        sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
        let blocks_sent = network_client.blocks_sent();
        for (index, _) in context.committee.authorities() {
            if index == context.own_index {
                continue;
            }
            assert_eq!(blocks_sent.get(&index).unwrap(), &vec![block.serialized()]);
        }
    }
}