consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::{
4    collections::{BTreeMap, BTreeSet},
5    sync::Arc,
6    time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::{ZipDebugEqIteratorExt, debug_fatal};
15use mysten_metrics::{
16    monitored_future,
17    monitored_mpsc::{Receiver, Sender, channel},
18    monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25    runtime::Handle,
26    sync::{mpsc::error::TrySendError, oneshot},
27    task::{JoinError, JoinSet},
28    time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, info, trace, warn};
31
32use crate::{
33    BlockAPI,
34    block::{ExtendedBlock, SignedBlock, VerifiedBlock},
35    block_verifier::BlockVerifier,
36    commit::CommitIndex,
37    commit_vote_monitor::{CommitVoteMonitor, is_commit_lagging},
38    context::Context,
39    dag_state::DagState,
40    error::{ConsensusError, ConsensusResult},
41    network::{ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient},
42    peers_pool::PeersPool,
43    round_tracker::RoundTracker,
44};
45use crate::{core_thread::CoreThreadDispatcher, transaction_vote_tracker::TransactionVoteTracker};
46
/// Capacity of each per-peer fetch channel, and the maximum number of fetch requests a single
/// per-peer fetch task keeps in flight at the same time.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeout applied to each individual fetch-blocks request sent to a peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
/// Timeout when fetching blocks from peers. NOTE(review): the use site is outside this view —
/// presumably an overall bound across requests to several peers; confirm.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of peers that may concurrently hold a fetch lock on the same missing block.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Max number of peers to request missing blocks concurrently in periodic sync.
const MAX_PERIODIC_SYNC_PEERS: usize = 3;

/// How long commit must be stalled before periodic sync kicks in as fallback.
const COMMIT_PROGRESS_TIMEOUT: Duration = Duration::from_secs(10);
61
62struct BlocksGuard {
63    map: Arc<InflightBlocksMap>,
64    block_refs: BTreeSet<BlockRef>,
65    peer: PeerId,
66}
67
68impl Drop for BlocksGuard {
69    fn drop(&mut self) {
70        self.map.unlock_blocks(&self.block_refs, self.peer.clone());
71    }
72}
73
/// Keeps a mapping between the missing blocks that have been instructed to be fetched and the
/// peers that are currently fetching them. For a block ref there is a maximum number of peers
/// (`MAX_AUTHORITIES_TO_FETCH_PER_BLOCK`) that can concurrently fetch it. The peer ids that are
/// currently fetching a block are stored in the corresponding `BTreeSet` and act as "locks".
/// An entry is removed from the map once its last lock is released.
struct InflightBlocksMap {
    /// Block ref -> set of peers currently holding a fetch lock on that block.
    inner: Mutex<BTreeMap<BlockRef, BTreeSet<PeerId>>>,
}
81
82impl InflightBlocksMap {
83    fn new() -> Arc<Self> {
84        Arc::new(Self {
85            inner: Mutex::new(BTreeMap::new()),
86        })
87    }
88
89    /// Locks the blocks to be fetched for the assigned `peer`. We want to avoid re-fetching the
90    /// missing blocks from too many peers at the same time, thus we limit the concurrency
91    /// per block by attempting to lock per block. If a block is already fetched by the maximum allowed
92    /// number of peers, then the block ref will not be included in the returned set. The method
93    /// returns all the block refs that have been successfully locked and allowed to be fetched.
94    fn lock_blocks(
95        self: &Arc<Self>,
96        missing_block_refs: BTreeSet<BlockRef>,
97        peer: PeerId,
98    ) -> Option<BlocksGuard> {
99        let mut blocks = BTreeSet::new();
100        let mut inner = self.inner.lock();
101
102        for block_ref in missing_block_refs {
103            // check that the number of peers that are already instructed to fetch the block is not
104            // higher than the allowed and the `peer` has not already been instructed to do that.
105            let peers = inner.entry(block_ref).or_default();
106            if peers.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK && peers.get(&peer).is_none() {
107                assert!(peers.insert(peer.clone()));
108                blocks.insert(block_ref);
109            }
110        }
111
112        if blocks.is_empty() {
113            None
114        } else {
115            Some(BlocksGuard {
116                map: self.clone(),
117                block_refs: blocks,
118                peer,
119            })
120        }
121    }
122
123    /// Unlocks the provided block references for the given `peer`. The unlocking is strict, meaning that
124    /// if this method is called for a specific block ref and peer more times than the corresponding lock
125    /// has been called, it will panic.
126    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: PeerId) {
127        // Now mark all the blocks as fetched from the map
128        let mut blocks_to_fetch = self.inner.lock();
129        for block_ref in block_refs {
130            let peers = blocks_to_fetch
131                .get_mut(block_ref)
132                .expect("Should have found a non empty map");
133
134            assert!(peers.remove(&peer), "Peer should be present!");
135
136            // if the last one then just clean up
137            if peers.is_empty() {
138                blocks_to_fetch.remove(block_ref);
139            }
140        }
141    }
142
143    /// Drops the provided `blocks_guard` which will force to unlock the blocks, and lock now again the
144    /// referenced block refs. The swap is best effort and there is no guarantee that the `peer` will
145    /// be able to acquire the new locks.
146    fn swap_locks(
147        self: &Arc<Self>,
148        blocks_guard: BlocksGuard,
149        peer: PeerId,
150    ) -> Option<BlocksGuard> {
151        let block_refs = blocks_guard.block_refs.clone();
152
153        // Explicitly drop the guard
154        drop(blocks_guard);
155
156        // Now create new guard
157        self.lock_blocks(block_refs, peer)
158    }
159
160    #[cfg(test)]
161    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
162        let inner = self.inner.lock();
163        inner.len()
164    }
165}
166
/// Commands served by the synchronizer's main loop.
enum Command {
    /// Hand off fetching `missing_block_refs` from `peer` to that peer's fetch task. `result`
    /// reports whether the request was handed off (or skipped), not whether the blocks arrived.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer: PeerId,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Start the task that fetches this node's own last proposed block, unless it already runs.
    FetchOwnLastBlock,
    /// Bring the next run of the periodic missing-blocks fetch forward.
    KickOffScheduler,
}
176
/// Handle to a running synchronizer. It submits commands to the main loop and owns the spawned
/// tasks.
pub(crate) struct SynchronizerHandle {
    /// Sender side of the synchronizer's command channel.
    commands_sender: Sender<Command>,
    /// The synchronizer's spawned tasks (per-peer fetch tasks and the main loop), aborted on `stop`.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
181
182impl SynchronizerHandle {
183    /// Explicitly asks from the synchronizer to fetch the blocks - provided the block_refs set - from
184    /// the peer.
185    pub(crate) async fn fetch_blocks(
186        &self,
187        missing_block_refs: BTreeSet<BlockRef>,
188        peer: PeerId,
189    ) -> ConsensusResult<()> {
190        let (sender, receiver) = oneshot::channel();
191        self.commands_sender
192            .send(Command::FetchBlocks {
193                missing_block_refs,
194                peer,
195                result: sender,
196            })
197            .await
198            .map_err(|_err| ConsensusError::Shutdown)?;
199        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
200    }
201
202    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
203        let mut tasks = self.tasks.lock().await;
204        tasks.abort_all();
205        while let Some(result) = tasks.join_next().await {
206            result?
207        }
208        Ok(())
209    }
210
211    #[cfg(test)]
212    /// Creates a mock synchronizer handle for testing
213    pub(crate) fn new_for_test() -> Arc<Self> {
214        use tokio::task::JoinSet;
215        let (tx, _rx) = channel("test_synchronizer", 1);
216        Arc::new(Self {
217            commands_sender: tx,
218            tasks: tokio::sync::Mutex::new(JoinSet::new()),
219        })
220    }
221}
222
/// `Synchronizer` oversees live block synchronization, which is crucial for node progress. Live
/// synchronization means retrieving missing blocks, particularly those a node needs to advance
/// when data from only a few rounds is absent. If a node lags significantly behind the network,
/// `commit_syncer` fetches the missing blocks via a more efficient approach instead.
/// `Synchronizer` aims for swift catch-up using two mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated authorities via the "block send" path.
///    This includes fetching any missing ancestors needed to process a received block.
///    Such requests prioritize the block author, to maximize the chance of prompt retrieval.
///    A locking mechanism allows up to two authorities to be asked for the same missing blocks
///    simultaneously, which improves the chance of timely retrieval. If more missing blocks
///    arise while processing the fetched ones, they are not requested from the same authority
///    again; they are left to the scheduler.
///
/// 2. Periodically requesting missing blocks via a scheduler. This mainly retrieves missing
///    blocks that were not ancestors of a block received via the "block send" path.
///    The scheduler runs on a fixed period, or is triggered immediately after the explicit
///    fetches described in (1), so block retrieval continues while gaps persist.
///
/// In addition, the synchronizer can fetch this node's last proposed block from network peers,
/// as a best-effort way to recover the node from amnesia and avoid making it equivocate.
pub(crate) struct Synchronizer<
    V: BlockVerifier,
    D: CoreThreadDispatcher,
    VC: ValidatorNetworkClient,
    OC: ObserverNetworkClient,
> {
    context: Arc<Context>,
    /// Receiving side of the command channel served by `run`.
    commands_receiver: Receiver<Command>,
    /// Per-peer senders feeding the dedicated fetch task of each peer known at start-up.
    fetch_block_senders: BTreeMap<PeerId, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    /// Holds the periodic missing-blocks fetch task while it runs; `run` starts at most one at a time.
    fetch_blocks_scheduler_task: JoinSet<()>,
    /// Holds the task that fetches this node's own last block while it runs.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<SynchronizerClient<VC, OC>>,
    block_verifier: Arc<V>,
    transaction_vote_tracker: TransactionVoteTracker,
    round_tracker: Arc<RwLock<RoundTracker>>,
    /// Tracks which blocks are currently being fetched, and from which peers.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    /// A clone of the sender for this synchronizer's own command channel.
    commands_sender: Sender<Command>,
    /// NOTE(review): presumably the last observed commit index and when it last changed, used with
    /// `COMMIT_PROGRESS_TIMEOUT` to detect stalled commits; confirm at the use site.
    last_changed_commit_index: CommitIndex,
    last_commit_change_time: Instant,
    // When commit is not progressing, commit sync fails over to periodic sync for catchup.
    commit_sync_failover: bool,
    peers_pool: Arc<PeersPool>,
}
270
271impl<V, D, VC, OC> Synchronizer<V, D, VC, OC>
272where
273    V: BlockVerifier,
274    D: CoreThreadDispatcher,
275    VC: ValidatorNetworkClient,
276    OC: ObserverNetworkClient,
277{
278    pub(crate) fn start(
279        network_client: Arc<SynchronizerClient<VC, OC>>,
280        context: Arc<Context>,
281        core_dispatcher: Arc<D>,
282        commit_vote_monitor: Arc<CommitVoteMonitor>,
283        block_verifier: Arc<V>,
284        transaction_vote_tracker: TransactionVoteTracker,
285        round_tracker: Arc<RwLock<RoundTracker>>,
286        dag_state: Arc<RwLock<DagState>>,
287        peers_pool: Arc<PeersPool>,
288        sync_last_known_own_block: bool,
289    ) -> Arc<SynchronizerHandle> {
290        let (commands_sender, commands_receiver) =
291            channel("consensus_synchronizer_commands", 1_000);
292        let inflight_blocks_map = InflightBlocksMap::new();
293
294        // Spawn the tasks to fetch the blocks from the others
295        let mut fetch_block_senders = BTreeMap::new();
296        let mut tasks = JoinSet::new();
297
298        // Create fetch tasks for all known peers (validators and observers)
299        // TODO: refactor to update the sender tasks based on the registered/removed pool peers.
300        let known_peers = peers_pool.get_known_peers();
301        for peer in known_peers {
302            let (sender, receiver) =
303                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
304            let fetch_blocks_from_peer_async = Self::fetch_blocks_from_peer(
305                peer.clone(),
306                network_client.clone(),
307                block_verifier.clone(),
308                transaction_vote_tracker.clone(),
309                commit_vote_monitor.clone(),
310                context.clone(),
311                core_dispatcher.clone(),
312                dag_state.clone(),
313                receiver,
314                commands_sender.clone(),
315                round_tracker.clone(),
316                peers_pool.clone(),
317            );
318            tasks.spawn(monitored_future!(fetch_blocks_from_peer_async));
319            fetch_block_senders.insert(peer, sender);
320        }
321
322        let commands_sender_clone = commands_sender.clone();
323
324        if sync_last_known_own_block {
325            commands_sender
326                .try_send(Command::FetchOwnLastBlock)
327                .expect("Failed to sync our last block");
328        }
329
330        // Spawn the task to listen to the requests & periodic runs
331        tasks.spawn(monitored_future!(async move {
332            let mut s = Self {
333                context,
334                commands_receiver,
335                fetch_block_senders,
336                core_dispatcher,
337                commit_vote_monitor,
338                fetch_blocks_scheduler_task: JoinSet::new(),
339                fetch_own_last_block_task: JoinSet::new(),
340                network_client,
341                block_verifier,
342                transaction_vote_tracker,
343                inflight_blocks_map,
344                commands_sender: commands_sender_clone,
345                dag_state,
346                round_tracker,
347                last_changed_commit_index: 0,
348                last_commit_change_time: Instant::now(),
349                commit_sync_failover: false,
350                peers_pool,
351            };
352            s.run().await;
353        }));
354
355        Arc::new(SynchronizerHandle {
356            commands_sender,
357            tasks: tokio::sync::Mutex::new(tasks),
358        })
359    }
360
361    // The main loop to listen for the submitted commands.
362    async fn run(&mut self) {
363        // We want the synchronizer to run periodically every 200ms to fetch any missing blocks.
364        const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
365        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);
366
367        tokio::pin!(scheduler_timeout);
368
369        loop {
370            tokio::select! {
371                Some(command) = self.commands_receiver.recv() => {
372                    match command {
373                        Command::FetchBlocks{ missing_block_refs, peer, result } => {
374                            // Check if peer is available. This check also makes sure that we are not trying to fetch from ourselves.
375                            if !self.peers_pool.is_peer_known(&peer) {
376                                result.send(Err(ConsensusError::PeerUnavailable(format!("{:?}", peer)))).ok();
377                                continue;
378                            }
379
380                            // Keep only the max allowed blocks to request. Additional missing blocks
381                            // will be fetched via periodic sync.
382                            // Fetch from the lowest to highest round, to ensure progress.
383                            let missing_block_refs = missing_block_refs
384                                .into_iter()
385                                .take(self.context.parameters.max_blocks_per_sync)
386                                .collect();
387
388                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer.clone());
389                            let Some(blocks_guard) = blocks_guard else {
390                                result.send(Ok(())).ok();
391                                continue;
392                            };
393
394                            // We don't block if the corresponding peer task is saturated - but we rather drop the request. That's ok as the periodic
395                            // synchronization task will handle any still missing blocks in next run.
396                            let r = self
397                                .fetch_block_senders
398                                .get(&peer)
399                                .ok_or(ConsensusError::PeerNotFound(format!("Peer {} not found in fetch_block_senders", peer)))
400                                .and_then(|sender| {
401                                    sender
402                                        .try_send(blocks_guard)
403                                        .map_err(|err| {
404                                            match err {
405                                                TrySendError::Full(_) => {
406                                                    let peer_name = peer.labelname(&self.context);
407                                                    self.context
408                                                        .metrics
409                                                        .node_metrics
410                                                        .synchronizer_skipped_fetch_requests
411                                                        .with_label_values(&[peer_name])
412                                                        .inc();
413                                                    ConsensusError::SynchronizerSaturated(format!("{:?}", peer))
414                                                },
415                                                TrySendError::Closed(_) => ConsensusError::Shutdown
416                                            }
417                                        })
418                                });
419
420                            result.send(r).ok();
421                        }
422                        Command::FetchOwnLastBlock => {
423                            if self.fetch_own_last_block_task.is_empty() {
424                                self.start_fetch_own_last_block_task();
425                            }
426                        }
427                        Command::KickOffScheduler => {
428                            // just reset the scheduler timeout timer to run immediately if not already running.
429                            // If the scheduler is already running then just reduce the remaining time to run.
430                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
431                                Instant::now()
432                            } else {
433                                Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
434                            };
435
436                            // only reset if it is earlier than the next deadline
437                            if timeout < scheduler_timeout.deadline() {
438                                scheduler_timeout.as_mut().reset(timeout);
439                            }
440                        }
441                    }
442                },
443                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
444                    match result {
445                        Ok(()) => {},
446                        Err(e) => {
447                            if e.is_cancelled() {
448                            } else if e.is_panic() {
449                                std::panic::resume_unwind(e.into_panic());
450                            } else {
451                                panic!("fetch our last block task failed: {e}");
452                            }
453                        },
454                    };
455                },
456                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
457                    match result {
458                        Ok(()) => {},
459                        Err(e) => {
460                            if e.is_cancelled() {
461                            } else if e.is_panic() {
462                                std::panic::resume_unwind(e.into_panic());
463                            } else {
464                                panic!("fetch blocks scheduler task failed: {e}");
465                            }
466                        },
467                    };
468                },
469                () = &mut scheduler_timeout => {
470                    // we want to start a new task only if the previous one has already finished.
471                    // TODO: consider starting backup fetches in parallel, when a fetch takes too long?
472                    if self.fetch_blocks_scheduler_task.is_empty()
473                        && let Err(err) = self.start_fetch_missing_blocks_task().await {
474                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
475                            return;
476                        };
477
478                    scheduler_timeout
479                        .as_mut()
480                        .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
481                }
482            }
483        }
484    }
485
    /// Per-peer fetch loop.
    ///
    /// Receives `BlocksGuard`s from the main loop and sends fetch requests for the locked block
    /// refs to `peer`. New guards are accepted only while fewer than `FETCH_BLOCKS_CONCURRENCY`
    /// requests are in flight. Successful responses go to `process_fetched_blocks` with the
    /// `"live"` sync method. Failed requests are counted in metrics and re-issued while the
    /// returned retry counter is `<= MAX_RETRIES`. After that, the guard is dropped, which
    /// unlocks the blocks. NOTE(review): the counter is presumably incremented inside
    /// `fetch_blocks_request` on failure; confirm there.
    ///
    /// The loop exits once the guard channel is closed and no request is in flight.
    async fn fetch_blocks_from_peer(
        peer: PeerId,
        network_client: Arc<SynchronizerClient<VC, OC>>,
        block_verifier: Arc<V>,
        transaction_vote_tracker: TransactionVoteTracker,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
        round_tracker: Arc<RwLock<RoundTracker>>,
        _peers_pool: Arc<PeersPool>,
    ) {
        const MAX_RETRIES: u32 = 3;
        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());

                    // `true` also requests missing ancestors; the retry counter starts at 1.
                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, fetch_after_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            // Processing is awaited inline: while it runs, this loop neither accepts
                            // new guards nor polls the other in-flight requests.
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer.clone(),
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                transaction_vote_tracker.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                round_tracker.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {}: {err}", peer.hostname(&context));
                                context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
                            // Retry with the same guard and the same `fetch_after_rounds` as before.
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {}.", peer.hostname(&context));
                                // we don't necessarily need to do, but dropping the guard here to unlock the blocks
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    info!("Fetching blocks from peer {} task will now abort.", peer.hostname(&context));
                    break;
                }
            }
        }
    }
548
549    /// Processes the requested raw fetched blocks from peer. If no error is returned then
550    /// the verified blocks are immediately sent to Core for processing.
551    async fn process_fetched_blocks(
552        mut serialized_blocks: Vec<Bytes>,
553        peer: PeerId,
554        requested_blocks_guard: BlocksGuard,
555        core_dispatcher: Arc<D>,
556        block_verifier: Arc<V>,
557        transaction_vote_tracker: TransactionVoteTracker,
558        commit_vote_monitor: Arc<CommitVoteMonitor>,
559        context: Arc<Context>,
560        commands_sender: Sender<Command>,
561        round_tracker: Arc<RwLock<RoundTracker>>,
562        sync_method: &str,
563    ) -> ConsensusResult<()> {
564        if serialized_blocks.is_empty() {
565            return Ok(());
566        }
567
568        // Limit the number of the returned blocks processed.
569        serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
570
571        // Verify all the fetched blocks
572        let blocks = Handle::current()
573            .spawn_blocking({
574                let block_verifier = block_verifier.clone();
575                let context = context.clone();
576                let peer = peer.clone();
577                move || {
578                    Self::verify_blocks(
579                        serialized_blocks,
580                        block_verifier,
581                        transaction_vote_tracker,
582                        &context,
583                        peer,
584                    )
585                }
586            })
587            .await
588            .expect("Spawn blocking should not fail")?;
589
590        // Record commit votes from the verified blocks.
591        for block in &blocks {
592            commit_vote_monitor.observe_block(block);
593        }
594
595        // Update round tracker from the verified blocks. For fetched blocks,
596        // excluded_ancestors are not available so we use an empty vector.
597        {
598            let mut tracker = round_tracker.write();
599            for block in &blocks {
600                tracker.update_from_verified_block(&ExtendedBlock {
601                    block: block.clone(),
602                    excluded_ancestors: vec![],
603                });
604            }
605        }
606
607        let metrics = &context.metrics.node_metrics;
608        metrics
609            .synchronizer_fetched_blocks_by_peer
610            .with_label_values(&[peer.labelname(&context).as_str(), sync_method])
611            .inc_by(blocks.len() as u64);
612        for block in &blocks {
613            let block_hostname = &context.committee.authority(block.author()).hostname;
614            metrics
615                .synchronizer_fetched_blocks_by_authority
616                .with_label_values(&[block_hostname.as_str(), sync_method])
617                .inc();
618        }
619
620        debug!(
621            "Synced {} missing blocks from peer {:?}: {}",
622            blocks.len(),
623            peer,
624            blocks.iter().map(|b| b.reference().to_string()).join(", "),
625        );
626
627        // Now send them to core for processing. Ignore the returned missing blocks as we don't want
628        // this mechanism to keep feedback looping on fetching more blocks. The periodic synchronization
629        // will take care of that.
630        let missing_blocks = core_dispatcher
631            .add_blocks(blocks)
632            .await
633            .map_err(|_| ConsensusError::Shutdown)?;
634
635        // now release all the locked blocks as they have been fetched, verified & processed
636        drop(requested_blocks_guard);
637
638        // kick off immediately the scheduled synchronizer
639        if !missing_blocks.is_empty() {
640            // do not block here, so we avoid any possible cycles.
641            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
642            {
643                warn!("Commands channel is full")
644            }
645        }
646
647        context
648            .metrics
649            .node_metrics
650            .missing_blocks_after_fetch_total
651            .inc_by(missing_blocks.len() as u64);
652
653        Ok(())
654    }
655
656    fn get_fetch_after_rounds(
657        context: &Arc<Context>,
658        dag_state: Arc<RwLock<DagState>>,
659    ) -> Vec<Round> {
660        let (blocks, gc_round) = {
661            let dag_state = dag_state.read();
662            (
663                dag_state.get_last_cached_block_per_authority(Round::MAX),
664                dag_state.gc_round(),
665            )
666        };
667        assert_eq!(blocks.len(), context.committee.size());
668
669        blocks
670            .into_iter()
671            .map(|(block, _)| block.round().max(gc_round))
672            .collect::<Vec<_>>()
673    }
674
675    fn verify_blocks(
676        serialized_blocks: Vec<Bytes>,
677        block_verifier: Arc<V>,
678        transaction_vote_tracker: TransactionVoteTracker,
679        context: &Context,
680        peer: PeerId,
681    ) -> ConsensusResult<Vec<VerifiedBlock>> {
682        let mut verified_blocks = Vec::new();
683        let mut voted_blocks = Vec::new();
684        for serialized_block in serialized_blocks {
685            let signed_block: SignedBlock =
686                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
687
688            // TODO: cache received and verified block refs to avoid duplicated work.
689            let (verified_block, reject_txn_votes) = block_verifier
690                .verify_and_vote(signed_block, serialized_block)
691                .tap_err(|e| {
692                    let peer_label = peer.labelname(context);
693                    context
694                        .metrics
695                        .node_metrics
696                        .invalid_blocks
697                        .with_label_values(&[peer_label.as_str(), "synchronizer", e.clone().name()])
698                        .inc();
699                    info!("Invalid block received from {}: {}", peer, e);
700                })?;
701
702            // TODO: improve efficiency, maybe suspend and continue processing the block asynchronously.
703            let now = context.clock.timestamp_utc_ms();
704            let drift = verified_block.timestamp_ms().saturating_sub(now);
705            if drift > 0 {
706                let peer_hostname = &context
707                    .committee
708                    .authority(verified_block.author())
709                    .hostname;
710                context
711                    .metrics
712                    .node_metrics
713                    .block_timestamp_drift_ms
714                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
715                    .inc_by(drift);
716
717                trace!(
718                    "Synced block {} timestamp {} is in the future (now={}).",
719                    verified_block.reference(),
720                    verified_block.timestamp_ms(),
721                    now
722                );
723            }
724
725            verified_blocks.push(verified_block.clone());
726            voted_blocks.push((verified_block, reject_txn_votes));
727        }
728
729        if context.protocol_config.transaction_voting_enabled() {
730            transaction_vote_tracker.add_voted_blocks(voted_blocks);
731        }
732
733        Ok(verified_blocks)
734    }
735
736    async fn fetch_blocks_request(
737        network_client: Arc<SynchronizerClient<VC, OC>>,
738        peer: PeerId,
739        blocks_guard: BlocksGuard,
740        fetch_after_rounds: Vec<Round>,
741        fetch_missing_ancestors: bool,
742        request_timeout: Duration,
743        mut retries: u32,
744    ) -> (
745        ConsensusResult<Vec<Bytes>>,
746        BlocksGuard,
747        u32,
748        PeerId,
749        Vec<Round>,
750    ) {
751        let start = Instant::now();
752        let resp = timeout(
753            request_timeout,
754            network_client.fetch_blocks(
755                peer.clone(),
756                blocks_guard
757                    .block_refs
758                    .clone()
759                    .into_iter()
760                    .collect::<Vec<_>>(),
761                fetch_after_rounds.clone().into_iter().collect::<Vec<_>>(),
762                fetch_missing_ancestors,
763                request_timeout,
764            ),
765        )
766        .await;
767
768        fail_point_async!("consensus-delay");
769
770        let resp = match resp {
771            Ok(Err(err)) => {
772                // Add a delay before retrying - if that is needed. If request has timed out then eventually
773                // this will be a no-op.
774                sleep_until(start + request_timeout).await;
775                retries += 1;
776                Err(err)
777            } // network error
778            Err(err) => {
779                // timeout
780                sleep_until(start + request_timeout).await;
781                retries += 1;
782                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
783            }
784            Ok(result) => result,
785        };
786        (resp, blocks_guard, retries, peer, fetch_after_rounds)
787    }
788
    /// Spawns `fetch_own_last_block_task`. The task determines the highest round among this
    /// authority's own blocks that its peers know about, and passes it to the core via
    /// `set_last_known_proposed_round`.
    ///
    /// Each attempt works as follows:
    /// - Every other authority is asked for this node's latest block(s).
    /// - Requests that fail are re-queued after `FETCH_OWN_BLOCK_RETRY_DELAY`, until the
    ///   attempt's `sync_last_known_own_block_timeout` fires.
    /// - Responses that fail verification, or contain a block not authored by this node, are
    ///   dropped and not re-queued.
    ///
    /// An attempt succeeds once the authorities that returned acceptable results reach validity
    /// (f+1) stake. Otherwise the attempt is repeated after a back-off that starts at 500ms,
    /// grows 1.5x per retry and is capped at `MAX_RETRY_DELAY_STEP`. In a single-node committee,
    /// the round of the last proposed block in `DagState` is used instead.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-asking a peer whose request failed within the same attempt.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Upper bound of the back-off between whole attempts.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Builds a future that waits `fetch_own_block_delay`, then asks `authority_index`
                // for the latest block(s) authored by this node. The response is tagged with the
                // queried authority so a failed request can be re-queued for the same peer.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };


                // Deserializes and verifies the blocks returned by `authority_index`. Fails on the
                // first malformed or invalid block (an invalid one is counted in `invalid_blocks`),
                // or on any block not authored by this node.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        // Peers were asked only for our own blocks; anything else is a bad response.
                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Get the highest of all the results. Retry until at least `f+1` results have been gathered.
                let mut highest_round;
                let mut retries = 0;
                // Back-off between whole attempts; grows 1.5x per retry up to MAX_RETRY_DELAY_STEP.
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    // Single-node committee: there is nobody to ask, so use our own last proposed block.
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().expect("Last proposed block should be returned on validators").round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Stake and highest round are recomputed from scratch on every attempt.
                    let mut total_stake = 0;
                    highest_round = 0;

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results but wait to timeout as well
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // `None` means every request of this attempt has completed.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                // An empty response still counts toward stake, with round 0.
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                // Invalid responses are not retried within this attempt.
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Network failure: ask the same peer again after a short delay.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                // Outstanding requests are dropped together with `results`.
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Request at least f+1 stake to have replied back.
                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                // A dispatcher error is only logged; the task ends either way.
                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
921
922    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
923        if self.context.committee.size() == 1 {
924            trace!(
925                "Only one node in the network, will not try fetching missing blocks from peers."
926            );
927            return Ok(());
928        }
929
930        // If commit is lagging and commit sync is making progress, skip periodic sync.
931        // Commit syncer fetches certified commits with all necessary causal history.
932        // If commit sync is not making progress, periodic sync resumes as a fallback.
933        if !self.should_run_periodic_sync() {
934            return Ok(());
935        }
936
937        let context = self.context.clone();
938        let network_client = self.network_client.clone();
939        let block_verifier = self.block_verifier.clone();
940        let transaction_vote_tracker = self.transaction_vote_tracker.clone();
941        let commit_vote_monitor = self.commit_vote_monitor.clone();
942        let core_dispatcher = self.core_dispatcher.clone();
943        let blocks_to_fetch = self.inflight_blocks_map.clone();
944        let commands_sender = self.commands_sender.clone();
945        let dag_state = self.dag_state.clone();
946        let round_tracker = self.round_tracker.clone();
947        let peers_pool = self.peers_pool.clone();
948
949        let mut missing_blocks = self
950            .core_dispatcher
951            .get_missing_blocks()
952            .await
953            .map_err(|_err| ConsensusError::Shutdown)?;
954        if self.commit_sync_failover {
955            // Keep missing blocks to those that must be included in fetch request.
956            // Filtered out missing blocks that will eventually be fetched with fetch_after_rounds.
957            let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
958            missing_blocks.retain(|block| block.round <= fetch_after_rounds[block.author.value()]);
959        } else if missing_blocks.is_empty() {
960            return Ok(());
961        }
962
963        self.fetch_blocks_scheduler_task
964            .spawn(monitored_future!(async move {
965                let _scope = monitored_scope("FetchMissingBlocksScheduler");
966                context
967                    .metrics
968                    .node_metrics
969                    .fetch_blocks_scheduler_inflight
970                    .inc();
971                let total_requested = missing_blocks.len();
972
973                let results = if missing_blocks.is_empty() {
974                    let _scope = monitored_scope("BlockSync::Periodic::HighestAcceptedRounds");
975                    // Fetch blocks from a random peer using highest accepted rounds (commit sync failover)
976                    Self::fetch_blocks_with_fetch_after_rounds(
977                        context.clone(),
978                        blocks_to_fetch.clone(),
979                        network_client,
980                        dag_state,
981                        peers_pool,
982                    )
983                    .await
984                } else {
985                    let _scope = monitored_scope("BlockSync::Periodic::MissingBlocks");
986                    // Fetch blocks from 1 to MAX_PERIODIC_SYNC_PEERS peers
987                    Self::fetch_blocks_from_peers(
988                        context.clone(),
989                        blocks_to_fetch.clone(),
990                        network_client,
991                        missing_blocks,
992                        dag_state,
993                        peers_pool,
994                    )
995                    .await
996                };
997                context
998                    .metrics
999                    .node_metrics
1000                    .fetch_blocks_scheduler_inflight
1001                    .dec();
1002                if results.is_empty() {
1003                    return;
1004                }
1005
1006                fail_point_async!("consensus-delay");
1007
1008                // Now process the returned results
1009                let mut total_fetched = 0;
1010                for (blocks_guard, fetched_blocks, peer) in results {
1011                    total_fetched += fetched_blocks.len();
1012
1013                    if let Err(err) = Self::process_fetched_blocks(
1014                        fetched_blocks,
1015                        peer.clone(),
1016                        blocks_guard,
1017                        core_dispatcher.clone(),
1018                        block_verifier.clone(),
1019                        transaction_vote_tracker.clone(),
1020                        commit_vote_monitor.clone(),
1021                        context.clone(),
1022                        commands_sender.clone(),
1023                        round_tracker.clone(),
1024                        "periodic",
1025                    )
1026                    .await
1027                    {
1028                        warn!(
1029                            "Error occurred while processing fetched blocks from peer {:?}: {err}",
1030                            peer
1031                        );
1032                        let peer_name = peer.labelname(&context);
1033                        context
1034                            .metrics
1035                            .node_metrics
1036                            .synchronizer_process_fetched_failures
1037                            .with_label_values(&[peer_name.as_str(), "periodic"])
1038                            .inc();
1039                    }
1040                }
1041
1042                debug!(
1043                    "Total blocks requested to fetch: {}, total fetched: {}",
1044                    total_requested, total_fetched
1045                );
1046            }));
1047
1048        Ok(())
1049    }
1050
1051    fn should_run_periodic_sync(&mut self) -> bool {
1052        let current_commit_index = self.dag_state.read().last_commit_index();
1053        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1054        let now = Instant::now();
1055        let metrics = &self.context.metrics.node_metrics;
1056
1057        // Commit is not lagging.
1058        if !is_commit_lagging(
1059            self.context.as_ref(),
1060            current_commit_index,
1061            quorum_commit_index,
1062        ) {
1063            metrics
1064                .synchronizer_periodic_sync_decision
1065                .with_label_values(&["true", "default"])
1066                .inc();
1067            // Reset last commit state.
1068            self.last_changed_commit_index = current_commit_index;
1069            self.last_commit_change_time = now;
1070            self.commit_sync_failover = false;
1071            // Run periodic sync.
1072            return true;
1073        }
1074        // Commit is lagging.
1075
1076        // When in commit sync failover, check if enough progress has been made.
1077        if self.commit_sync_failover {
1078            if current_commit_index
1079                < self.last_changed_commit_index + self.context.parameters.commit_sync_batch_size
1080            {
1081                metrics
1082                    .synchronizer_periodic_sync_decision
1083                    .with_label_values(&["true", "commit_catchup::run"])
1084                    .inc();
1085                // Not enough progress has been made yet. Keep running periodic sync.
1086                // Do not update last commit state yet.
1087                // Run periodic sync.
1088                return true;
1089            } else {
1090                metrics
1091                    .synchronizer_periodic_sync_decision
1092                    .with_label_values(&["false", "commit_catchup::end"])
1093                    .inc();
1094                // Enough progress has been made. Disable commit sync failover and reset last commit state.
1095                self.last_changed_commit_index = current_commit_index;
1096                self.last_commit_change_time = now;
1097                self.commit_sync_failover = false;
1098                // Skip periodic sync because of commit lag.
1099                return false;
1100            }
1101        }
1102
1103        // The node is commit lagging and not in commit sync failover yet.
1104        if current_commit_index == self.last_changed_commit_index {
1105            // Enter commit sync failover if not enough progress has been made.
1106            if now.duration_since(self.last_commit_change_time) >= COMMIT_PROGRESS_TIMEOUT {
1107                metrics
1108                    .synchronizer_periodic_sync_decision
1109                    .with_label_values(&["true", "commit_catchup::start"])
1110                    .inc();
1111                self.commit_sync_failover = true;
1112                // Run periodic sync.
1113                return true;
1114            }
1115        } else {
1116            // IMPORTANT: Only update last commit state when commit index is changing.
1117            self.last_changed_commit_index = current_commit_index;
1118            self.last_commit_change_time = now;
1119        }
1120
1121        metrics
1122            .synchronizer_periodic_sync_decision
1123            .with_label_values(&["false", "commit_lag"])
1124            .inc();
1125        false
1126    }
1127
1128    /// Fetches blocks from a random peer using only fetch_after_rounds (no specific missing blocks).
1129    /// Used during commit sync failover to make progress when commit sync is stalled.
1130    async fn fetch_blocks_with_fetch_after_rounds(
1131        context: Arc<Context>,
1132        inflight_blocks: Arc<InflightBlocksMap>,
1133        network_client: Arc<SynchronizerClient<VC, OC>>,
1134        dag_state: Arc<RwLock<DagState>>,
1135        peers_pool: Arc<PeersPool>,
1136    ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1137        let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1138
1139        // Pick a random peer (excluding self).
1140        // Get available peers from the PeersPool
1141        let mut peers = peers_pool.get_known_peers();
1142
1143        // TODO: in the future it would be possible, temporarily, for an Observer node to not have peers to fetch from.
1144        // We should change this assertion to allow for this case.
1145        assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1146
1147        if cfg!(not(test)) {
1148            peers.shuffle(&mut ThreadRng::default());
1149        }
1150
1151        let peer = peers.first().unwrap().clone();
1152
1153        let response = timeout(
1154            FETCH_REQUEST_TIMEOUT,
1155            network_client.fetch_blocks(
1156                peer.clone(),
1157                vec![],
1158                fetch_after_rounds,
1159                false,
1160                FETCH_REQUEST_TIMEOUT,
1161            ),
1162        )
1163        .await;
1164
1165        let serialized_blocks = match response {
1166            Ok(Ok(blocks)) => blocks,
1167            Ok(Err(err)) => {
1168                debug!("Failed to fetch blocks with fetch_after_rounds from peer {peer}: {err}");
1169                return vec![];
1170            }
1171            Err(_) => {
1172                debug!("Timed out fetching blocks with fetch_after_rounds from peer {peer}");
1173                return vec![];
1174            }
1175        };
1176
1177        let blocks_guard = BlocksGuard {
1178            map: inflight_blocks,
1179            block_refs: BTreeSet::new(),
1180            peer: peer.clone(),
1181        };
1182
1183        vec![(blocks_guard, serialized_blocks, peer)]
1184    }
1185
1186    /// Fetches the `missing_blocks` from peers. Requests the same number of authorities with missing blocks from each peer.
1187    /// Each response from peer can contain the requested blocks, and additional blocks from the last accepted round for
1188    /// authorities with missing blocks.
1189    /// Each element of the vector is a tuple which contains the requested missing block refs, the returned blocks and
1190    /// the peer.
1191    async fn fetch_blocks_from_peers(
1192        context: Arc<Context>,
1193        inflight_blocks: Arc<InflightBlocksMap>,
1194        network_client: Arc<SynchronizerClient<VC, OC>>,
1195        missing_blocks: BTreeSet<BlockRef>,
1196        dag_state: Arc<RwLock<DagState>>,
1197        peers_pool: Arc<PeersPool>,
1198    ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1199        // Preliminary truncation of missing blocks to fetch. Since each peer can have different
1200        // number of missing blocks and the fetching is batched by peer, so keep more than max_blocks_per_fetch
1201        // per peer on average.
1202        let missing_blocks = missing_blocks
1203            .into_iter()
1204            .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_fetch)
1205            .collect::<Vec<_>>();
1206
1207        // Maps authorities to the missing blocks they have.
1208        let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
1209        for block_ref in &missing_blocks {
1210            authorities
1211                .entry(block_ref.author)
1212                .or_default()
1213                .push(*block_ref);
1214        }
1215
1216        // Get known peers from the PeersPool
1217        let mut peers = peers_pool.get_known_peers();
1218
1219        // Distribute the same number of authorities into each peer to sync.
1220        // Use the number of known peers from the pool, capped at MAX_PERIODIC_SYNC_PEERS
1221        // TODO: in the future it would be possible, temporarily, for an Observer node to not have peers to fetch from.
1222        // We should change this assertion to allow for this case.
1223        assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1224
1225        let num_authorities_per_peer = authorities
1226            .len()
1227            .div_ceil(peers.len().min(MAX_PERIODIC_SYNC_PEERS));
1228
1229        // Update metrics related to missing blocks.
1230        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1231        for (authority, blocks) in &authorities {
1232            missing_blocks_per_authority[*authority] += blocks.len();
1233        }
1234        for (missing, (_, authority)) in missing_blocks_per_authority
1235            .into_iter()
1236            .zip_debug_eq(context.committee.authorities())
1237        {
1238            context
1239                .metrics
1240                .node_metrics
1241                .synchronizer_missing_blocks_by_authority
1242                .with_label_values(&[&authority.hostname])
1243                .inc_by(missing as u64);
1244            context
1245                .metrics
1246                .node_metrics
1247                .synchronizer_current_missing_blocks_by_authority
1248                .with_label_values(&[&authority.hostname])
1249                .set(missing as i64);
1250        }
1251
1252        // TODO: probably inject the RNG to allow unit testing - this is a work around for now.
1253        if cfg!(not(test)) {
1254            // Shuffle the peers
1255            peers.shuffle(&mut ThreadRng::default());
1256        }
1257
1258        let mut peers = peers.into_iter();
1259        let mut request_futures = FuturesUnordered::new();
1260
1261        // Shuffle the authorities for each request.
1262        let mut authorities = authorities.into_values().collect::<Vec<_>>();
1263        if cfg!(not(test)) {
1264            // Shuffle the authorities
1265            authorities.shuffle(&mut ThreadRng::default());
1266        }
1267
1268        let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1269
1270        // Send the fetch requests
1271        for batch in authorities.chunks(num_authorities_per_peer) {
1272            let Some(peer) = peers.next() else {
1273                debug_fatal!("No more peers left to fetch blocks!");
1274                break;
1275            };
1276            let peer_name = peer.hostname(&context);
1277            // Fetch from the lowest round missing blocks to ensure progress.
1278            // This may reduce efficiency and increase the chance of duplicated data transfer in edge cases.
1279            let block_refs = batch
1280                .iter()
1281                .flatten()
1282                .cloned()
1283                .collect::<BTreeSet<_>>()
1284                .into_iter()
1285                .take(context.parameters.max_blocks_per_fetch)
1286                .collect::<BTreeSet<_>>();
1287
1288            // lock the blocks to be fetched. If no lock can be acquired for any of the blocks then don't bother
1289            if let Some(blocks_guard) =
1290                inflight_blocks.lock_blocks(block_refs.clone(), peer.clone())
1291            {
1292                info!(
1293                    "Periodic sync of {} missing blocks from peer {} {:?}: {}",
1294                    peer_name.as_str(),
1295                    block_refs.len(),
1296                    peer,
1297                    block_refs
1298                        .iter()
1299                        .map(|b| b.to_string())
1300                        .collect::<Vec<_>>()
1301                        .join(", ")
1302                );
1303                request_futures.push(Self::fetch_blocks_request(
1304                    network_client.clone(),
1305                    peer,
1306                    blocks_guard,
1307                    fetch_after_rounds.clone(),
1308                    false,
1309                    FETCH_REQUEST_TIMEOUT,
1310                    1,
1311                ));
1312            }
1313        }
1314
1315        let mut results = Vec::new();
1316        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1317
1318        tokio::pin!(fetcher_timeout);
1319
1320        loop {
1321            tokio::select! {
1322                Some((response, blocks_guard, _retries, peer, fetch_after_rounds)) = request_futures.next() => {
1323                    match response {
1324                        Ok(fetched_blocks) => {
1325                            results.push((blocks_guard, fetched_blocks, peer));
1326
1327                            // no more pending requests are left, just break the loop
1328                            if request_futures.is_empty() {
1329                                break;
1330                            }
1331                        },
1332                        Err(_) => {
1333                            let peer_name = peer.labelname(&context);
1334                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_name.as_str(), "periodic"]).inc();
1335                            // try again if there is any peer left
1336                            if let Some(next_peer) = peers.next() {
1337                                // do best effort to lock guards. If we can't lock then don't bother at this run.
1338                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer.clone()) {
1339                                    info!(
1340                                        "Retrying syncing {} missing blocks from peer {:?}: {}",
1341                                        blocks_guard.block_refs.len(),
1342                                        next_peer,
1343                                        blocks_guard.block_refs
1344                                            .iter()
1345                                            .map(|b| b.to_string())
1346                                            .collect::<Vec<_>>()
1347                                            .join(", ")
1348                                    );
1349                                    request_futures.push(Self::fetch_blocks_request(
1350                                        network_client.clone(),
1351                                        next_peer,
1352                                        blocks_guard,
1353                                        fetch_after_rounds,
1354                                        false,
1355                                        FETCH_REQUEST_TIMEOUT,
1356                                        1,
1357                                    ));
1358                                } else {
1359                                    debug!("Couldn't acquire locks to fetch blocks from peer {:?}.", next_peer)
1360                                }
1361                            } else {
1362                                debug!("No more peers left to fetch blocks");
1363                            }
1364                        }
1365                    }
1366                },
1367                _ = &mut fetcher_timeout => {
1368                    debug!("Timed out while fetching missing blocks");
1369                    break;
1370                }
1371            }
1372        }
1373
1374        results
1375    }
1376}
1377
1378#[cfg(test)]
1379mod tests {
1380    use std::{
1381        collections::{BTreeMap, BTreeSet},
1382        sync::Arc,
1383        time::Duration,
1384    };
1385
1386    use async_trait::async_trait;
1387    use bytes::Bytes;
1388    use consensus_config::{AuthorityIndex, Parameters};
1389    use consensus_types::block::{BlockDigest, BlockRef, Round};
1390    use mysten_metrics::monitored_mpsc;
1391    use parking_lot::RwLock;
1392    use tokio::{sync::Mutex, time::sleep};
1393
1394    use crate::commit::{CommitVote, TrustedCommit};
1395    use crate::{
1396        CommitDigest, CommitIndex,
1397        block::{TestBlock, VerifiedBlock},
1398        block_verifier::NoopBlockVerifier,
1399        commit_vote_monitor::CommitVoteMonitor,
1400        context::Context,
1401        core_thread::CoreThreadDispatcher,
1402        dag_state::DagState,
1403        error::{ConsensusError, ConsensusResult},
1404        network::{
1405            BlockStream, ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient,
1406        },
1407        storage::mem_store::MemStore,
1408        synchronizer::{
1409            COMMIT_PROGRESS_TIMEOUT, FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT,
1410            InflightBlocksMap, Synchronizer,
1411        },
1412    };
1413    use crate::{
1414        commit_vote_monitor::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1415        peers_pool::PeersPool, round_tracker::RoundTracker,
1416        transaction_vote_tracker::TransactionVoteTracker,
1417    };
1418
    /// Lookup key for a stubbed `fetch_blocks` call: the exact requested block refs and the peer.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    /// Stubbed `fetch_blocks` reply: the blocks to return and an optional simulated latency.
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    /// Lookup key for a stubbed `fetch_latest_blocks` call: the peer and the requested authorities.
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    /// Stubbed `fetch_latest_blocks` reply: the blocks to return and an optional simulated latency.
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);

    /// Test double for the network client used by the synchronizer.
    ///
    /// Responses are registered up front through the `stub_*` helpers and are removed when a
    /// matching request is served; a request with no matching stub panics.
    #[derive(Default)]
    struct MockNetworkClient {
        // One-shot `fetch_blocks` responses keyed by (requested refs, peer); a later stub for the
        // same key replaces the earlier one.
        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
        // Queued `fetch_latest_blocks` responses per (peer, authorities), served oldest-first.
        fetch_latest_blocks_requests:
            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
    }
1430
1431    impl MockNetworkClient {
1432        async fn stub_fetch_blocks(
1433            &self,
1434            blocks: Vec<VerifiedBlock>,
1435            peer: AuthorityIndex,
1436            latency: Option<Duration>,
1437        ) {
1438            let mut lock = self.fetch_blocks_requests.lock().await;
1439            let block_refs = blocks
1440                .iter()
1441                .map(|block| block.reference())
1442                .collect::<Vec<_>>();
1443            lock.insert((block_refs, peer), (blocks, latency));
1444        }
1445
1446        async fn stub_fetch_blocks_for_key(
1447            &self,
1448            key_refs: Vec<BlockRef>,
1449            response_blocks: Vec<VerifiedBlock>,
1450            peer: AuthorityIndex,
1451            latency: Option<Duration>,
1452        ) {
1453            let mut lock = self.fetch_blocks_requests.lock().await;
1454            lock.insert((key_refs, peer), (response_blocks, latency));
1455        }
1456
1457        async fn stub_fetch_latest_blocks(
1458            &self,
1459            blocks: Vec<VerifiedBlock>,
1460            peer: AuthorityIndex,
1461            authorities: Vec<AuthorityIndex>,
1462            latency: Option<Duration>,
1463        ) {
1464            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1465            lock.entry((peer, authorities))
1466                .or_default()
1467                .push((blocks, latency));
1468        }
1469
1470        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1471            let lock = self.fetch_latest_blocks_requests.lock().await;
1472            lock.len()
1473        }
1474    }
1475
1476    #[async_trait]
1477    impl ValidatorNetworkClient for MockNetworkClient {
1478        async fn subscribe_blocks(
1479            &self,
1480            _peer: AuthorityIndex,
1481            _last_received: Round,
1482            _timeout: Duration,
1483        ) -> ConsensusResult<BlockStream> {
1484            unimplemented!("subscribe_blocks not implemented in mock")
1485        }
1486
1487        async fn fetch_blocks(
1488            &self,
1489            peer: AuthorityIndex,
1490            block_refs: Vec<BlockRef>,
1491            _fetch_after_rounds: Vec<Round>,
1492            _fetch_missing_ancestors: bool,
1493            _timeout: Duration,
1494        ) -> ConsensusResult<Vec<Bytes>> {
1495            let mut lock = self.fetch_blocks_requests.lock().await;
1496            let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1497                panic!(
1498                    "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1499                    block_refs, peer, lock
1500                );
1501            });
1502
1503            let serialised = response
1504                .0
1505                .into_iter()
1506                .map(|block| block.serialized().clone())
1507                .collect::<Vec<_>>();
1508
1509            drop(lock);
1510
1511            if let Some(latency) = response.1 {
1512                sleep(latency).await;
1513            }
1514
1515            Ok(serialised)
1516        }
1517
1518        async fn fetch_commits(
1519            &self,
1520            _peer: AuthorityIndex,
1521            _commit_range: crate::commit::CommitRange,
1522            _timeout: Duration,
1523        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1524            unimplemented!("fetch_commits not implemented in mock")
1525        }
1526
1527        async fn fetch_latest_blocks(
1528            &self,
1529            peer: AuthorityIndex,
1530            authorities: Vec<AuthorityIndex>,
1531            _timeout: Duration,
1532        ) -> ConsensusResult<Vec<Bytes>> {
1533            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1534            let mut responses = lock
1535                .remove(&(peer, authorities.clone()))
1536                .expect("Unexpected fetch blocks request made");
1537
1538            let response = responses.remove(0);
1539            let serialised = response
1540                .0
1541                .into_iter()
1542                .map(|block| block.serialized().clone())
1543                .collect::<Vec<_>>();
1544
1545            if !responses.is_empty() {
1546                lock.insert((peer, authorities), responses);
1547            }
1548
1549            drop(lock);
1550
1551            if let Some(latency) = response.1 {
1552                sleep(latency).await;
1553            }
1554
1555            Ok(serialised)
1556        }
1557
1558        async fn get_latest_rounds(
1559            &self,
1560            _peer: AuthorityIndex,
1561            _timeout: Duration,
1562        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1563            unimplemented!("get_latest_rounds not implemented in mock")
1564        }
1565
1566        #[cfg(test)]
1567        async fn send_block(
1568            &self,
1569            _peer: AuthorityIndex,
1570            _block: &VerifiedBlock,
1571            _timeout: Duration,
1572        ) -> ConsensusResult<()> {
1573            unimplemented!("send_block not implemented in mock")
1574        }
1575    }
1576
1577    #[async_trait]
1578    impl ObserverNetworkClient for MockNetworkClient {
1579        async fn stream_blocks(
1580            &self,
1581            _peer: crate::network::PeerId,
1582            _highest_round_per_authority: Vec<u64>,
1583            _timeout: Duration,
1584        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1585            unimplemented!("stream_blocks not implemented in mock")
1586        }
1587
1588        async fn fetch_blocks(
1589            &self,
1590            _peer: crate::network::PeerId,
1591            _block_refs: Vec<BlockRef>,
1592            _fetch_after_rounds: Vec<Round>,
1593            _fetch_missing_ancestors: bool,
1594            _timeout: Duration,
1595        ) -> ConsensusResult<Vec<Bytes>> {
1596            unimplemented!("Observer fetch_blocks not implemented in mock")
1597        }
1598
1599        async fn fetch_commits(
1600            &self,
1601            _peer: crate::network::PeerId,
1602            _commit_range: crate::commit::CommitRange,
1603            _timeout: Duration,
1604        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1605            unimplemented!("Observer fetch_commits not implemented in mock")
1606        }
1607    }
1608
1609    #[test]
1610    fn test_inflight_blocks_map() {
1611        // GIVEN
1612        let map = InflightBlocksMap::new();
1613        let some_block_refs = [
1614            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1615            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1616            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1617            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1618        ];
1619        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1620
1621        // Lock & unlock blocks
1622        {
1623            let mut all_guards = Vec::new();
1624
1625            // Try to acquire the block locks for authorities 1 & 2
1626            for i in 1..=2 {
1627                let authority = AuthorityIndex::new_for_test(i);
1628                let peer = PeerId::Validator(authority);
1629
1630                let guard = map.lock_blocks(missing_block_refs.clone(), peer.clone());
1631                let guard = guard.expect("Guard should be created");
1632                assert_eq!(guard.block_refs.len(), 4);
1633
1634                all_guards.push(guard);
1635
1636                // trying to acquire any of them again will not succeed
1637                let guard = map.lock_blocks(missing_block_refs.clone(), peer);
1638                assert!(guard.is_none());
1639            }
1640
1641            // Trying to acquire for authority 3 it will fail - as we have maxed out the number of allowed peers
1642            let authority_3 = AuthorityIndex::new_for_test(3);
1643            let peer_3 = PeerId::Validator(authority_3);
1644
1645            let guard = map.lock_blocks(missing_block_refs.clone(), peer_3.clone());
1646            assert!(guard.is_none());
1647
1648            // Explicitly drop the guard of authority 1 and try for authority 3 again - it will now succeed
1649            drop(all_guards.remove(0));
1650
1651            let guard = map.lock_blocks(missing_block_refs.clone(), peer_3);
1652            let guard = guard.expect("Guard should be successfully acquired");
1653
1654            assert_eq!(guard.block_refs, missing_block_refs);
1655
1656            // Dropping all guards should unlock on the block refs
1657            drop(guard);
1658            drop(all_guards);
1659
1660            assert_eq!(map.num_of_locked_blocks(), 0);
1661        }
1662
1663        // Swap locks
1664        {
1665            // acquire a lock for authority 1
1666            let authority_1 = AuthorityIndex::new_for_test(1);
1667            let peer_1 = PeerId::Validator(authority_1);
1668            let guard = map.lock_blocks(missing_block_refs.clone(), peer_1).unwrap();
1669
1670            // Now swap the locks for authority 2
1671            let authority_2 = AuthorityIndex::new_for_test(2);
1672            let peer_2 = PeerId::Validator(authority_2);
1673            let guard = map.swap_locks(guard, peer_2);
1674
1675            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1676        }
1677    }
1678
1679    #[tokio::test]
1680    async fn successful_fetch_blocks_from_peer() {
1681        // GIVEN
1682        let (context, _) = Context::new_for_test(4);
1683        let context = Arc::new(context);
1684        let block_verifier = Arc::new(NoopBlockVerifier {});
1685        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1686        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1687        let mock_client = Arc::new(MockNetworkClient::default());
1688        let store = Arc::new(MemStore::new());
1689        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1690        let transaction_vote_tracker =
1691            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1692        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1693
1694        let network_client = Arc::new(SynchronizerClient::new(
1695            context.clone(),
1696            Some(mock_client.clone()),
1697            Some(mock_client.clone()),
1698        ));
1699        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1700        let handle = Synchronizer::start(
1701            network_client,
1702            context.clone(),
1703            core_dispatcher.clone(),
1704            commit_vote_monitor,
1705            block_verifier,
1706            transaction_vote_tracker,
1707            round_tracker,
1708            dag_state,
1709            peers_pool.clone(),
1710            false,
1711        );
1712
1713        // Create some test blocks
1714        let expected_blocks = (0..10)
1715            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1716            .collect::<Vec<_>>();
1717        let missing_blocks = expected_blocks
1718            .iter()
1719            .map(|block| block.reference())
1720            .collect::<BTreeSet<_>>();
1721
1722        // AND stub the fetch_blocks request from peer 1
1723        let peer = AuthorityIndex::new_for_test(1);
1724        mock_client
1725            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1726            .await;
1727
1728        // WHEN request missing blocks from peer 1
1729        assert!(
1730            handle
1731                .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1732                .await
1733                .is_ok()
1734        );
1735
1736        // Wait a little bit until those have been added in core
1737        sleep(Duration::from_millis(1_000)).await;
1738
1739        // THEN ensure those ended up in Core
1740        let added_blocks = core_dispatcher.get_add_blocks().await;
1741        assert_eq!(added_blocks, expected_blocks);
1742    }
1743
1744    #[tokio::test]
1745    async fn saturate_fetch_blocks_from_peer() {
1746        // GIVEN
1747        let (context, _) = Context::new_for_test(4);
1748        let context = Arc::new(context);
1749        let block_verifier = Arc::new(NoopBlockVerifier {});
1750        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1751        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1752        let mock_client = Arc::new(MockNetworkClient::default());
1753        let store = Arc::new(MemStore::new());
1754        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1755        let transaction_vote_tracker =
1756            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1757        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1758
1759        let network_client = Arc::new(SynchronizerClient::new(
1760            context.clone(),
1761            Some(mock_client.clone()),
1762            Some(mock_client.clone()),
1763        ));
1764        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1765        let handle = Synchronizer::start(
1766            network_client,
1767            context.clone(),
1768            core_dispatcher.clone(),
1769            commit_vote_monitor,
1770            block_verifier,
1771            transaction_vote_tracker,
1772            round_tracker,
1773            dag_state,
1774            peers_pool.clone(),
1775            false,
1776        );
1777
1778        // Create some test blocks
1779        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1780            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1781            .collect::<Vec<_>>();
1782
1783        // Now start sending requests to fetch blocks by trying to saturate peer 1 task
1784        let peer = AuthorityIndex::new_for_test(1);
1785        let mut iter = expected_blocks.iter().peekable();
1786        while let Some(block) = iter.next() {
1787            // stub the fetch_blocks request from peer 1 and give some high response latency so requests
1788            // can start blocking the peer task.
1789            mock_client
1790                .stub_fetch_blocks(
1791                    vec![block.clone()],
1792                    peer,
1793                    Some(Duration::from_millis(5_000)),
1794                )
1795                .await;
1796
1797            let mut missing_blocks = BTreeSet::new();
1798            missing_blocks.insert(block.reference());
1799
1800            // WHEN requesting to fetch the blocks, it should not succeed for the last request and get
1801            // an error with "saturated" synchronizer
1802            if iter.peek().is_none() {
1803                match handle
1804                    .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1805                    .await
1806                {
1807                    Err(ConsensusError::SynchronizerSaturated(peer_str)) => {
1808                        assert_eq!(peer_str, format!("{:?}", PeerId::Validator(peer)));
1809                    }
1810                    _ => panic!("A saturated synchronizer error was expected"),
1811                }
1812            } else {
1813                assert!(
1814                    handle
1815                        .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1816                        .await
1817                        .is_ok()
1818                );
1819            }
1820        }
1821    }
1822
    /// Checks that the periodic sync task fetches blocks the core reports as missing.
    ///
    /// The first peer it tries (authority 1) is stubbed to take `FETCH_REQUEST_TIMEOUT`, so the
    /// test relies on the synchronizer falling back to authority 2.
    // `start_paused = true` runs the test on tokio's paused clock; the `sleep`s below advance
    // virtual time rather than waiting in real time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_fetch_blocks() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // Create some test blocks
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // AND have the core report those blocks as missing, which is what the periodic task polls.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authorities 1 & 2.
        // Authority 1 responds only after FETCH_REQUEST_TIMEOUT, so its request times out and
        // authority 2 is asked next. "We" are authority 0, so we are never asked.
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // WHEN start the synchronizer and wait for a couple of seconds
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        // Bound to a named variable (not `_`) so the handle stays alive until the end of the test.
        let _handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            dag_state,
            peers_pool.clone(),
            false,
        );

        // Enough virtual time for the timed-out request to authority 1 plus the retry against
        // authority 2 (assumes the periodic task fires within this window).
        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched and added to core
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);

        // AND the stubbed missing blocks should have been consumed by the synchronizer
        assert!(
            core_dispatcher
                .get_missing_blocks()
                .await
                .unwrap()
                .is_empty()
        );
    }
1905
    /// Checks two things about the periodic missing-block sync.
    ///
    /// It is skipped while the local commit index lags far behind the quorum commit index
    /// reported by the commit vote monitor. It runs again once the local DAG state has caught up
    /// to that index.
    // `start_paused = true` runs the test on tokio's paused clock; the `sleep`s below advance
    // virtual time rather than waiting in real time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // AND stub some missing blocks. The highest accepted round is 0; the blocks created here
        // sit above the sync threshold.
        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
        let stub_blocks = (sync_missing_block_round_threshold * 2
            ..sync_missing_block_round_threshold * 3)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = stub_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authorities 1 & 2.
        // Authority 1 responds only after FETCH_REQUEST_TIMEOUT, so its request times out and
        // authority 2 is asked next. "We" are authority 0, so we are never asked. All blocks are
        // stubbed because the full set is sent in one request.
        // Only the first max_blocks_per_sync blocks will be processed by process_fetched_blocks.
        mock_client
            .stub_fetch_blocks(
                stub_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
            .await;
        let mut expected_blocks = stub_blocks
            .iter()
            .take(context.parameters.max_blocks_per_sync)
            .cloned()
            .collect::<Vec<_>>();

        // Now create blocks whose commit votes point far ahead, to simulate a commit lag.
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        // Pass them through the commit vote monitor. The resulting large commit lag should stop
        // the scheduled synchronizer from running.
        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        // WHEN the synchronizer is started and we wait for a period in which it would normally
        // have kicked in.
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        // Bound to a named variable (not `_`) so the handle stays alive until the end of the test.
        let _handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );

        // Wait long enough for periodic sync to have run, but stay under COMMIT_PROGRESS_TIMEOUT
        // to avoid triggering commit sync failover.
        sleep(COMMIT_PROGRESS_TIMEOUT / 2).await;

        // We should be in commit-lag mode, so none of the missing blocks should have been
        // fetched. Hence nothing should have been sent to core for processing.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, vec![]);

        // AND advance the local commit index by adding commits up to the quorum's commit index.
        {
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            // The local DAG must now agree with the quorum, otherwise the lag would persist.
            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // Stub the same missing blocks again. The earlier stub is presumably drained by the
        // periodic task's get_missing_blocks calls. The fetch_blocks stubs registered above are
        // expected to still be unused at this point.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // Enough virtual time for the timed-out request to authority 1 plus the retry against
        // authority 2.
        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched and added to core
        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        // Compare order-independently.
        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }
2036
2037    #[tokio::test(flavor = "current_thread", start_paused = true)]
2038    async fn synchronizer_periodic_sync_resumes_when_commit_sync_stalled() {
2039        // GIVEN
2040        let (context, _) = Context::new_for_test(4);
2041        let context = Arc::new(context);
2042        let block_verifier = Arc::new(NoopBlockVerifier {});
2043        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2044        let mock_client = Arc::new(MockNetworkClient::default());
2045        let store = Arc::new(MemStore::new());
2046        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2047        let transaction_vote_tracker =
2048            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2049        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2050        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2051
2052        // AND create missing blocks to fetch
2053        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2054        let stub_blocks = (sync_missing_block_round_threshold * 2
2055            ..sync_missing_block_round_threshold * 3)
2056            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2057            .collect::<Vec<_>>();
2058        let missing_blocks = stub_blocks
2059            .iter()
2060            .map(|block| block.reference())
2061            .collect::<BTreeSet<_>>();
2062        core_dispatcher
2063            .stub_missing_blocks(missing_blocks.clone())
2064            .await;
2065
2066        // AND stub the fetch responses for authority 1 & 2.
2067        // Stub all blocks since the full set is sent in one request.
2068        mock_client
2069            .stub_fetch_blocks(
2070                stub_blocks.clone(),
2071                AuthorityIndex::new_for_test(1),
2072                Some(FETCH_REQUEST_TIMEOUT),
2073            )
2074            .await;
2075        mock_client
2076            .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
2077            .await;
2078        let mut expected_blocks = stub_blocks
2079            .iter()
2080            .take(context.parameters.max_blocks_per_sync)
2081            .cloned()
2082            .collect::<Vec<_>>();
2083
2084        // AND create commit lag by observing high commit votes
2085        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2086        let commit_index: CommitIndex = round - 1;
2087        let blocks = (0..4)
2088            .map(|authority| {
2089                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2090                let block = TestBlock::new(round, authority)
2091                    .set_commit_votes(commit_votes)
2092                    .build();
2093                VerifiedBlock::new_for_test(block)
2094            })
2095            .collect::<Vec<_>>();
2096        for block in blocks {
2097            commit_vote_monitor.observe_block(&block);
2098        }
2099
2100        // WHEN start the synchronizer
2101        let network_client = Arc::new(SynchronizerClient::new(
2102            context.clone(),
2103            Some(mock_client.clone()),
2104            Some(mock_client.clone()),
2105        ));
2106        let peers_pool = Arc::new(PeersPool::new(context.clone()));
2107        let _handle = Synchronizer::start(
2108            network_client,
2109            context.clone(),
2110            core_dispatcher.clone(),
2111            commit_vote_monitor.clone(),
2112            block_verifier,
2113            transaction_vote_tracker,
2114            round_tracker,
2115            dag_state.clone(),
2116            peers_pool.clone(),
2117            false,
2118        );
2119
2120        // Wait just under COMMIT_PROGRESS_TIMEOUT — sync should be skipped (commit lagging).
2121        sleep(COMMIT_PROGRESS_TIMEOUT - Duration::from_millis(100)).await;
2122        let added_blocks = core_dispatcher.get_add_blocks().await;
2123        assert_eq!(added_blocks, vec![]);
2124
2125        // Re-stub missing blocks (consumed by earlier get_missing_blocks calls).
2126        core_dispatcher
2127            .stub_missing_blocks(missing_blocks.clone())
2128            .await;
2129        // Stub the failover fetch (empty block_refs key, peer 1) before the stall triggers.
2130        // Use latency to prevent immediate completion and re-triggering.
2131        mock_client
2132            .stub_fetch_blocks_for_key(
2133                vec![],
2134                expected_blocks.clone(),
2135                AuthorityIndex::new_for_test(1),
2136                Some(Duration::from_millis(500)),
2137            )
2138            .await;
2139
2140        // Now sleep past the stall trigger + time for the fetch to complete.
2141        sleep(Duration::from_millis(200) + FETCH_REQUEST_TIMEOUT).await;
2142
2143        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2144        assert!(
2145            !added_blocks.is_empty(),
2146            "Expected periodic sync to resume after commit sync stall"
2147        );
2148        added_blocks.sort_by_key(|block| block.reference());
2149        expected_blocks.sort_by_key(|block| block.reference());
2150        assert_eq!(added_blocks, expected_blocks);
2151
2152        // AND advance commit index enough to reach commit sync batch size from stall start.
2153        // This should resolve the failover and skip periodic sync again.
2154        core_dispatcher.get_add_blocks().await;
2155        {
2156            let current = dag_state.read().last_commit_index();
2157            let mut d = dag_state.write();
2158            // Advance well past the recovery threshold.
2159            for index in (current + 1)..=(current + context.parameters.commit_sync_batch_size) {
2160                let commit =
2161                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2162                d.add_commit(commit);
2163            }
2164        }
2165
2166        // Re-stub missing blocks and fetch response for another round of checking
2167        core_dispatcher
2168            .stub_missing_blocks(missing_blocks.clone())
2169            .await;
2170        mock_client
2171            .stub_fetch_blocks(
2172                stub_blocks
2173                    .iter()
2174                    .take(context.parameters.max_blocks_per_sync)
2175                    .cloned()
2176                    .collect::<Vec<_>>(),
2177                AuthorityIndex::new_for_test(2),
2178                None,
2179            )
2180            .await;
2181
2182        // Wait for a sync cycle — periodic sync should be skipped again since stall resolved
2183        // but commit is still lagging
2184        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2185        let added_blocks = core_dispatcher.get_add_blocks().await;
2186        assert_eq!(
2187            added_blocks,
2188            vec![],
2189            "Expected periodic sync to be skipped after stall resolved"
2190        );
2191    }
2192
2193    #[tokio::test(flavor = "current_thread", start_paused = true)]
2194    async fn synchronizer_fetch_own_last_block() {
2195        // GIVEN
2196        let (context, _) = Context::new_for_test(4);
2197        let context = Arc::new(context.with_parameters(Parameters {
2198            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2199            ..Default::default()
2200        }));
2201        let block_verifier = Arc::new(NoopBlockVerifier {});
2202        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2203        let mock_client = Arc::new(MockNetworkClient::default());
2204        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2205        let store = Arc::new(MemStore::new());
2206        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2207        let transaction_vote_tracker =
2208            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2209        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2210        let our_index = AuthorityIndex::new_for_test(0);
2211
2212        // Create some test blocks
2213        let mut expected_blocks = (9..=10)
2214            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2215            .collect::<Vec<_>>();
2216
2217        // Now set different latest blocks for the peers
2218        // For peer 1 we give the block of round 10 (highest)
2219        let block_1 = expected_blocks.pop().unwrap();
2220        mock_client
2221            .stub_fetch_latest_blocks(
2222                vec![block_1.clone()],
2223                AuthorityIndex::new_for_test(1),
2224                vec![our_index],
2225                None,
2226            )
2227            .await;
2228        mock_client
2229            .stub_fetch_latest_blocks(
2230                vec![block_1],
2231                AuthorityIndex::new_for_test(1),
2232                vec![our_index],
2233                None,
2234            )
2235            .await;
2236
2237        // For peer 2 we give the block of round 9
2238        let block_2 = expected_blocks.pop().unwrap();
2239        mock_client
2240            .stub_fetch_latest_blocks(
2241                vec![block_2.clone()],
2242                AuthorityIndex::new_for_test(2),
2243                vec![our_index],
2244                Some(Duration::from_secs(10)),
2245            )
2246            .await;
2247        mock_client
2248            .stub_fetch_latest_blocks(
2249                vec![block_2],
2250                AuthorityIndex::new_for_test(2),
2251                vec![our_index],
2252                None,
2253            )
2254            .await;
2255
2256        // For peer 3 we don't give any block - and it should return an empty vector
2257        mock_client
2258            .stub_fetch_latest_blocks(
2259                vec![],
2260                AuthorityIndex::new_for_test(3),
2261                vec![our_index],
2262                Some(Duration::from_secs(10)),
2263            )
2264            .await;
2265        mock_client
2266            .stub_fetch_latest_blocks(
2267                vec![],
2268                AuthorityIndex::new_for_test(3),
2269                vec![our_index],
2270                None,
2271            )
2272            .await;
2273
2274        // WHEN start the synchronizer and wait for a couple of seconds
2275        let network_client = Arc::new(SynchronizerClient::new(
2276            context.clone(),
2277            Some(mock_client.clone()),
2278            Some(mock_client.clone()),
2279        ));
2280        let peers_pool = Arc::new(PeersPool::new(context.clone()));
2281        let handle = Synchronizer::start(
2282            network_client,
2283            context.clone(),
2284            core_dispatcher.clone(),
2285            commit_vote_monitor,
2286            block_verifier,
2287            transaction_vote_tracker,
2288            round_tracker,
2289            dag_state,
2290            peers_pool.clone(),
2291            true,
2292        );
2293
2294        // Wait at least for the timeout time
2295        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2296
2297        // Assert that core has been called to set the min propose round
2298        assert_eq!(
2299            core_dispatcher.get_last_own_proposed_round().await,
2300            vec![10]
2301        );
2302
2303        // Ensure that all the requests have been called
2304        assert_eq!(mock_client.fetch_latest_blocks_pending_calls().await, 0);
2305
2306        // And we got one retry
2307        assert_eq!(
2308            context
2309                .metrics
2310                .node_metrics
2311                .sync_last_known_own_block_retries
2312                .get(),
2313            1
2314        );
2315
2316        // Ensure that no panic occurred
2317        if let Err(err) = handle.stop().await
2318            && err.is_panic()
2319        {
2320            std::panic::resume_unwind(err.into_panic());
2321        }
2322    }
2323
2324    #[tokio::test]
2325    async fn test_process_fetched_blocks() {
2326        // GIVEN
2327        let (context, _) = Context::new_for_test(4);
2328        let context = Arc::new(context);
2329        let block_verifier = Arc::new(NoopBlockVerifier {});
2330        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2331        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2332        let store = Arc::new(MemStore::new());
2333        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2334        let transaction_vote_tracker =
2335            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2336        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2337        let (commands_sender, _commands_receiver) =
2338            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2339
2340        // Create input test blocks:
2341        // - Authority 0 block at round 60.
2342        // - Authority 1 blocks from round 30 to 60.
2343        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2344        expected_blocks.extend(
2345            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2346        );
2347        assert_eq!(
2348            expected_blocks.len(),
2349            context.parameters.max_blocks_per_sync
2350        );
2351
2352        let expected_serialized_blocks = expected_blocks
2353            .iter()
2354            .map(|b| b.serialized().clone())
2355            .collect::<Vec<_>>();
2356
2357        let expected_block_refs = expected_blocks
2358            .iter()
2359            .map(|b| b.reference())
2360            .collect::<BTreeSet<_>>();
2361
2362        // GIVEN peer to fetch blocks from
2363        let peer_index = AuthorityIndex::new_for_test(2);
2364        let peer = PeerId::Validator(peer_index);
2365
2366        // Create blocks_guard
2367        let inflight_blocks_map = InflightBlocksMap::new();
2368        let blocks_guard = inflight_blocks_map
2369            .lock_blocks(expected_block_refs.clone(), peer.clone())
2370            .expect("Failed to lock blocks");
2371
2372        assert_eq!(
2373            inflight_blocks_map.num_of_locked_blocks(),
2374            expected_block_refs.len()
2375        );
2376
2377        // Create a Synchronizer
2378        let result = Synchronizer::<
2379            NoopBlockVerifier,
2380            MockCoreThreadDispatcher,
2381            MockNetworkClient,
2382            MockNetworkClient,
2383        >::process_fetched_blocks(
2384            expected_serialized_blocks,
2385            peer,
2386            blocks_guard, // The guard is consumed here
2387            core_dispatcher.clone(),
2388            block_verifier,
2389            transaction_vote_tracker,
2390            commit_vote_monitor,
2391            context.clone(),
2392            commands_sender,
2393            round_tracker,
2394            "test",
2395        )
2396        .await;
2397
2398        // THEN
2399        assert!(result.is_ok());
2400
2401        // Check blocks were sent to core
2402        let added_blocks = core_dispatcher.get_add_blocks().await;
2403        assert_eq!(
2404            added_blocks
2405                .iter()
2406                .map(|b| b.reference())
2407                .collect::<BTreeSet<_>>(),
2408            expected_block_refs,
2409        );
2410
2411        // Check blocks were unlocked
2412        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2413    }
2414}