consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::{
4    collections::{BTreeMap, BTreeSet, HashMap},
5    sync::Arc,
6    time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::debug_fatal;
15use mysten_metrics::{
16    monitored_future,
17    monitored_mpsc::{Receiver, Sender, channel},
18    monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25    runtime::Handle,
26    sync::{mpsc::error::TrySendError, oneshot},
27    task::{JoinError, JoinSet},
28    time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, error, info, trace, warn};
31
32use crate::{
33    BlockAPI,
34    block::{SignedBlock, VerifiedBlock},
35    block_verifier::BlockVerifier,
36    commit_vote_monitor::CommitVoteMonitor,
37    context::Context,
38    dag_state::DagState,
39    error::{ConsensusError, ConsensusResult},
40    network::NetworkClient,
41};
42use crate::{
43    authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
44    transaction_certifier::TransactionCertifier,
45};
46
47/// The number of concurrent fetch block requests per authority.
48const FETCH_BLOCKS_CONCURRENCY: usize = 5;
49
50/// Timeouts when fetching blocks.
51const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
52const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
53
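/// Maximum number of peers that can be concurrently instructed to fetch the same missing block.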
54const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
55
56// Max number of peers to request missing blocks from concurrently in periodic sync.
57const MAX_PERIODIC_SYNC_PEERS: usize = 3;
58
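/// Guards a set of missing block refs that are locked for fetching from a specific peer. The locks
/// are released when the guard is dropped.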
59struct BlocksGuard {
60    map: Arc<InflightBlocksMap>,
61    block_refs: BTreeSet<BlockRef>,
62    peer: AuthorityIndex,
63}
64
65impl Drop for BlocksGuard {
66    fn drop(&mut self) {
67        self.map.unlock_blocks(&self.block_refs, self.peer);
68    }
69}
70
71// Keeps a mapping between the missing blocks that have been requested to be fetched and the authorities
72// that are currently fetching them. For each block ref there is a maximum number of authorities that can
73// fetch it concurrently. The authority ids that are currently fetching a block are stored in the corresponding
74// `BTreeSet`, where they effectively act as "locks".
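// For example (illustrative): with MAX_AUTHORITIES_TO_FETCH_PER_BLOCK = 2, a block ref that is already
// being fetched by two peers cannot be locked for a third peer until one of those fetches completes and
// releases its lock.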
75struct InflightBlocksMap {
76    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
77}
78
79impl InflightBlocksMap {
80    fn new() -> Arc<Self> {
81        Arc::new(Self {
82            inner: Mutex::new(HashMap::new()),
83        })
84    }
85
86    /// Locks the blocks to be fetched for the assigned `peer`. We want to avoid re-fetching the
87    /// missing blocks from too many authorities at the same time, so we limit the concurrency
88    /// per block by attempting to lock each block. If a block is already being fetched by the maximum
89    /// allowed number of authorities, then its block ref will not be included in the returned set. The
90    /// method returns all the block refs that have been successfully locked and are allowed to be fetched.
91    fn lock_blocks(
92        self: &Arc<Self>,
93        missing_block_refs: BTreeSet<BlockRef>,
94        peer: AuthorityIndex,
95    ) -> Option<BlocksGuard> {
96        let mut blocks = BTreeSet::new();
97        let mut inner = self.inner.lock();
98
99        for block_ref in missing_block_refs {
100            // Check that the number of authorities already instructed to fetch the block is not
101            // higher than the allowed maximum, and that `peer` has not already been instructed to do so.
102            let authorities = inner.entry(block_ref).or_default();
103            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
104                && authorities.get(&peer).is_none()
105            {
106                assert!(authorities.insert(peer));
107                blocks.insert(block_ref);
108            }
109        }
110
111        if blocks.is_empty() {
112            None
113        } else {
114            Some(BlocksGuard {
115                map: self.clone(),
116                block_refs: blocks,
117                peer,
118            })
119        }
120    }
121
122    /// Unlocks the provided block references for the given `peer`. The unlocking is strict, meaning that
123    /// if this method is called for a specific block ref and peer more times than the corresponding lock
124    /// has been called, it will panic.
125    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
126        // Remove the peer's locks for the provided blocks from the map.
127        let mut blocks_to_fetch = self.inner.lock();
128        for block_ref in block_refs {
129            let authorities = blocks_to_fetch
130                .get_mut(block_ref)
131                .expect("Should have found a non empty map");
132
133            assert!(authorities.remove(&peer), "Peer index should be present!");
134
135            // if the last one then just clean up
136            if authorities.is_empty() {
137                blocks_to_fetch.remove(block_ref);
138            }
139        }
140    }
141
142    /// Drops the provided `blocks_guard`, which unlocks its blocks, and then attempts to lock the same
143    /// block refs again for the new `peer`. The swap is best effort and there is no guarantee that the
144    /// `peer` will be able to acquire the new locks.
145    fn swap_locks(
146        self: &Arc<Self>,
147        blocks_guard: BlocksGuard,
148        peer: AuthorityIndex,
149    ) -> Option<BlocksGuard> {
150        let block_refs = blocks_guard.block_refs.clone();
151
152        // Explicitly drop the guard
153        drop(blocks_guard);
154
155        // Now create new guard
156        self.lock_blocks(block_refs, peer)
157    }
158
159    #[cfg(test)]
160    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
161        let inner = self.inner.lock();
162        inner.len()
163    }
164}
165
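/// Commands processed by the synchronizer's main loop.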
166enum Command {
167    FetchBlocks {
168        missing_block_refs: BTreeSet<BlockRef>,
169        peer_index: AuthorityIndex,
170        result: oneshot::Sender<Result<(), ConsensusError>>,
171    },
172    FetchOwnLastBlock,
173    KickOffScheduler,
174}
175
176pub(crate) struct SynchronizerHandle {
177    commands_sender: Sender<Command>,
178    tasks: tokio::sync::Mutex<JoinSet<()>>,
179}
180
181impl SynchronizerHandle {
182    /// Explicitly asks the synchronizer to fetch the blocks in the provided `missing_block_refs` set
183    /// from the peer authority.
184    pub(crate) async fn fetch_blocks(
185        &self,
186        missing_block_refs: BTreeSet<BlockRef>,
187        peer_index: AuthorityIndex,
188    ) -> ConsensusResult<()> {
189        let (sender, receiver) = oneshot::channel();
190        self.commands_sender
191            .send(Command::FetchBlocks {
192                missing_block_refs,
193                peer_index,
194                result: sender,
195            })
196            .await
197            .map_err(|_err| ConsensusError::Shutdown)?;
198        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
199    }
200
201    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
202        let mut tasks = self.tasks.lock().await;
203        tasks.abort_all();
204        while let Some(result) = tasks.join_next().await {
205            result?
206        }
207        Ok(())
208    }
209}
210
211/// `Synchronizer` oversees live block synchronization, crucial for node progress. Live synchronization
212/// refers to the process of retrieving missing blocks, particularly those essential for advancing a node
213/// when data from only a few rounds is absent. If a node significantly lags behind the network,
214/// `commit_syncer` handles fetching missing blocks via a more efficient approach. `Synchronizer`
215/// aims for swift catch-up by employing two mechanisms:
216///
217/// 1. Explicitly requesting missing blocks from designated authorities via the "block send" path.
218///    This includes attempting to fetch any missing ancestors necessary for processing a received block.
219///    Such requests prioritize the block author, maximizing the chance of prompt retrieval.
220///    A locking mechanism allows concurrent requests for missing blocks from up to two authorities
221///    simultaneously, enhancing the chances of timely retrieval. Notably, if additional missing blocks
222///    arise during block processing, requests to the same authority are deferred to the scheduler.
223///
224/// 2. Periodically requesting missing blocks via a scheduler. This primarily serves to retrieve
225///    missing blocks that were not ancestors of a received block via the "block send" path.
226///    The scheduler operates on either a fixed periodic basis or is triggered immediately
227///    after explicit fetches described in (1), ensuring continued block retrieval if gaps persist.
228///
229/// In addition to the above, the synchronizer can fetch the node's own last proposed block
230/// from the network peers, as a best-effort approach to recover the node from amnesia and avoid
231/// making the node equivocate.
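///
/// Illustrative usage (a sketch; the setup and variable names are assumed, not taken from actual callers):
/// ```ignore
/// let handle = Synchronizer::start(
///     network_client, context, core_dispatcher, commit_vote_monitor,
///     block_verifier, transaction_certifier, dag_state, /* sync_last_known_own_block */ false,
/// );
/// // Explicitly ask a peer for blocks found missing while processing one of its blocks.
/// handle.fetch_blocks(missing_block_refs, peer_index).await?;
/// ```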
232pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> {
233    context: Arc<Context>,
234    commands_receiver: Receiver<Command>,
235    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
236    core_dispatcher: Arc<D>,
237    commit_vote_monitor: Arc<CommitVoteMonitor>,
238    dag_state: Arc<RwLock<DagState>>,
239    fetch_blocks_scheduler_task: JoinSet<()>,
240    fetch_own_last_block_task: JoinSet<()>,
241    network_client: Arc<C>,
242    block_verifier: Arc<V>,
243    transaction_certifier: TransactionCertifier,
244    inflight_blocks_map: Arc<InflightBlocksMap>,
245    commands_sender: Sender<Command>,
246}
247
248impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
249    pub(crate) fn start(
250        network_client: Arc<C>,
251        context: Arc<Context>,
252        core_dispatcher: Arc<D>,
253        commit_vote_monitor: Arc<CommitVoteMonitor>,
254        block_verifier: Arc<V>,
255        transaction_certifier: TransactionCertifier,
256        dag_state: Arc<RwLock<DagState>>,
257        sync_last_known_own_block: bool,
258    ) -> Arc<SynchronizerHandle> {
259        let (commands_sender, commands_receiver) =
260            channel("consensus_synchronizer_commands", 1_000);
261        let inflight_blocks_map = InflightBlocksMap::new();
262
263        // Spawn the tasks to fetch blocks from the other authorities
264        let mut fetch_block_senders = BTreeMap::new();
265        let mut tasks = JoinSet::new();
266        for (index, _) in context.committee.authorities() {
267            if index == context.own_index {
268                continue;
269            }
270            let (sender, receiver) =
271                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
272            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
273                index,
274                network_client.clone(),
275                block_verifier.clone(),
276                transaction_certifier.clone(),
277                commit_vote_monitor.clone(),
278                context.clone(),
279                core_dispatcher.clone(),
280                dag_state.clone(),
281                receiver,
282                commands_sender.clone(),
283            );
284            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
285            fetch_block_senders.insert(index, sender);
286        }
287
288        let commands_sender_clone = commands_sender.clone();
289
290        if sync_last_known_own_block {
291            commands_sender
292                .try_send(Command::FetchOwnLastBlock)
293                .expect("Failed to sync our last block");
294        }
295
296        // Spawn the task to listen to the requests & periodic runs
297        tasks.spawn(monitored_future!(async move {
298            let mut s = Self {
299                context,
300                commands_receiver,
301                fetch_block_senders,
302                core_dispatcher,
303                commit_vote_monitor,
304                fetch_blocks_scheduler_task: JoinSet::new(),
305                fetch_own_last_block_task: JoinSet::new(),
306                network_client,
307                block_verifier,
308                transaction_certifier,
309                inflight_blocks_map,
310                commands_sender: commands_sender_clone,
311                dag_state,
312            };
313            s.run().await;
314        }));
315
316        Arc::new(SynchronizerHandle {
317            commands_sender,
318            tasks: tokio::sync::Mutex::new(tasks),
319        })
320    }
321
322    // The main loop to listen for the submitted commands.
323    async fn run(&mut self) {
324        // We want the synchronizer to run periodically every 200ms to fetch any missing blocks.
325        const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
326        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);
327
328        tokio::pin!(scheduler_timeout);
329
330        loop {
331            tokio::select! {
332                Some(command) = self.commands_receiver.recv() => {
333                    match command {
334                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
335                            if peer_index == self.context.own_index {
336                                error!("We should never attempt to fetch blocks from our own node");
337                                continue;
338                            }
339
340                            // Keep only the max allowed blocks to request. Additional missing blocks
341                            // will be fetched via periodic sync.
342                            // Fetch from the lowest to highest round, to ensure progress.
343                            let missing_block_refs = missing_block_refs
344                                .into_iter()
345                                .take(self.context.parameters.max_blocks_per_sync)
346                                .collect();
347
348                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
349                            let Some(blocks_guard) = blocks_guard else {
350                                result.send(Ok(())).ok();
351                                continue;
352                            };
353
354                            // We don't block if the corresponding peer task is saturated - we rather drop the request. That's ok, as the periodic
355                            // synchronization task will handle any still-missing blocks in its next run.
356                            let r = self
357                                .fetch_block_senders
358                                .get(&peer_index)
359                                .expect("Fatal error, sender should be present")
360                                .try_send(blocks_guard)
361                                .map_err(|err| {
362                                    match err {
363                                        TrySendError::Full(_) => {
364                                            let peer_hostname = &self.context.committee.authority(peer_index).hostname;
365                                            self.context
366                                                .metrics
367                                                .node_metrics
368                                                .synchronizer_skipped_fetch_requests
369                                                .with_label_values(&[peer_hostname])
370                                                .inc();
371                                            ConsensusError::SynchronizerSaturated(peer_index)
372                                        },
373                                        TrySendError::Closed(_) => ConsensusError::Shutdown
374                                    }
375                                });
376
377                            result.send(r).ok();
378                        }
379                        Command::FetchOwnLastBlock => {
380                            if self.fetch_own_last_block_task.is_empty() {
381                                self.start_fetch_own_last_block_task();
382                            }
383                        }
384                        Command::KickOffScheduler => {
385                            // Just reset the scheduler timeout timer to run immediately if the scheduler is not already running.
386                            // If the scheduler is already running, then just reduce the remaining time until the next run.
387                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
388                                Instant::now()
389                            } else {
390                                Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
391                            };
392
393                            // only reset if it is earlier than the next deadline
394                            if timeout < scheduler_timeout.deadline() {
395                                scheduler_timeout.as_mut().reset(timeout);
396                            }
397                        }
398                    }
399                },
400                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
401                    match result {
402                        Ok(()) => {},
403                        Err(e) => {
404                            if e.is_cancelled() {
405                            } else if e.is_panic() {
406                                std::panic::resume_unwind(e.into_panic());
407                            } else {
408                                panic!("fetch our last block task failed: {e}");
409                            }
410                        },
411                    };
412                },
413                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
414                    match result {
415                        Ok(()) => {},
416                        Err(e) => {
417                            if e.is_cancelled() {
418                            } else if e.is_panic() {
419                                std::panic::resume_unwind(e.into_panic());
420                            } else {
421                                panic!("fetch blocks scheduler task failed: {e}");
422                            }
423                        },
424                    };
425                },
426                () = &mut scheduler_timeout => {
427                    // we want to start a new task only if the previous one has already finished.
428                    // TODO: consider starting backup fetches in parallel, when a fetch takes too long?
429                    if self.fetch_blocks_scheduler_task.is_empty()
430                        && let Err(err) = self.start_fetch_missing_blocks_task().await {
431                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
432                            return;
433                        };
434
435                    scheduler_timeout
436                        .as_mut()
437                        .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
438                }
439            }
440        }
441    }
442
443    async fn fetch_blocks_from_authority(
444        peer_index: AuthorityIndex,
445        network_client: Arc<C>,
446        block_verifier: Arc<V>,
447        transaction_certifier: TransactionCertifier,
448        commit_vote_monitor: Arc<CommitVoteMonitor>,
449        context: Arc<Context>,
450        core_dispatcher: Arc<D>,
451        dag_state: Arc<RwLock<DagState>>,
452        mut receiver: Receiver<BlocksGuard>,
453        commands_sender: Sender<Command>,
454    ) {
455        const MAX_RETRIES: u32 = 3;
456        let peer_hostname = &context.committee.authority(peer_index).hostname;
457        let mut requests = FuturesUnordered::new();
458
459        loop {
460            tokio::select! {
461                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
462                    // get the highest accepted rounds
463                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);
464
465                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
466                },
467                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
468                    match response {
469                        Ok(blocks) => {
470                            if let Err(err) = Self::process_fetched_blocks(blocks,
471                                peer_index,
472                                blocks_guard,
473                                core_dispatcher.clone(),
474                                block_verifier.clone(),
475                                transaction_certifier.clone(),
476                                commit_vote_monitor.clone(),
477                                context.clone(),
478                                commands_sender.clone(),
479                                "live"
480                            ).await {
481                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
482                                context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer_hostname, "live"]).inc();
483                            }
484                        },
485                        Err(_) => {
486                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "live"]).inc();
487                            if retries <= MAX_RETRIES {
488                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
489                            } else {
490                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
491                                // not strictly necessary, but drop the guard here explicitly to unlock the blocks
492                                drop(blocks_guard);
493                            }
494                        }
495                    }
496                },
497                else => {
498                    info!("Fetching blocks from authority {peer_index} task will now abort.");
499                    break;
500                }
501            }
502        }
503    }
504
505    /// Processes the raw blocks fetched from peer `peer_index`. If no error is returned, then
506    /// the verified blocks are immediately sent to Core for processing.
507    async fn process_fetched_blocks(
508        mut serialized_blocks: Vec<Bytes>,
509        peer_index: AuthorityIndex,
510        requested_blocks_guard: BlocksGuard,
511        core_dispatcher: Arc<D>,
512        block_verifier: Arc<V>,
513        transaction_certifier: TransactionCertifier,
514        commit_vote_monitor: Arc<CommitVoteMonitor>,
515        context: Arc<Context>,
516        commands_sender: Sender<Command>,
517        sync_method: &str,
518    ) -> ConsensusResult<()> {
519        if serialized_blocks.is_empty() {
520            return Ok(());
521        }
522
523        // Limit the number of the returned blocks processed.
524        serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
525
526        // Verify all the fetched blocks
527        let blocks = Handle::current()
528            .spawn_blocking({
529                let block_verifier = block_verifier.clone();
530                let context = context.clone();
531                move || {
532                    Self::verify_blocks(
533                        serialized_blocks,
534                        block_verifier,
535                        transaction_certifier,
536                        &context,
537                        peer_index,
538                    )
539                }
540            })
541            .await
542            .expect("Spawn blocking should not fail")?;
543
544        // Record commit votes from the verified blocks.
545        for block in &blocks {
546            commit_vote_monitor.observe_block(block);
547        }
548
549        let metrics = &context.metrics.node_metrics;
550        let peer_hostname = &context.committee.authority(peer_index).hostname;
551        metrics
552            .synchronizer_fetched_blocks_by_peer
553            .with_label_values(&[peer_hostname, sync_method])
554            .inc_by(blocks.len() as u64);
555        for block in &blocks {
556            let block_hostname = &context.committee.authority(block.author()).hostname;
557            metrics
558                .synchronizer_fetched_blocks_by_authority
559                .with_label_values(&[block_hostname, sync_method])
560                .inc();
561        }
562
563        debug!(
564            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
565            blocks.len(),
566            blocks.iter().map(|b| b.reference().to_string()).join(", "),
567        );
568
569        // Now send them to Core for processing. Ignore the returned missing blocks, as we don't want
570        // this mechanism to keep feeding back into fetching more blocks. The periodic synchronization
571        // will take care of that.
572        let missing_blocks = core_dispatcher
573            .add_blocks(blocks)
574            .await
575            .map_err(|_| ConsensusError::Shutdown)?;
576
577        // now release all the locked blocks as they have been fetched, verified & processed
578        drop(requested_blocks_guard);
579
580        // kick off immediately the scheduled synchronizer
581        if !missing_blocks.is_empty() {
582            // do not block here, so we avoid any possible cycles.
583            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
584            {
585                warn!("Commands channel is full")
586            }
587        }
588
589        context
590            .metrics
591            .node_metrics
592            .missing_blocks_after_fetch_total
593            .inc_by(missing_blocks.len() as u64);
594
595        Ok(())
596    }
597
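    /// Returns the highest accepted round per authority, in committee order, based on the last
    /// cached block of each authority in `DagState`.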
598    fn get_highest_accepted_rounds(
599        dag_state: Arc<RwLock<DagState>>,
600        context: &Arc<Context>,
601    ) -> Vec<Round> {
602        let blocks = dag_state
603            .read()
604            .get_last_cached_block_per_authority(Round::MAX);
605        assert_eq!(blocks.len(), context.committee.size());
606
607        blocks
608            .into_iter()
609            .map(|(block, _)| block.round())
610            .collect::<Vec<_>>()
611    }
612
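    /// Deserializes and verifies the fetched blocks from `peer_index`, recording metrics for invalid
    /// blocks and for blocks with timestamps ahead of the local clock. When Mysticeti fastpath is
    /// enabled, the verified blocks and their reject transaction votes are also forwarded to the
    /// transaction certifier. Returns the verified blocks.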
613    fn verify_blocks(
614        serialized_blocks: Vec<Bytes>,
615        block_verifier: Arc<V>,
616        transaction_certifier: TransactionCertifier,
617        context: &Context,
618        peer_index: AuthorityIndex,
619    ) -> ConsensusResult<Vec<VerifiedBlock>> {
620        let mut verified_blocks = Vec::new();
621        let mut voted_blocks = Vec::new();
622        for serialized_block in serialized_blocks {
623            let signed_block: SignedBlock =
624                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
625
626            // TODO: cache received and verified block refs to avoid duplicated work.
627            let (verified_block, reject_txn_votes) = block_verifier
628                .verify_and_vote(signed_block, serialized_block)
629                .tap_err(|e| {
630                    let hostname = context.committee.authority(peer_index).hostname.clone();
631                    context
632                        .metrics
633                        .node_metrics
634                        .invalid_blocks
635                        .with_label_values(&[&hostname, "synchronizer", e.clone().name()])
636                        .inc();
637                    info!("Invalid block received from {}: {}", peer_index, e);
638                })?;
639
640            // TODO: improve efficiency, maybe suspend and continue processing the block asynchronously.
641            let now = context.clock.timestamp_utc_ms();
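            // A non-zero drift means the block's timestamp is ahead of the local clock.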
642            let drift = verified_block.timestamp_ms().saturating_sub(now);
643            if drift > 0 {
644                let peer_hostname = &context
645                    .committee
646                    .authority(verified_block.author())
647                    .hostname;
648                context
649                    .metrics
650                    .node_metrics
651                    .block_timestamp_drift_ms
652                    .with_label_values(&[peer_hostname, "synchronizer"])
653                    .inc_by(drift);
654
655                trace!(
656                    "Synced block {} timestamp {} is in the future (now={}).",
657                    verified_block.reference(),
658                    verified_block.timestamp_ms(),
659                    now
660                );
661            }
662
663            verified_blocks.push(verified_block.clone());
664            voted_blocks.push((verified_block, reject_txn_votes));
665        }
666
667        if context.protocol_config.mysticeti_fastpath() {
668            transaction_certifier.add_voted_blocks(voted_blocks);
669        }
670
671        Ok(verified_blocks)
672    }
673
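    /// Sends a single fetch-blocks request to `peer`, enforcing `request_timeout`. On failure, it
    /// sleeps out the remainder of the timeout before returning, so the caller can retry without
    /// hammering the peer. Returns the response together with the blocks guard, the retry count,
    /// the peer index and the highest accepted rounds.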
674    async fn fetch_blocks_request(
675        network_client: Arc<C>,
676        peer: AuthorityIndex,
677        blocks_guard: BlocksGuard,
678        highest_rounds: Vec<Round>,
679        breadth_first: bool,
680        request_timeout: Duration,
681        mut retries: u32,
682    ) -> (
683        ConsensusResult<Vec<Bytes>>,
684        BlocksGuard,
685        u32,
686        AuthorityIndex,
687        Vec<Round>,
688    ) {
689        let start = Instant::now();
690        let resp = timeout(
691            request_timeout,
692            network_client.fetch_blocks(
693                peer,
694                blocks_guard
695                    .block_refs
696                    .clone()
697                    .into_iter()
698                    .collect::<Vec<_>>(),
699                highest_rounds.clone().into_iter().collect::<Vec<_>>(),
700                breadth_first,
701                request_timeout,
702            ),
703        )
704        .await;
705
706        fail_point_async!("consensus-delay");
707
708        let resp = match resp {
709            Ok(Err(err)) => {
710                // Add a delay before retrying, if needed. If the request has already timed out then
711                // this is effectively a no-op.
712                sleep_until(start + request_timeout).await;
713                retries += 1;
714                Err(err)
715            } // network error
716            Err(err) => {
717                // timeout
718                sleep_until(start + request_timeout).await;
719                retries += 1;
720                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
721            }
722            Ok(result) => result,
723        };
724        (resp, blocks_guard, retries, peer, highest_rounds)
725    }
726
727    fn start_fetch_own_last_block_task(&mut self) {
728        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
729        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
730
731        let context = self.context.clone();
732        let dag_state = self.dag_state.clone();
733        let network_client = self.network_client.clone();
734        let block_verifier = self.block_verifier.clone();
735        let core_dispatcher = self.core_dispatcher.clone();
736
737        self.fetch_own_last_block_task
738            .spawn(monitored_future!(async move {
739                let _scope = monitored_scope("FetchOwnLastBlockTask");
740
741                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
742                    let network_client_cloned = network_client.clone();
743                    let own_index = context.own_index;
744                    async move {
745                        sleep(fetch_own_block_delay).await;
746                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
747                        (r, authority_index)
748                    }
749                };
750
751                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
752                    let mut result = Vec::new();
753                    for serialized_block in blocks {
754                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
755                        let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
756                            let hostname = context.committee.authority(authority_index).hostname.clone();
757                            context
758                                .metrics
759                                .node_metrics
760                                .invalid_blocks
761                                .with_label_values(&[&hostname, "synchronizer_own_block", err.clone().name()])
762                                .inc();
763                            warn!("Invalid block received from {}: {}", authority_index, err);
764                        })?;
765
766                        if verified_block.author() != context.own_index {
767                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
768                        }
769                        result.push(verified_block);
770                    }
771                    Ok(result)
772                };
773
774                // Get the highest of all the results. Retry until at least `f+1` results have been gathered.
775                let mut highest_round;
776                let mut retries = 0;
777                let mut retry_delay_step = Duration::from_millis(500);
778                'main: loop {
779                    if context.committee.size() == 1 {
780                        highest_round = dag_state.read().get_last_proposed_block().round();
781                        info!("Only one node in the network, will not try fetching own last block from peers.");
782                        break 'main;
783                    }
784
785                    let mut total_stake = 0;
786                    highest_round = 0;
787
788                    // Ask all the other peers about our last block
789                    let mut results = FuturesUnordered::new();
790
791                    for (authority_index, _authority) in context.committee.authorities() {
792                        if authority_index != context.own_index {
793                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
794                        }
795                    }
796
797                    // Gather the results, but also stop waiting once the timeout fires.
798                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
799                    tokio::pin!(timer);
800
801                    'inner: loop {
802                        tokio::select! {
803                            result = results.next() => {
804                                let Some((result, authority_index)) = result else {
805                                    break 'inner;
806                                };
807                                match result {
808                                    Ok(result) => {
809                                        match process_blocks(result, authority_index) {
810                                            Ok(blocks) => {
811                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
812                                                highest_round = highest_round.max(max_round);
813
814                                                total_stake += context.committee.stake(authority_index);
815                                            },
816                                            Err(err) => {
817                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
818                                            }
819                                        }
820                                    },
821                                    Err(err) => {
822                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
823                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
824                                    }
825                                }
826                            },
827                            () = &mut timer => {
828                                info!("Timeout while trying to sync our own last block from peers");
829                                break 'inner;
830                            }
831                        }
832                    }
833
834                    // Require at least f+1 stake to have replied before accepting the result.
835                    if context.committee.reached_validity(total_stake) {
836                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
837                        break 'main;
838                    }
839
840                    retries += 1;
841                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
842                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
843
844                    sleep(retry_delay_step).await;
845
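                    // Exponential backoff: grow the retry delay by 1.5x per attempt, capped at MAX_RETRY_DELAY_STEP.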
846                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
847                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
848                }
849
850                // Update the Core with the highest detected round
851                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);
852
853                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
854                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
855                }
856            }));
857    }
858
859    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
860        if self.context.committee.size() == 1 {
861            trace!(
862                "Only one node in the network, will not try fetching missing blocks from peers."
863            );
864            return Ok(());
865        }
866
867        let missing_blocks = self
868            .core_dispatcher
869            .get_missing_blocks()
870            .await
871            .map_err(|_err| ConsensusError::Shutdown)?;
872
873        // No reason to kick off the scheduler if there are no missing blocks to fetch
874        if missing_blocks.is_empty() {
875            return Ok(());
876        }
877
878        let context = self.context.clone();
879        let network_client = self.network_client.clone();
880        let block_verifier = self.block_verifier.clone();
881        let transaction_certifier = self.transaction_certifier.clone();
882        let commit_vote_monitor = self.commit_vote_monitor.clone();
883        let core_dispatcher = self.core_dispatcher.clone();
884        let blocks_to_fetch = self.inflight_blocks_map.clone();
885        let commands_sender = self.commands_sender.clone();
886        let dag_state = self.dag_state.clone();
887
888        // If we are lagging behind on commits, then we don't want to enable the scheduler. As the node is synchronizing via the commit syncer, the certified commits
889        // will bring all the blocks necessary to run the commits. Since the commits are certified, we are guaranteed that all the necessary causal history is present.
890        if self.is_commit_lagging() {
891            return Ok(());
892        }
893
894        self.fetch_blocks_scheduler_task
895            .spawn(monitored_future!(async move {
896                let _scope = monitored_scope("FetchMissingBlocksScheduler");
897                context
898                    .metrics
899                    .node_metrics
900                    .fetch_blocks_scheduler_inflight
901                    .inc();
902                let total_requested = missing_blocks.len();
903
904                fail_point_async!("consensus-delay");
905                // Fetch blocks from peers
906                let results = Self::fetch_blocks_from_authorities(
907                    context.clone(),
908                    blocks_to_fetch.clone(),
909                    network_client,
910                    missing_blocks,
911                    dag_state,
912                )
913                .await;
914                context
915                    .metrics
916                    .node_metrics
917                    .fetch_blocks_scheduler_inflight
918                    .dec();
919                if results.is_empty() {
920                    return;
921                }
922
923                // Now process the returned results
924                let mut total_fetched = 0;
925                for (blocks_guard, fetched_blocks, peer) in results {
926                    total_fetched += fetched_blocks.len();
927
928                    if let Err(err) = Self::process_fetched_blocks(
929                        fetched_blocks,
930                        peer,
931                        blocks_guard,
932                        core_dispatcher.clone(),
933                        block_verifier.clone(),
934                        transaction_certifier.clone(),
935                        commit_vote_monitor.clone(),
936                        context.clone(),
937                        commands_sender.clone(),
938                        "periodic",
939                    )
940                    .await
941                    {
942                        warn!(
943                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
944                        );
945                        context
946                            .metrics
947                            .node_metrics
948                            .synchronizer_process_fetched_failures
949                            .with_label_values(&[
950                                &context.committee.authority(peer).hostname,
951                                "periodic",
952                            ])
953                            .inc();
954                    }
955                }
956
957                debug!(
958                    "Total blocks requested to fetch: {}, total fetched: {}",
959                    total_requested, total_fetched
960                );
961            }));
962        Ok(())
963    }
964
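    /// Returns true when the last locally committed index trails the quorum commit index by more
    /// than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`, in which case block fetching is left
    /// to the commit syncer.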
965    fn is_commit_lagging(&self) -> bool {
966        let last_commit_index = self.dag_state.read().last_commit_index();
967        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
968        let commit_threshold = last_commit_index
969            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
970        commit_threshold < quorum_commit_index
971    }
972
973    /// Fetches the `missing_blocks` from peers, requesting the missing blocks of roughly the same number of authorities from each peer.
974    /// Each response from a peer can contain the requested blocks, and additional blocks from the last accepted rounds onwards for the
975    /// authorities with missing blocks.
976    /// Each element of the returned vector is a tuple which contains the guard over the requested missing block refs, the returned
977    /// serialized blocks, and the peer authority index.
978    async fn fetch_blocks_from_authorities(
979        context: Arc<Context>,
980        inflight_blocks: Arc<InflightBlocksMap>,
981        network_client: Arc<C>,
982        missing_blocks: BTreeSet<BlockRef>,
983        dag_state: Arc<RwLock<DagState>>,
984    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
985        // Preliminary truncation of the missing blocks to fetch. Since each peer can have a different
986        // number of missing blocks and the fetching is batched by peer, keep more than max_blocks_per_sync
987        // blocks per peer on average.
988        let missing_blocks = missing_blocks
989            .into_iter()
990            .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_sync)
991            .collect::<Vec<_>>();
992
993        // Maps authorities to the missing blocks they have.
994        let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
995        for block_ref in &missing_blocks {
996            authorities
997                .entry(block_ref.author)
998                .or_default()
999                .push(*block_ref);
1000        }
1001        // Distribute roughly the same number of authorities to each peer to sync from.
1002        // When this function runs, context.committee.size() is always greater than 1.
1003        let num_authorities_per_peer = authorities
1004            .len()
1005            .div_ceil((context.committee.size() - 1).min(MAX_PERIODIC_SYNC_PEERS));
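        // For example (illustrative): with 10 authorities that have missing blocks and 3 eligible
        // peers, each peer is assigned the blocks of ceil(10 / 3) = 4 authorities.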
1006
1007        // Update metrics related to missing blocks.
1008        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1009        for (authority, blocks) in &authorities {
1010            missing_blocks_per_authority[*authority] += blocks.len();
1011        }
1012        for (missing, (_, authority)) in missing_blocks_per_authority
1013            .into_iter()
1014            .zip(context.committee.authorities())
1015        {
1016            context
1017                .metrics
1018                .node_metrics
1019                .synchronizer_missing_blocks_by_authority
1020                .with_label_values(&[&authority.hostname])
1021                .inc_by(missing as u64);
1022            context
1023                .metrics
1024                .node_metrics
1025                .synchronizer_current_missing_blocks_by_authority
1026                .with_label_values(&[&authority.hostname])
1027                .set(missing as i64);
1028        }
1029
1030        let mut peers = context
1031            .committee
1032            .authorities()
1033            .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
1034            .collect::<Vec<_>>();
1035
1036        // TODO: probably inject the RNG to allow unit testing - this is a workaround for now.
1037        if cfg!(not(test)) {
1038            // Shuffle the peers
1039            peers.shuffle(&mut ThreadRng::default());
1040        }
1041
1042        let mut peers = peers.into_iter();
1043        let mut request_futures = FuturesUnordered::new();
1044
1045        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);
1046
1047        // Shuffle the authorities for each request.
1048        let mut authorities = authorities.into_values().collect::<Vec<_>>();
1049        if cfg!(not(test)) {
1050            // Shuffle the authorities
1051            authorities.shuffle(&mut ThreadRng::default());
1052        }
1053
1054        // Send the fetch requests
1055        for batch in authorities.chunks(num_authorities_per_peer) {
1056            let Some(peer) = peers.next() else {
1057                debug_fatal!("No more peers left to fetch blocks!");
1058                break;
1059            };
1060            let peer_hostname = &context.committee.authority(peer).hostname;
1061            // Fetch the missing blocks with the lowest rounds first, to ensure progress.
1062            // This may reduce efficiency and increase the chance of duplicated data transfers in edge cases.
1063            let block_refs = batch
1064                .iter()
1065                .flatten()
1066                .cloned()
1067                .collect::<BTreeSet<_>>()
1068                .into_iter()
1069                .take(context.parameters.max_blocks_per_sync)
1070                .collect::<BTreeSet<_>>();
1071
1072            // Lock the blocks to be fetched. If no lock can be acquired for any of the blocks, then don't bother fetching from this peer in this run.
1073            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
1074                info!(
1075                    "Periodic sync of {} missing blocks from peer {} {}: {}",
1076                    block_refs.len(),
1077                    peer,
1078                    peer_hostname,
1079                    block_refs
1080                        .iter()
1081                        .map(|b| b.to_string())
1082                        .collect::<Vec<_>>()
1083                        .join(", ")
1084                );
1085                request_futures.push(Self::fetch_blocks_request(
1086                    network_client.clone(),
1087                    peer,
1088                    blocks_guard,
1089                    highest_rounds.clone(),
1090                    false,
1091                    FETCH_REQUEST_TIMEOUT,
1092                    1,
1093                ));
1094            }
1095        }
1096
1097        let mut results = Vec::new();
1098        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1099
1100        tokio::pin!(fetcher_timeout);
1101
1102        loop {
1103            tokio::select! {
1104                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
1105                    let peer_hostname = &context.committee.authority(peer_index).hostname;
1106                    match response {
1107                        Ok(fetched_blocks) => {
1108                            results.push((blocks_guard, fetched_blocks, peer_index));
1109
1110                            // no more pending requests are left, just break the loop
1111                            if request_futures.is_empty() {
1112                                break;
1113                            }
1114                        },
1115                        Err(_) => {
1116                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname, "periodic"]).inc();
1117                            // try again if there is any peer left
1118                            if let Some(next_peer) = peers.next() {
1119                                // Best effort to lock the guards. If we can't lock, then don't bother in this run.
1120                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
1121                                    info!(
1122                                        "Retrying syncing {} missing blocks from peer {}: {}",
1123                                        blocks_guard.block_refs.len(),
1124                                        peer_hostname,
1125                                        blocks_guard.block_refs
1126                                            .iter()
1127                                            .map(|b| b.to_string())
1128                                            .collect::<Vec<_>>()
1129                                            .join(", ")
1130                                    );
1131                                    request_futures.push(Self::fetch_blocks_request(
1132                                        network_client.clone(),
1133                                        next_peer,
1134                                        blocks_guard,
1135                                        highest_rounds,
1136                                        false,
1137                                        FETCH_REQUEST_TIMEOUT,
1138                                        1,
1139                                    ));
1140                                } else {
1141                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
1142                                }
1143                            } else {
1144                                debug!("No more peers left to fetch blocks");
1145                            }
1146                        }
1147                    }
1148                },
1149                _ = &mut fetcher_timeout => {
1150                    debug!("Timed out while fetching missing blocks");
1151                    break;
1152                }
1153            }
1154        }
1155
1156        results
1157    }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162    use std::{
1163        collections::{BTreeMap, BTreeSet},
1164        sync::Arc,
1165        time::Duration,
1166    };
1167
1168    use async_trait::async_trait;
1169    use bytes::Bytes;
1170    use consensus_config::{AuthorityIndex, Parameters};
1171    use consensus_types::block::{BlockDigest, BlockRef, Round};
1172    use mysten_metrics::monitored_mpsc;
1173    use parking_lot::RwLock;
1174    use tokio::{sync::Mutex, time::sleep};
1175
1176    use crate::commit::{CommitVote, TrustedCommit};
1177    use crate::{
1178        CommitDigest, CommitIndex,
1179        block::{TestBlock, VerifiedBlock},
1180        block_verifier::NoopBlockVerifier,
1181        commit::CommitRange,
1182        commit_vote_monitor::CommitVoteMonitor,
1183        context::Context,
1184        core_thread::CoreThreadDispatcher,
1185        dag_state::DagState,
1186        error::{ConsensusError, ConsensusResult},
1187        network::{BlockStream, NetworkClient},
1188        storage::mem_store::MemStore,
1189        synchronizer::{
1190            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1191        },
1192    };
1193    use crate::{
1194        authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1195        transaction_certifier::TransactionCertifier,
1196    };
1197
1198    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1199    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1200    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1201    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1202
1203    #[derive(Default)]
1204    struct MockNetworkClient {
1205        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1206        fetch_latest_blocks_requests:
1207            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1208    }
1209
1210    impl MockNetworkClient {
1211        async fn stub_fetch_blocks(
1212            &self,
1213            blocks: Vec<VerifiedBlock>,
1214            peer: AuthorityIndex,
1215            latency: Option<Duration>,
1216        ) {
1217            let mut lock = self.fetch_blocks_requests.lock().await;
1218            let block_refs = blocks
1219                .iter()
1220                .map(|block| block.reference())
1221                .collect::<Vec<_>>();
1222            lock.insert((block_refs, peer), (blocks, latency));
1223        }
1224
1225        async fn stub_fetch_latest_blocks(
1226            &self,
1227            blocks: Vec<VerifiedBlock>,
1228            peer: AuthorityIndex,
1229            authorities: Vec<AuthorityIndex>,
1230            latency: Option<Duration>,
1231        ) {
1232            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1233            lock.entry((peer, authorities))
1234                .or_default()
1235                .push((blocks, latency));
1236        }
1237
1238        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1239            let lock = self.fetch_latest_blocks_requests.lock().await;
1240            lock.len()
1241        }
1242    }
1243
1244    #[async_trait]
1245    impl NetworkClient for MockNetworkClient {
1246        async fn send_block(
1247            &self,
1248            _peer: AuthorityIndex,
1249            _serialized_block: &VerifiedBlock,
1250            _timeout: Duration,
1251        ) -> ConsensusResult<()> {
1252            unimplemented!("Unimplemented")
1253        }
1254
1255        async fn subscribe_blocks(
1256            &self,
1257            _peer: AuthorityIndex,
1258            _last_received: Round,
1259            _timeout: Duration,
1260        ) -> ConsensusResult<BlockStream> {
1261            unimplemented!("Unimplemented")
1262        }
1263
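        // Returns the stubbed serialized blocks for the exact (block_refs, peer) request, removing
        // the stub so it can only be used once; panics if no matching stub was registered.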
1264        async fn fetch_blocks(
1265            &self,
1266            peer: AuthorityIndex,
1267            block_refs: Vec<BlockRef>,
1268            _highest_accepted_rounds: Vec<Round>,
1269            _breadth_first: bool,
1270            _timeout: Duration,
1271        ) -> ConsensusResult<Vec<Bytes>> {
1272            let mut lock = self.fetch_blocks_requests.lock().await;
1273            let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1274                panic!(
1275                    "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1276                    block_refs, peer, lock
1277                );
1278            });
1279
1280            let serialised = response
1281                .0
1282                .into_iter()
1283                .map(|block| block.serialized().clone())
1284                .collect::<Vec<_>>();
1285
1286            drop(lock);
1287
1288            if let Some(latency) = response.1 {
1289                sleep(latency).await;
1290            }
1291
1292            Ok(serialised)
1293        }
1294
1295        async fn fetch_commits(
1296            &self,
1297            _peer: AuthorityIndex,
1298            _commit_range: CommitRange,
1299            _timeout: Duration,
1300        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1301            unimplemented!("Unimplemented")
1302        }
1303
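        // Pops the next queued response for (peer, authorities); any remaining responses stay
        // queued so that subsequent (retry) calls receive the following stub.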
1304        async fn fetch_latest_blocks(
1305            &self,
1306            peer: AuthorityIndex,
1307            authorities: Vec<AuthorityIndex>,
1308            _timeout: Duration,
1309        ) -> ConsensusResult<Vec<Bytes>> {
1310            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1311            let mut responses = lock
1312                .remove(&(peer, authorities.clone()))
1313                .expect("Unexpected fetch blocks request made");
1314
1315            let response = responses.remove(0);
1316            let serialised = response
1317                .0
1318                .into_iter()
1319                .map(|block| block.serialized().clone())
1320                .collect::<Vec<_>>();
1321
1322            if !responses.is_empty() {
1323                lock.insert((peer, authorities), responses);
1324            }
1325
1326            drop(lock);
1327
1328            if let Some(latency) = response.1 {
1329                sleep(latency).await;
1330            }
1331
1332            Ok(serialised)
1333        }
1334
1335        async fn get_latest_rounds(
1336            &self,
1337            _peer: AuthorityIndex,
1338            _timeout: Duration,
1339        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1340            unimplemented!("Unimplemented")
1341        }
1342    }
1343
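    // Exercises the lock/unlock/swap semantics of InflightBlocksMap: each block ref can be locked
    // by at most MAX_AUTHORITIES_TO_FETCH_PER_BLOCK peers at a time, and dropping a BlocksGuard
    // releases its locks.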
1344    #[test]
1345    fn test_inflight_blocks_map() {
1346        // GIVEN
1347        let map = InflightBlocksMap::new();
1348        let some_block_refs = [
1349            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1350            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1351            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1352            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1353        ];
1354        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1355
1356        // Lock & unlock blocks
1357        {
1358            let mut all_guards = Vec::new();
1359
1360            // Try to acquire the block locks for authorities 1 & 2
1361            for i in 1..=2 {
1362                let authority = AuthorityIndex::new_for_test(i);
1363
1364                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1365                let guard = guard.expect("Guard should be created");
1366                assert_eq!(guard.block_refs.len(), 4);
1367
1368                all_guards.push(guard);
1369
1370                // trying to acquire any of them again will not succeed
1371                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1372                assert!(guard.is_none());
1373            }
1374
1375            // Trying to acquire the locks for authority 3 will fail, as we have maxed out the number of allowed peers
1376            let authority_3 = AuthorityIndex::new_for_test(3);
1377
1378            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1379            assert!(guard.is_none());
1380
1381            // Explicitly drop the guard of authority 1 and try for authority 3 again - it will now succeed
1382            drop(all_guards.remove(0));
1383
1384            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1385            let guard = guard.expect("Guard should be successfully acquired");
1386
1387            assert_eq!(guard.block_refs, missing_block_refs);
1388
1389            // Dropping all the guards should unlock all the block refs
1390            drop(guard);
1391            drop(all_guards);
1392
1393            assert_eq!(map.num_of_locked_blocks(), 0);
1394        }
1395
1396        // Swap locks
1397        {
1398            // acquire a lock for authority 1
1399            let authority_1 = AuthorityIndex::new_for_test(1);
1400            let guard = map
1401                .lock_blocks(missing_block_refs.clone(), authority_1)
1402                .unwrap();
1403
1404            // Now swap the locks for authority 2
1405            let authority_2 = AuthorityIndex::new_for_test(2);
1406            let guard = map.swap_locks(guard, authority_2);
1407
1408            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1409        }
1410    }
1411
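    // Happy path: blocks requested from a peer via the synchronizer handle are fetched and
    // forwarded to core.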
1412    #[tokio::test]
1413    async fn successful_fetch_blocks_from_peer() {
1414        // GIVEN
1415        let (context, _) = Context::new_for_test(4);
1416        let context = Arc::new(context);
1417        let block_verifier = Arc::new(NoopBlockVerifier {});
1418        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1419        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1420        let network_client = Arc::new(MockNetworkClient::default());
1421        let (blocks_sender, _blocks_receiver) =
1422            monitored_mpsc::unbounded_channel("consensus_block_output");
1423        let store = Arc::new(MemStore::new());
1424        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1425        let transaction_certifier = TransactionCertifier::new(
1426            context.clone(),
1427            block_verifier.clone(),
1428            dag_state.clone(),
1429            blocks_sender,
1430        );
1431
1432        let handle = Synchronizer::start(
1433            network_client.clone(),
1434            context,
1435            core_dispatcher.clone(),
1436            commit_vote_monitor,
1437            block_verifier,
1438            transaction_certifier,
1439            dag_state,
1440            false,
1441        );
1442
1443        // Create some test blocks
1444        let expected_blocks = (0..10)
1445            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1446            .collect::<Vec<_>>();
1447        let missing_blocks = expected_blocks
1448            .iter()
1449            .map(|block| block.reference())
1450            .collect::<BTreeSet<_>>();
1451
1452        // AND stub the fetch_blocks request from peer 1
1453        let peer = AuthorityIndex::new_for_test(1);
1454        network_client
1455            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1456            .await;
1457
1458        // WHEN requesting the missing blocks from peer 1
1459        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1460
1461        // Wait a little bit until those have been added to core
1462        sleep(Duration::from_millis(1_000)).await;
1463
1464        // THEN ensure those ended up in Core
1465        let added_blocks = core_dispatcher.get_add_blocks().await;
1466        assert_eq!(added_blocks, expected_blocks);
1467    }
1468
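    // Floods a single peer with slow fetch requests until its fetch task is saturated; the final
    // request is expected to fail with ConsensusError::SynchronizerSaturated.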
1469    #[tokio::test]
1470    async fn saturate_fetch_blocks_from_peer() {
1471        // GIVEN
1472        let (context, _) = Context::new_for_test(4);
1473        let context = Arc::new(context);
1474        let block_verifier = Arc::new(NoopBlockVerifier {});
1475        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1476        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1477        let network_client = Arc::new(MockNetworkClient::default());
1478        let (blocks_sender, _blocks_receiver) =
1479            monitored_mpsc::unbounded_channel("consensus_block_output");
1480        let store = Arc::new(MemStore::new());
1481        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1482        let transaction_certifier = TransactionCertifier::new(
1483            context.clone(),
1484            block_verifier.clone(),
1485            dag_state.clone(),
1486            blocks_sender,
1487        );
1488
1489        let handle = Synchronizer::start(
1490            network_client.clone(),
1491            context,
1492            core_dispatcher.clone(),
1493            commit_vote_monitor,
1494            block_verifier,
1495            transaction_certifier,
1496            dag_state,
1497            false,
1498        );
1499
1500        // Create some test blocks
1501        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1502            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1503            .collect::<Vec<_>>();
1504
1505        // Now start sending requests to fetch blocks, trying to saturate the fetch task of peer 1
1506        let peer = AuthorityIndex::new_for_test(1);
1507        let mut iter = expected_blocks.iter().peekable();
1508        while let Some(block) = iter.next() {
1509            // Stub the fetch_blocks request from peer 1 with a high response latency, so requests
1510            // start blocking the peer's fetch task.
1511            network_client
1512                .stub_fetch_blocks(
1513                    vec![block.clone()],
1514                    peer,
1515                    Some(Duration::from_millis(5_000)),
1516                )
1517                .await;
1518
1519            let mut missing_blocks = BTreeSet::new();
1520            missing_blocks.insert(block.reference());
1521
1522            // WHEN requesting to fetch the blocks, the last request should not succeed and should
1523            // return a "synchronizer saturated" error
1524            if iter.peek().is_none() {
1525                match handle.fetch_blocks(missing_blocks, peer).await {
1526                    Err(ConsensusError::SynchronizerSaturated(index)) => {
1527                        assert_eq!(index, peer);
1528                    }
1529                    _ => panic!("A saturated synchronizer error was expected"),
1530                }
1531            } else {
1532                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1533            }
1534        }
1535    }
1536
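    // Verifies that the periodic sync task picks up the missing blocks reported by core and falls
    // back to a second peer when the first peer times out.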
1537    #[tokio::test(flavor = "current_thread", start_paused = true)]
1538    async fn synchronizer_periodic_task_fetch_blocks() {
1539        // GIVEN
1540        let (context, _) = Context::new_for_test(4);
1541        let context = Arc::new(context);
1542        let block_verifier = Arc::new(NoopBlockVerifier {});
1543        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1544        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1545        let network_client = Arc::new(MockNetworkClient::default());
1546        let (blocks_sender, _blocks_receiver) =
1547            monitored_mpsc::unbounded_channel("consensus_block_output");
1548        let store = Arc::new(MemStore::new());
1549        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1550        let transaction_certifier = TransactionCertifier::new(
1551            context.clone(),
1552            block_verifier.clone(),
1553            dag_state.clone(),
1554            blocks_sender,
1555        );
1556
1557        // Create some test blocks
1558        let expected_blocks = (0..10)
1559            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1560            .collect::<Vec<_>>();
1561        let missing_blocks = expected_blocks
1562            .iter()
1563            .map(|block| block.reference())
1564            .collect::<BTreeSet<_>>();
1565
1566        // AND stub the missing blocks
1567        core_dispatcher
1568            .stub_missing_blocks(missing_blocks.clone())
1569            .await;
1570
1571        // AND stub the requests for authority 1 & 2
1572        // Make the first authority time out, so the second one will be called. "We" are authority 0, so
1573        // we are skipped anyway.
1574        network_client
1575            .stub_fetch_blocks(
1576                expected_blocks.clone(),
1577                AuthorityIndex::new_for_test(1),
1578                Some(FETCH_REQUEST_TIMEOUT),
1579            )
1580            .await;
1581        network_client
1582            .stub_fetch_blocks(
1583                expected_blocks.clone(),
1584                AuthorityIndex::new_for_test(2),
1585                None,
1586            )
1587            .await;
1588
1589        // WHEN starting the synchronizer and waiting for a couple of seconds
1590        let _handle = Synchronizer::start(
1591            network_client.clone(),
1592            context,
1593            core_dispatcher.clone(),
1594            commit_vote_monitor,
1595            block_verifier,
1596            transaction_certifier,
1597            dag_state,
1598            false,
1599        );
1600
1601        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1602
1603        // THEN the missing blocks should now be fetched and added to core
1604        let added_blocks = core_dispatcher.get_add_blocks().await;
1605        assert_eq!(added_blocks, expected_blocks);
1606
1607        // AND missing blocks should have been consumed by the stub
1608        assert!(
1609            core_dispatcher
1610                .get_missing_blocks()
1611                .await
1612                .unwrap()
1613                .is_empty()
1614        );
1615    }
1616
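    // Verifies that periodic block fetching is disabled while the local commit index lags far
    // behind the quorum commit index, and resumes once the local commits catch up.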
1617    #[tokio::test(flavor = "current_thread", start_paused = true)]
1618    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
1619        // GIVEN
1620        let (context, _) = Context::new_for_test(4);
1621        let context = Arc::new(context);
1622        let block_verifier = Arc::new(NoopBlockVerifier {});
1623        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1624        let network_client = Arc::new(MockNetworkClient::default());
1625        let (blocks_sender, _blocks_receiver) =
1626            monitored_mpsc::unbounded_channel("consensus_block_output");
1627        let store = Arc::new(MemStore::new());
1628        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1629        let transaction_certifier = TransactionCertifier::new(
1630            context.clone(),
1631            block_verifier.clone(),
1632            dag_state.clone(),
1633            blocks_sender,
1634        );
1635        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1636
1637        // AND stub some missing blocks. The highest accepted round is 0. Create blocks that are above the sync threshold.
1638        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
1639        let stub_blocks = (sync_missing_block_round_threshold * 2
1640            ..sync_missing_block_round_threshold * 3)
1641            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1642            .collect::<Vec<_>>();
1643        let missing_blocks = stub_blocks
1644            .iter()
1645            .map(|block| block.reference())
1646            .collect::<BTreeSet<_>>();
1647        core_dispatcher
1648            .stub_missing_blocks(missing_blocks.clone())
1649            .await;
1650
1651        // AND stub the requests for authority 1 & 2
1652        // Make the first authority time out, so the second one will be called. "We" are authority 0, so
1653        // we are skipped anyway.
1654        let mut expected_blocks = stub_blocks
1655            .iter()
1656            .take(context.parameters.max_blocks_per_sync)
1657            .cloned()
1658            .collect::<Vec<_>>();
1659        network_client
1660            .stub_fetch_blocks(
1661                expected_blocks.clone(),
1662                AuthorityIndex::new_for_test(1),
1663                Some(FETCH_REQUEST_TIMEOUT),
1664            )
1665            .await;
1666        network_client
1667            .stub_fetch_blocks(
1668                expected_blocks.clone(),
1669                AuthorityIndex::new_for_test(2),
1670                None,
1671            )
1672            .await;
1673
1674        // Now create some blocks to simulate a commit lag
1675        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
1676        let commit_index: CommitIndex = round - 1;
1677        let blocks = (0..4)
1678            .map(|authority| {
1679                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
1680                let block = TestBlock::new(round, authority)
1681                    .set_commit_votes(commit_votes)
1682                    .build();
1683
1684                VerifiedBlock::new_for_test(block)
1685            })
1686            .collect::<Vec<_>>();
1687
1688        // Pass them through the commit vote monitor - so now there will be a big commit lag to prevent
1689        // the scheduled synchronizer from running
1690        for block in blocks {
1691            commit_vote_monitor.observe_block(&block);
1692        }
1693
1694        // WHEN starting the synchronizer and waiting for a couple of seconds, during which the synchronizer would normally have kicked in.
1695        let _handle = Synchronizer::start(
1696            network_client.clone(),
1697            context.clone(),
1698            core_dispatcher.clone(),
1699            commit_vote_monitor.clone(),
1700            block_verifier,
1701            transaction_certifier,
1702            dag_state.clone(),
1703            false,
1704        );
1705
1706        sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1707
1708        // Since we should be in commit lag mode, none of the missing blocks should have been fetched, hence nothing should be
1709        // sent to core for processing.
1710        let added_blocks = core_dispatcher.get_add_blocks().await;
1711        assert_eq!(added_blocks, vec![]);
1712
1713        // AND now advance the local commit index by adding commits until it matches the commit index
1714        // of the quorum
1715        {
1716            let mut d = dag_state.write();
1717            for index in 1..=commit_index {
1718                let commit =
1719                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
1720
1721                d.add_commit(commit);
1722            }
1723
1724            assert_eq!(
1725                d.last_commit_index(),
1726                commit_vote_monitor.quorum_commit_index()
1727            );
1728        }
1729
1730        // Now stub the missing blocks again, so the exact same ones get fetched.
1731        core_dispatcher
1732            .stub_missing_blocks(missing_blocks.clone())
1733            .await;
1734
1735        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1736
1737        // THEN the missing blocks should now be fetched and added to core
1738        let mut added_blocks = core_dispatcher.get_add_blocks().await;
1739
1740        added_blocks.sort_by_key(|block| block.reference());
1741        expected_blocks.sort_by_key(|block| block.reference());
1742
1743        assert_eq!(added_blocks, expected_blocks);
1744    }
1745
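    // Verifies that, when enabled, the synchronizer fetches our own last proposed block from peers
    // at startup, retries slow peers after the configured timeout, and reports the highest observed
    // round to core.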
1746    #[tokio::test(flavor = "current_thread", start_paused = true)]
1747    async fn synchronizer_fetch_own_last_block() {
1748        // GIVEN
1749        let (context, _) = Context::new_for_test(4);
1750        let context = Arc::new(context.with_parameters(Parameters {
1751            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
1752            ..Default::default()
1753        }));
1754        let block_verifier = Arc::new(NoopBlockVerifier {});
1755        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1756        let network_client = Arc::new(MockNetworkClient::default());
1757        let (blocks_sender, _blocks_receiver) =
1758            monitored_mpsc::unbounded_channel("consensus_block_output");
1759        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1760        let store = Arc::new(MemStore::new());
1761        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1762        let transaction_certifier = TransactionCertifier::new(
1763            context.clone(),
1764            block_verifier.clone(),
1765            dag_state.clone(),
1766            blocks_sender,
1767        );
1768        let our_index = AuthorityIndex::new_for_test(0);
1769
1770        // Create some test blocks
1771        let mut expected_blocks = (9..=10)
1772            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1773            .collect::<Vec<_>>();
1774
1775        // Now set a different latest block for each peer.
1776        // For peer 1 we give the block of round 10 (the highest)
1777        let block_1 = expected_blocks.pop().unwrap();
1778        network_client
1779            .stub_fetch_latest_blocks(
1780                vec![block_1.clone()],
1781                AuthorityIndex::new_for_test(1),
1782                vec![our_index],
1783                None,
1784            )
1785            .await;
1786        network_client
1787            .stub_fetch_latest_blocks(
1788                vec![block_1],
1789                AuthorityIndex::new_for_test(1),
1790                vec![our_index],
1791                None,
1792            )
1793            .await;
1794
1795        // For peer 2 we give the block of round 9
1796        let block_2 = expected_blocks.pop().unwrap();
1797        network_client
1798            .stub_fetch_latest_blocks(
1799                vec![block_2.clone()],
1800                AuthorityIndex::new_for_test(2),
1801                vec![our_index],
1802                Some(Duration::from_secs(10)),
1803            )
1804            .await;
1805        network_client
1806            .stub_fetch_latest_blocks(
1807                vec![block_2],
1808                AuthorityIndex::new_for_test(2),
1809                vec![our_index],
1810                None,
1811            )
1812            .await;
1813
1814        // For peer 3 we don't give any block, so it should return an empty vector
1815        network_client
1816            .stub_fetch_latest_blocks(
1817                vec![],
1818                AuthorityIndex::new_for_test(3),
1819                vec![our_index],
1820                Some(Duration::from_secs(10)),
1821            )
1822            .await;
1823        network_client
1824            .stub_fetch_latest_blocks(
1825                vec![],
1826                AuthorityIndex::new_for_test(3),
1827                vec![our_index],
1828                None,
1829            )
1830            .await;
1831
1832        // WHEN starting the synchronizer and waiting for a couple of seconds
1833        let handle = Synchronizer::start(
1834            network_client.clone(),
1835            context.clone(),
1836            core_dispatcher.clone(),
1837            commit_vote_monitor,
1838            block_verifier,
1839            transaction_certifier,
1840            dag_state,
1841            true,
1842        );
1843
1844        // Wait for at least the timeout duration
1845        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
1846
1847        // Assert that core has been called to set the min propose round
1848        assert_eq!(
1849            core_dispatcher.get_last_own_proposed_round().await,
1850            vec![10]
1851        );
1852
1853        // Ensure that all the stubbed requests have been consumed
1854        assert_eq!(network_client.fetch_latest_blocks_pending_calls().await, 0);
1855
1856        // And we got one retry
1857        assert_eq!(
1858            context
1859                .metrics
1860                .node_metrics
1861                .sync_last_known_own_block_retries
1862                .get(),
1863            1
1864        );
1865
1866        // Ensure that no panic occurred
1867        if let Err(err) = handle.stop().await
1868            && err.is_panic()
1869        {
1870            std::panic::resume_unwind(err.into_panic());
1871        }
1872    }
1873
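    // Verifies that process_fetched_blocks sends the fetched blocks to core and releases the
    // inflight block locks held by the guard once it is done.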
1874    #[tokio::test]
1875    async fn test_process_fetched_blocks() {
1876        // GIVEN
1877        let (context, _) = Context::new_for_test(4);
1878        let context = Arc::new(context);
1879        let block_verifier = Arc::new(NoopBlockVerifier {});
1880        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1881        let (blocks_sender, _blocks_receiver) =
1882            monitored_mpsc::unbounded_channel("consensus_block_output");
1883        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1884        let store = Arc::new(MemStore::new());
1885        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1886        let transaction_certifier = TransactionCertifier::new(
1887            context.clone(),
1888            block_verifier.clone(),
1889            dag_state.clone(),
1890            blocks_sender,
1891        );
1892        let (commands_sender, _commands_receiver) =
1893            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
1894
1895        // Create input test blocks:
1896        // - Authority 0 block at round 60.
1897        // - Authority 1 blocks from round 30 to 60.
1898        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
1899        expected_blocks.extend(
1900            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
1901        );
1902        assert_eq!(
1903            expected_blocks.len(),
1904            context.parameters.max_blocks_per_sync
1905        );
1906
1907        let expected_serialized_blocks = expected_blocks
1908            .iter()
1909            .map(|b| b.serialized().clone())
1910            .collect::<Vec<_>>();
1911
1912        let expected_block_refs = expected_blocks
1913            .iter()
1914            .map(|b| b.reference())
1915            .collect::<BTreeSet<_>>();
1916
1917        // GIVEN peer to fetch blocks from
1918        let peer_index = AuthorityIndex::new_for_test(2);
1919
1920        // Create blocks_guard
1921        let inflight_blocks_map = InflightBlocksMap::new();
1922        let blocks_guard = inflight_blocks_map
1923            .lock_blocks(expected_block_refs.clone(), peer_index)
1924            .expect("Failed to lock blocks");
1925
1926        assert_eq!(
1927            inflight_blocks_map.num_of_locked_blocks(),
1928            expected_block_refs.len()
1929        );
1930
1931        // Call process_fetched_blocks directly on the Synchronizer, without starting a full synchronizer task
1932        let result = Synchronizer::<
1933            MockNetworkClient,
1934            NoopBlockVerifier,
1935            MockCoreThreadDispatcher,
1936        >::process_fetched_blocks(
1937            expected_serialized_blocks,
1938            peer_index,
1939            blocks_guard, // The guard is consumed here
1940            core_dispatcher.clone(),
1941            block_verifier,
1942            transaction_certifier,
1943            commit_vote_monitor,
1944            context.clone(),
1945            commands_sender,
1946            "test",
1947        )
1948        .await;
1949
1950        // THEN
1951        assert!(result.is_ok());
1952
1953        // Check blocks were sent to core
1954        let added_blocks = core_dispatcher.get_add_blocks().await;
1955        assert_eq!(
1956            added_blocks
1957                .iter()
1958                .map(|b| b.reference())
1959                .collect::<BTreeSet<_>>(),
1960            expected_block_refs,
1961        );
1962
1963        // Check blocks were unlocked
1964        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
1965    }
1966}