consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::{
4    collections::{BTreeMap, BTreeSet},
5    sync::Arc,
6    time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::{ZipDebugEqIteratorExt, debug_fatal};
15use mysten_metrics::{
16    monitored_future,
17    monitored_mpsc::{Receiver, Sender, channel},
18    monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25    runtime::Handle,
26    sync::{mpsc::error::TrySendError, oneshot},
27    task::{JoinError, JoinSet},
28    time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, info, trace, warn};
31
32use crate::{
33    BlockAPI,
34    block::{ExtendedBlock, SignedBlock, VerifiedBlock},
35    block_verifier::BlockVerifier,
36    commit::CommitIndex,
37    commit_vote_monitor::CommitVoteMonitor,
38    context::Context,
39    dag_state::DagState,
40    error::{ConsensusError, ConsensusResult},
41    network::{ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient},
42    peers_pool::PeersPool,
43    round_tracker::RoundTracker,
44};
45use crate::{
46    authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
47    transaction_vote_tracker::TransactionVoteTracker,
48};
49
/// The number of concurrent fetch blocks requests per authority.
const FETCH_BLOCKS_CONCURRENCY: usize = 5;

/// Timeout of a single fetch blocks request sent to a peer.
const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
/// Timeout when fetching blocks from peers.
/// NOTE(review): presumably bounds a fetch spanning several peers — confirm at the call site.
const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);

/// Maximum number of peers that may concurrently fetch the same missing block.
const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;

/// Max number of peers to request missing blocks from concurrently in periodic sync.
const MAX_PERIODIC_SYNC_PEERS: usize = 3;

/// How long commit must be stalled before periodic sync kicks in as fallback.
const COMMIT_PROGRESS_TIMEOUT: Duration = Duration::from_secs(10);
64
65struct BlocksGuard {
66    map: Arc<InflightBlocksMap>,
67    block_refs: BTreeSet<BlockRef>,
68    peer: PeerId,
69}
70
71impl Drop for BlocksGuard {
72    fn drop(&mut self) {
73        self.map.unlock_blocks(&self.block_refs, self.peer.clone());
74    }
75}
76
77// Keeps a mapping between the missing blocks that have been instructed to be fetched and the peers
78// that are currently fetching them. For a block ref there is a maximum number of peers that can
79// concurrently fetch it. The peer ids that are currently fetching a block are set on the corresponding
80// `BTreeSet` and basically they act as "locks".
81struct InflightBlocksMap {
82    inner: Mutex<BTreeMap<BlockRef, BTreeSet<PeerId>>>,
83}
84
85impl InflightBlocksMap {
86    fn new() -> Arc<Self> {
87        Arc::new(Self {
88            inner: Mutex::new(BTreeMap::new()),
89        })
90    }
91
92    /// Locks the blocks to be fetched for the assigned `peer`. We want to avoid re-fetching the
93    /// missing blocks from too many peers at the same time, thus we limit the concurrency
94    /// per block by attempting to lock per block. If a block is already fetched by the maximum allowed
95    /// number of peers, then the block ref will not be included in the returned set. The method
96    /// returns all the block refs that have been successfully locked and allowed to be fetched.
97    fn lock_blocks(
98        self: &Arc<Self>,
99        missing_block_refs: BTreeSet<BlockRef>,
100        peer: PeerId,
101    ) -> Option<BlocksGuard> {
102        let mut blocks = BTreeSet::new();
103        let mut inner = self.inner.lock();
104
105        for block_ref in missing_block_refs {
106            // check that the number of peers that are already instructed to fetch the block is not
107            // higher than the allowed and the `peer` has not already been instructed to do that.
108            let peers = inner.entry(block_ref).or_default();
109            if peers.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK && peers.get(&peer).is_none() {
110                assert!(peers.insert(peer.clone()));
111                blocks.insert(block_ref);
112            }
113        }
114
115        if blocks.is_empty() {
116            None
117        } else {
118            Some(BlocksGuard {
119                map: self.clone(),
120                block_refs: blocks,
121                peer,
122            })
123        }
124    }
125
126    /// Unlocks the provided block references for the given `peer`. The unlocking is strict, meaning that
127    /// if this method is called for a specific block ref and peer more times than the corresponding lock
128    /// has been called, it will panic.
129    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: PeerId) {
130        // Now mark all the blocks as fetched from the map
131        let mut blocks_to_fetch = self.inner.lock();
132        for block_ref in block_refs {
133            let peers = blocks_to_fetch
134                .get_mut(block_ref)
135                .expect("Should have found a non empty map");
136
137            assert!(peers.remove(&peer), "Peer should be present!");
138
139            // if the last one then just clean up
140            if peers.is_empty() {
141                blocks_to_fetch.remove(block_ref);
142            }
143        }
144    }
145
146    /// Drops the provided `blocks_guard` which will force to unlock the blocks, and lock now again the
147    /// referenced block refs. The swap is best effort and there is no guarantee that the `peer` will
148    /// be able to acquire the new locks.
149    fn swap_locks(
150        self: &Arc<Self>,
151        blocks_guard: BlocksGuard,
152        peer: PeerId,
153    ) -> Option<BlocksGuard> {
154        let block_refs = blocks_guard.block_refs.clone();
155
156        // Explicitly drop the guard
157        drop(blocks_guard);
158
159        // Now create new guard
160        self.lock_blocks(block_refs, peer)
161    }
162
163    #[cfg(test)]
164    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
165        let inner = self.inner.lock();
166        inner.len()
167    }
168}
169
170enum Command {
171    FetchBlocks {
172        missing_block_refs: BTreeSet<BlockRef>,
173        peer: PeerId,
174        result: oneshot::Sender<Result<(), ConsensusError>>,
175    },
176    FetchOwnLastBlock,
177    KickOffScheduler,
178}
179
180pub(crate) struct SynchronizerHandle {
181    commands_sender: Sender<Command>,
182    tasks: tokio::sync::Mutex<JoinSet<()>>,
183}
184
185impl SynchronizerHandle {
186    /// Explicitly asks from the synchronizer to fetch the blocks - provided the block_refs set - from
187    /// the peer.
188    pub(crate) async fn fetch_blocks(
189        &self,
190        missing_block_refs: BTreeSet<BlockRef>,
191        peer: PeerId,
192    ) -> ConsensusResult<()> {
193        let (sender, receiver) = oneshot::channel();
194        self.commands_sender
195            .send(Command::FetchBlocks {
196                missing_block_refs,
197                peer,
198                result: sender,
199            })
200            .await
201            .map_err(|_err| ConsensusError::Shutdown)?;
202        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
203    }
204
205    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
206        let mut tasks = self.tasks.lock().await;
207        tasks.abort_all();
208        while let Some(result) = tasks.join_next().await {
209            result?
210        }
211        Ok(())
212    }
213
214    #[cfg(test)]
215    /// Creates a mock synchronizer handle for testing
216    pub(crate) fn new_for_test() -> Arc<Self> {
217        use tokio::task::JoinSet;
218        let (tx, _rx) = channel("test_synchronizer", 1);
219        Arc::new(Self {
220            commands_sender: tx,
221            tasks: tokio::sync::Mutex::new(JoinSet::new()),
222        })
223    }
224}
225
/// `Synchronizer` oversees live block synchronization, which is crucial for node progress. Live
/// synchronization is the process of retrieving missing blocks, in particular those needed to
/// advance a node when data from only a few rounds is absent. If a node lags significantly behind
/// the network, `commit_syncer` fetches the missing blocks with a more efficient approach instead.
/// `Synchronizer` aims for swift catch-up using two mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated peers via the "block send" path.
///    - This includes fetching any missing ancestors needed to process a received block.
///    - Such requests prioritize the block author, maximizing the chance of prompt retrieval.
///    - A locking mechanism (`InflightBlocksMap`) allows up to `MAX_AUTHORITIES_TO_FETCH_PER_BLOCK`
///      (currently two) peers to fetch the same missing block concurrently, improving the chance
///      of timely retrieval.
///    - If more missing blocks arise while processing the fetched blocks, they are not requested
///      from the same peer. They are deferred to the scheduler instead.
///
/// 2. Periodically requesting missing blocks via a scheduler. This mainly retrieves missing blocks
///    that were not ancestors of a block received via the "block send" path.
///    - The scheduler runs on a fixed period.
///    - It is also kicked off early when explicit fetches from (1) still leave blocks missing, so
///      retrieval continues while gaps persist.
///
/// In addition, the synchronizer can fetch its own last proposed block from network peers. This
/// is a best-effort way to recover the node from amnesia and avoid making it equivocate.
pub(crate) struct Synchronizer<
    V: BlockVerifier,
    D: CoreThreadDispatcher,
    VC: ValidatorNetworkClient,
    OC: ObserverNetworkClient,
> {
    /// Shared consensus context (parameters, committee, metrics, clock).
    context: Arc<Context>,
    /// Receives the commands sent by `SynchronizerHandle` and by the per-peer fetch tasks.
    commands_receiver: Receiver<Command>,
    /// Per-peer queue feeding the dedicated fetch task spawned for that peer in `start`.
    fetch_block_senders: BTreeMap<PeerId, Sender<BlocksGuard>>,
    /// Dispatcher used to hand fetched blocks to core.
    core_dispatcher: Arc<D>,
    /// Observes the commit votes carried by fetched blocks.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// DAG state, read to decide which rounds to fetch after.
    dag_state: Arc<RwLock<DagState>>,
    /// The currently running missing-blocks scheduler task, if any.
    /// A new one is started only once this set is empty.
    fetch_blocks_scheduler_task: JoinSet<()>,
    /// The currently running own-last-block fetch task, if any.
    fetch_own_last_block_task: JoinSet<()>,
    /// Network client used to fetch blocks from peers.
    network_client: Arc<SynchronizerClient<VC, OC>>,
    /// Verifier applied to every fetched block.
    block_verifier: Arc<V>,
    /// Receives the transaction votes of verified fetched blocks.
    transaction_vote_tracker: TransactionVoteTracker,
    /// Updated with the rounds observed in verified fetched blocks.
    round_tracker: Arc<RwLock<RoundTracker>>,
    /// Limits how many peers fetch the same missing block concurrently.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    /// Sender side of `commands_receiver`.
    /// NOTE(review): presumably handed to tasks spawned by the loop — confirm.
    commands_sender: Sender<Command>,
    /// Starts at 0.
    /// NOTE(review): presumably the last commit index seen to change, used together with
    /// `COMMIT_PROGRESS_TIMEOUT` — confirm.
    last_changed_commit_index: CommitIndex,
    /// Starts at the moment the main loop task constructs the synchronizer.
    /// NOTE(review): presumably when `last_changed_commit_index` last changed — confirm.
    last_commit_change_time: Instant,
    /// When commit is not progressing, commit sync fails over to periodic sync for catchup.
    commit_sync_failover: bool,
    /// Pool of known peers. `FetchBlocks` requests for unknown peers are rejected.
    peers_pool: Arc<PeersPool>,
}
273
274impl<V, D, VC, OC> Synchronizer<V, D, VC, OC>
275where
276    V: BlockVerifier,
277    D: CoreThreadDispatcher,
278    VC: ValidatorNetworkClient,
279    OC: ObserverNetworkClient,
280{
281    pub(crate) fn start(
282        network_client: Arc<SynchronizerClient<VC, OC>>,
283        context: Arc<Context>,
284        core_dispatcher: Arc<D>,
285        commit_vote_monitor: Arc<CommitVoteMonitor>,
286        block_verifier: Arc<V>,
287        transaction_vote_tracker: TransactionVoteTracker,
288        round_tracker: Arc<RwLock<RoundTracker>>,
289        dag_state: Arc<RwLock<DagState>>,
290        peers_pool: Arc<PeersPool>,
291        sync_last_known_own_block: bool,
292    ) -> Arc<SynchronizerHandle> {
293        let (commands_sender, commands_receiver) =
294            channel("consensus_synchronizer_commands", 1_000);
295        let inflight_blocks_map = InflightBlocksMap::new();
296
297        // Spawn the tasks to fetch the blocks from the others
298        let mut fetch_block_senders = BTreeMap::new();
299        let mut tasks = JoinSet::new();
300
301        // Create fetch tasks for all known peers (validators and observers)
302        // TODO: refactor to update the sender tasks based on the registered/removed pool peers.
303        let known_peers = peers_pool.get_known_peers();
304        for peer in known_peers {
305            let (sender, receiver) =
306                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
307            let fetch_blocks_from_peer_async = Self::fetch_blocks_from_peer(
308                peer.clone(),
309                network_client.clone(),
310                block_verifier.clone(),
311                transaction_vote_tracker.clone(),
312                commit_vote_monitor.clone(),
313                context.clone(),
314                core_dispatcher.clone(),
315                dag_state.clone(),
316                receiver,
317                commands_sender.clone(),
318                round_tracker.clone(),
319                peers_pool.clone(),
320            );
321            tasks.spawn(monitored_future!(fetch_blocks_from_peer_async));
322            fetch_block_senders.insert(peer, sender);
323        }
324
325        let commands_sender_clone = commands_sender.clone();
326
327        if sync_last_known_own_block {
328            commands_sender
329                .try_send(Command::FetchOwnLastBlock)
330                .expect("Failed to sync our last block");
331        }
332
333        // Spawn the task to listen to the requests & periodic runs
334        tasks.spawn(monitored_future!(async move {
335            let mut s = Self {
336                context,
337                commands_receiver,
338                fetch_block_senders,
339                core_dispatcher,
340                commit_vote_monitor,
341                fetch_blocks_scheduler_task: JoinSet::new(),
342                fetch_own_last_block_task: JoinSet::new(),
343                network_client,
344                block_verifier,
345                transaction_vote_tracker,
346                inflight_blocks_map,
347                commands_sender: commands_sender_clone,
348                dag_state,
349                round_tracker,
350                last_changed_commit_index: 0,
351                last_commit_change_time: Instant::now(),
352                commit_sync_failover: false,
353                peers_pool,
354            };
355            s.run().await;
356        }));
357
358        Arc::new(SynchronizerHandle {
359            commands_sender,
360            tasks: tokio::sync::Mutex::new(tasks),
361        })
362    }
363
364    // The main loop to listen for the submitted commands.
365    async fn run(&mut self) {
366        // We want the synchronizer to run periodically every 200ms to fetch any missing blocks.
367        const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
368        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);
369
370        tokio::pin!(scheduler_timeout);
371
372        loop {
373            tokio::select! {
374                Some(command) = self.commands_receiver.recv() => {
375                    match command {
376                        Command::FetchBlocks{ missing_block_refs, peer, result } => {
377                            // Check if peer is available. This check also makes sure that we are not trying to fetch from ourselves.
378                            if !self.peers_pool.is_peer_known(&peer) {
379                                result.send(Err(ConsensusError::PeerUnavailable(format!("{:?}", peer)))).ok();
380                                continue;
381                            }
382
383                            // Keep only the max allowed blocks to request. Additional missing blocks
384                            // will be fetched via periodic sync.
385                            // Fetch from the lowest to highest round, to ensure progress.
386                            let missing_block_refs = missing_block_refs
387                                .into_iter()
388                                .take(self.context.parameters.max_blocks_per_sync)
389                                .collect();
390
391                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer.clone());
392                            let Some(blocks_guard) = blocks_guard else {
393                                result.send(Ok(())).ok();
394                                continue;
395                            };
396
397                            // We don't block if the corresponding peer task is saturated - but we rather drop the request. That's ok as the periodic
398                            // synchronization task will handle any still missing blocks in next run.
399                            let r = self
400                                .fetch_block_senders
401                                .get(&peer)
402                                .ok_or(ConsensusError::PeerNotFound(format!("Peer {} not found in fetch_block_senders", peer)))
403                                .and_then(|sender| {
404                                    sender
405                                        .try_send(blocks_guard)
406                                        .map_err(|err| {
407                                            match err {
408                                                TrySendError::Full(_) => {
409                                                    let peer_name = peer.labelname(&self.context);
410                                                    self.context
411                                                        .metrics
412                                                        .node_metrics
413                                                        .synchronizer_skipped_fetch_requests
414                                                        .with_label_values(&[peer_name])
415                                                        .inc();
416                                                    ConsensusError::SynchronizerSaturated(format!("{:?}", peer))
417                                                },
418                                                TrySendError::Closed(_) => ConsensusError::Shutdown
419                                            }
420                                        })
421                                });
422
423                            result.send(r).ok();
424                        }
425                        Command::FetchOwnLastBlock => {
426                            if self.fetch_own_last_block_task.is_empty() {
427                                self.start_fetch_own_last_block_task();
428                            }
429                        }
430                        Command::KickOffScheduler => {
431                            // just reset the scheduler timeout timer to run immediately if not already running.
432                            // If the scheduler is already running then just reduce the remaining time to run.
433                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
434                                Instant::now()
435                            } else {
436                                Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
437                            };
438
439                            // only reset if it is earlier than the next deadline
440                            if timeout < scheduler_timeout.deadline() {
441                                scheduler_timeout.as_mut().reset(timeout);
442                            }
443                        }
444                    }
445                },
446                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
447                    match result {
448                        Ok(()) => {},
449                        Err(e) => {
450                            if e.is_cancelled() {
451                            } else if e.is_panic() {
452                                std::panic::resume_unwind(e.into_panic());
453                            } else {
454                                panic!("fetch our last block task failed: {e}");
455                            }
456                        },
457                    };
458                },
459                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
460                    match result {
461                        Ok(()) => {},
462                        Err(e) => {
463                            if e.is_cancelled() {
464                            } else if e.is_panic() {
465                                std::panic::resume_unwind(e.into_panic());
466                            } else {
467                                panic!("fetch blocks scheduler task failed: {e}");
468                            }
469                        },
470                    };
471                },
472                () = &mut scheduler_timeout => {
473                    // we want to start a new task only if the previous one has already finished.
474                    // TODO: consider starting backup fetches in parallel, when a fetch takes too long?
475                    if self.fetch_blocks_scheduler_task.is_empty()
476                        && let Err(err) = self.start_fetch_missing_blocks_task().await {
477                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
478                            return;
479                        };
480
481                    scheduler_timeout
482                        .as_mut()
483                        .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
484                }
485            }
486        }
487    }
488
489    async fn fetch_blocks_from_peer(
490        peer: PeerId,
491        network_client: Arc<SynchronizerClient<VC, OC>>,
492        block_verifier: Arc<V>,
493        transaction_vote_tracker: TransactionVoteTracker,
494        commit_vote_monitor: Arc<CommitVoteMonitor>,
495        context: Arc<Context>,
496        core_dispatcher: Arc<D>,
497        dag_state: Arc<RwLock<DagState>>,
498        mut receiver: Receiver<BlocksGuard>,
499        commands_sender: Sender<Command>,
500        round_tracker: Arc<RwLock<RoundTracker>>,
501        _peers_pool: Arc<PeersPool>,
502    ) {
503        const MAX_RETRIES: u32 = 3;
504        let mut requests = FuturesUnordered::new();
505
506        loop {
507            tokio::select! {
508                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
509                    let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
510
511                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
512                },
513                Some((response, blocks_guard, retries, _peer, fetch_after_rounds)) = requests.next() => {
514                    match response {
515                        Ok(blocks) => {
516                            if let Err(err) = Self::process_fetched_blocks(blocks,
517                                peer.clone(),
518                                blocks_guard,
519                                core_dispatcher.clone(),
520                                block_verifier.clone(),
521                                transaction_vote_tracker.clone(),
522                                commit_vote_monitor.clone(),
523                                context.clone(),
524                                commands_sender.clone(),
525                                round_tracker.clone(),
526                                "live"
527                            ).await {
528                                warn!("Error while processing fetched blocks from peer {}: {err}", peer.hostname(&context));
529                                context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
530                            }
531                        },
532                        Err(_) => {
533                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer.labelname(&context).as_str(), "live"]).inc();
534                            if retries <= MAX_RETRIES {
535                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer.clone(), blocks_guard, fetch_after_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
536                            } else {
537                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {}.", peer.hostname(&context));
538                                // we don't necessarily need to do, but dropping the guard here to unlock the blocks
539                                drop(blocks_guard);
540                            }
541                        }
542                    }
543                },
544                else => {
545                    info!("Fetching blocks from peer {} task will now abort.", peer.hostname(&context));
546                    break;
547                }
548            }
549        }
550    }
551
552    /// Processes the requested raw fetched blocks from peer. If no error is returned then
553    /// the verified blocks are immediately sent to Core for processing.
554    async fn process_fetched_blocks(
555        mut serialized_blocks: Vec<Bytes>,
556        peer: PeerId,
557        requested_blocks_guard: BlocksGuard,
558        core_dispatcher: Arc<D>,
559        block_verifier: Arc<V>,
560        transaction_vote_tracker: TransactionVoteTracker,
561        commit_vote_monitor: Arc<CommitVoteMonitor>,
562        context: Arc<Context>,
563        commands_sender: Sender<Command>,
564        round_tracker: Arc<RwLock<RoundTracker>>,
565        sync_method: &str,
566    ) -> ConsensusResult<()> {
567        if serialized_blocks.is_empty() {
568            return Ok(());
569        }
570
571        // Limit the number of the returned blocks processed.
572        serialized_blocks.truncate(context.parameters.max_blocks_per_sync);
573
574        // Verify all the fetched blocks
575        let blocks = Handle::current()
576            .spawn_blocking({
577                let block_verifier = block_verifier.clone();
578                let context = context.clone();
579                let peer = peer.clone();
580                move || {
581                    Self::verify_blocks(
582                        serialized_blocks,
583                        block_verifier,
584                        transaction_vote_tracker,
585                        &context,
586                        peer,
587                    )
588                }
589            })
590            .await
591            .expect("Spawn blocking should not fail")?;
592
593        // Record commit votes from the verified blocks.
594        for block in &blocks {
595            commit_vote_monitor.observe_block(block);
596        }
597
598        // Update round tracker from the verified blocks. For fetched blocks,
599        // excluded_ancestors are not available so we use an empty vector.
600        {
601            let mut tracker = round_tracker.write();
602            for block in &blocks {
603                tracker.update_from_verified_block(&ExtendedBlock {
604                    block: block.clone(),
605                    excluded_ancestors: vec![],
606                });
607            }
608        }
609
610        let metrics = &context.metrics.node_metrics;
611        metrics
612            .synchronizer_fetched_blocks_by_peer
613            .with_label_values(&[peer.labelname(&context).as_str(), sync_method])
614            .inc_by(blocks.len() as u64);
615        for block in &blocks {
616            let block_hostname = &context.committee.authority(block.author()).hostname;
617            metrics
618                .synchronizer_fetched_blocks_by_authority
619                .with_label_values(&[block_hostname.as_str(), sync_method])
620                .inc();
621        }
622
623        debug!(
624            "Synced {} missing blocks from peer {:?}: {}",
625            blocks.len(),
626            peer,
627            blocks.iter().map(|b| b.reference().to_string()).join(", "),
628        );
629
630        // Now send them to core for processing. Ignore the returned missing blocks as we don't want
631        // this mechanism to keep feedback looping on fetching more blocks. The periodic synchronization
632        // will take care of that.
633        let missing_blocks = core_dispatcher
634            .add_blocks(blocks)
635            .await
636            .map_err(|_| ConsensusError::Shutdown)?;
637
638        // now release all the locked blocks as they have been fetched, verified & processed
639        drop(requested_blocks_guard);
640
641        // kick off immediately the scheduled synchronizer
642        if !missing_blocks.is_empty() {
643            // do not block here, so we avoid any possible cycles.
644            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
645            {
646                warn!("Commands channel is full")
647            }
648        }
649
650        context
651            .metrics
652            .node_metrics
653            .missing_blocks_after_fetch_total
654            .inc_by(missing_blocks.len() as u64);
655
656        Ok(())
657    }
658
659    fn get_fetch_after_rounds(
660        context: &Arc<Context>,
661        dag_state: Arc<RwLock<DagState>>,
662    ) -> Vec<Round> {
663        let (blocks, gc_round) = {
664            let dag_state = dag_state.read();
665            (
666                dag_state.get_last_cached_block_per_authority(Round::MAX),
667                dag_state.gc_round(),
668            )
669        };
670        assert_eq!(blocks.len(), context.committee.size());
671
672        blocks
673            .into_iter()
674            .map(|(block, _)| block.round().max(gc_round))
675            .collect::<Vec<_>>()
676    }
677
678    fn verify_blocks(
679        serialized_blocks: Vec<Bytes>,
680        block_verifier: Arc<V>,
681        transaction_vote_tracker: TransactionVoteTracker,
682        context: &Context,
683        peer: PeerId,
684    ) -> ConsensusResult<Vec<VerifiedBlock>> {
685        let mut verified_blocks = Vec::new();
686        let mut voted_blocks = Vec::new();
687        for serialized_block in serialized_blocks {
688            let signed_block: SignedBlock =
689                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
690
691            // TODO: cache received and verified block refs to avoid duplicated work.
692            let (verified_block, reject_txn_votes) = block_verifier
693                .verify_and_vote(signed_block, serialized_block)
694                .tap_err(|e| {
695                    let peer_label = peer.labelname(context);
696                    context
697                        .metrics
698                        .node_metrics
699                        .invalid_blocks
700                        .with_label_values(&[peer_label.as_str(), "synchronizer", e.clone().name()])
701                        .inc();
702                    info!("Invalid block received from {}: {}", peer, e);
703                })?;
704
705            // TODO: improve efficiency, maybe suspend and continue processing the block asynchronously.
706            let now = context.clock.timestamp_utc_ms();
707            let drift = verified_block.timestamp_ms().saturating_sub(now);
708            if drift > 0 {
709                let peer_hostname = &context
710                    .committee
711                    .authority(verified_block.author())
712                    .hostname;
713                context
714                    .metrics
715                    .node_metrics
716                    .block_timestamp_drift_ms
717                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
718                    .inc_by(drift);
719
720                trace!(
721                    "Synced block {} timestamp {} is in the future (now={}).",
722                    verified_block.reference(),
723                    verified_block.timestamp_ms(),
724                    now
725                );
726            }
727
728            verified_blocks.push(verified_block.clone());
729            voted_blocks.push((verified_block, reject_txn_votes));
730        }
731
732        if context.protocol_config.transaction_voting_enabled() {
733            transaction_vote_tracker.add_voted_blocks(voted_blocks);
734        }
735
736        Ok(verified_blocks)
737    }
738
739    async fn fetch_blocks_request(
740        network_client: Arc<SynchronizerClient<VC, OC>>,
741        peer: PeerId,
742        blocks_guard: BlocksGuard,
743        fetch_after_rounds: Vec<Round>,
744        fetch_missing_ancestors: bool,
745        request_timeout: Duration,
746        mut retries: u32,
747    ) -> (
748        ConsensusResult<Vec<Bytes>>,
749        BlocksGuard,
750        u32,
751        PeerId,
752        Vec<Round>,
753    ) {
754        let start = Instant::now();
755        let resp = timeout(
756            request_timeout,
757            network_client.fetch_blocks(
758                peer.clone(),
759                blocks_guard
760                    .block_refs
761                    .clone()
762                    .into_iter()
763                    .collect::<Vec<_>>(),
764                fetch_after_rounds.clone().into_iter().collect::<Vec<_>>(),
765                fetch_missing_ancestors,
766                request_timeout,
767            ),
768        )
769        .await;
770
771        fail_point_async!("consensus-delay");
772
773        let resp = match resp {
774            Ok(Err(err)) => {
775                // Add a delay before retrying - if that is needed. If request has timed out then eventually
776                // this will be a no-op.
777                sleep_until(start + request_timeout).await;
778                retries += 1;
779                Err(err)
780            } // network error
781            Err(err) => {
782                // timeout
783                sleep_until(start + request_timeout).await;
784                retries += 1;
785                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
786            }
787            Ok(result) => result,
788        };
789        (resp, blocks_guard, retries, peer, fetch_after_rounds)
790    }
791
    /// Spawns a task on `fetch_own_last_block_task` that discovers the round of this authority's
    /// last proposed block by asking the other authorities for it.
    ///
    /// Every other authority is queried via `fetch_latest_blocks` for this node's latest block.
    /// A response counts only if every returned block verifies and is authored by this node.
    /// The collection round ends when all queries finish or when
    /// `sync_last_known_own_block_timeout` elapses.
    /// If responses representing at least validity (f+1) stake were gathered, the highest round
    /// seen is used. Otherwise the whole round is retried after a delay that starts at 500ms,
    /// grows by 1.5x per retry, and is capped at `MAX_RETRY_DELAY_STEP`.
    ///
    /// In a single-node committee the round comes from this node's own last proposed block in
    /// `DagState` instead. The result is published to the `last_known_own_block_round` metric and
    /// passed to the core via `set_last_known_proposed_round`.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-querying a peer whose request returned a network error.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Upper bound on the back-off between full collection rounds.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Builds a future that waits `fetch_own_block_delay`, then asks `authority_index`
                // for this node's latest block. The authority index is returned alongside the
                // result so responses can be attributed when polled out of a FuturesUnordered.
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };


                // Deserializes and verifies the blocks returned by `authority_index`. Any
                // malformed/invalid block, or a block not authored by this node, rejects the whole
                // response.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Get the highest of all the results. Retry until at least `f+1` results have been gathered.
                let mut highest_round;
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    // With no peers to ask, fall back to the locally known last proposed block.
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().expect("Last proposed block should be returned on validators").round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Both accumulators restart from scratch on every collection round.
                    let mut total_stake = 0;
                    highest_round = 0;

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results but wait to timeout as well
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                // `None`: every outstanding query has completed.
                                let Some((result, authority_index)) = result else {
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                // An empty response still counts towards the stake
                                                // and contributes round 0.
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                // Invalid responses are neither counted nor retried
                                                // within this round.
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Network errors are retried against the same peer after a
                                        // delay, until the round's timer fires.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Request at least f+1 stake to have replied back.
                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Back off by 1.5x per failed round, capped at MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
924
925    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
926        if self.context.committee.size() == 1 {
927            trace!(
928                "Only one node in the network, will not try fetching missing blocks from peers."
929            );
930            return Ok(());
931        }
932
933        // If commit is lagging and commit sync is making progress, skip periodic sync.
934        // Commit syncer fetches certified commits with all necessary causal history.
935        // If commit sync is not making progress, periodic sync resumes as a fallback.
936        if !self.should_run_periodic_sync() {
937            return Ok(());
938        }
939
940        let context = self.context.clone();
941        let network_client = self.network_client.clone();
942        let block_verifier = self.block_verifier.clone();
943        let transaction_vote_tracker = self.transaction_vote_tracker.clone();
944        let commit_vote_monitor = self.commit_vote_monitor.clone();
945        let core_dispatcher = self.core_dispatcher.clone();
946        let blocks_to_fetch = self.inflight_blocks_map.clone();
947        let commands_sender = self.commands_sender.clone();
948        let dag_state = self.dag_state.clone();
949        let round_tracker = self.round_tracker.clone();
950        let peers_pool = self.peers_pool.clone();
951
952        let mut missing_blocks = self
953            .core_dispatcher
954            .get_missing_blocks()
955            .await
956            .map_err(|_err| ConsensusError::Shutdown)?;
957        if self.commit_sync_failover {
958            // Keep missing blocks to those that must be included in fetch request.
959            // Filtered out missing blocks that will eventually be fetched with fetch_after_rounds.
960            let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
961            missing_blocks.retain(|block| block.round <= fetch_after_rounds[block.author.value()]);
962        } else if missing_blocks.is_empty() {
963            return Ok(());
964        }
965
966        self.fetch_blocks_scheduler_task
967            .spawn(monitored_future!(async move {
968                let _scope = monitored_scope("FetchMissingBlocksScheduler");
969                context
970                    .metrics
971                    .node_metrics
972                    .fetch_blocks_scheduler_inflight
973                    .inc();
974                let total_requested = missing_blocks.len();
975
976                let results = if missing_blocks.is_empty() {
977                    let _scope = monitored_scope("BlockSync::Periodic::HighestAcceptedRounds");
978                    // Fetch blocks from a random peer using highest accepted rounds (commit sync failover)
979                    Self::fetch_blocks_with_fetch_after_rounds(
980                        context.clone(),
981                        blocks_to_fetch.clone(),
982                        network_client,
983                        dag_state,
984                        peers_pool,
985                    )
986                    .await
987                } else {
988                    let _scope = monitored_scope("BlockSync::Periodic::MissingBlocks");
989                    // Fetch blocks from 1 to MAX_PERIODIC_SYNC_PEERS peers
990                    Self::fetch_blocks_from_peers(
991                        context.clone(),
992                        blocks_to_fetch.clone(),
993                        network_client,
994                        missing_blocks,
995                        dag_state,
996                        peers_pool,
997                    )
998                    .await
999                };
1000                context
1001                    .metrics
1002                    .node_metrics
1003                    .fetch_blocks_scheduler_inflight
1004                    .dec();
1005                if results.is_empty() {
1006                    return;
1007                }
1008
1009                fail_point_async!("consensus-delay");
1010
1011                // Now process the returned results
1012                let mut total_fetched = 0;
1013                for (blocks_guard, fetched_blocks, peer) in results {
1014                    total_fetched += fetched_blocks.len();
1015
1016                    if let Err(err) = Self::process_fetched_blocks(
1017                        fetched_blocks,
1018                        peer.clone(),
1019                        blocks_guard,
1020                        core_dispatcher.clone(),
1021                        block_verifier.clone(),
1022                        transaction_vote_tracker.clone(),
1023                        commit_vote_monitor.clone(),
1024                        context.clone(),
1025                        commands_sender.clone(),
1026                        round_tracker.clone(),
1027                        "periodic",
1028                    )
1029                    .await
1030                    {
1031                        warn!(
1032                            "Error occurred while processing fetched blocks from peer {:?}: {err}",
1033                            peer
1034                        );
1035                        let peer_name = peer.labelname(&context);
1036                        context
1037                            .metrics
1038                            .node_metrics
1039                            .synchronizer_process_fetched_failures
1040                            .with_label_values(&[peer_name.as_str(), "periodic"])
1041                            .inc();
1042                    }
1043                }
1044
1045                debug!(
1046                    "Total blocks requested to fetch: {}, total fetched: {}",
1047                    total_requested, total_fetched
1048                );
1049            }));
1050
1051        Ok(())
1052    }
1053
1054    fn should_run_periodic_sync(&mut self) -> bool {
1055        let current_commit_index = self.dag_state.read().last_commit_index();
1056        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1057        let commit_threshold = current_commit_index
1058            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1059        let now = Instant::now();
1060        let metrics = &self.context.metrics.node_metrics;
1061
1062        // Commit is not lagging.
1063        if quorum_commit_index <= commit_threshold {
1064            metrics
1065                .synchronizer_periodic_sync_decision
1066                .with_label_values(&["true", "default"])
1067                .inc();
1068            // Reset last commit state.
1069            self.last_changed_commit_index = current_commit_index;
1070            self.last_commit_change_time = now;
1071            self.commit_sync_failover = false;
1072            // Run periodic sync.
1073            return true;
1074        }
1075        // Commit is lagging.
1076
1077        // When in commit sync failover, check if enough progress has been made.
1078        if self.commit_sync_failover {
1079            if current_commit_index
1080                < self.last_changed_commit_index + self.context.parameters.commit_sync_batch_size
1081            {
1082                metrics
1083                    .synchronizer_periodic_sync_decision
1084                    .with_label_values(&["true", "commit_catchup::run"])
1085                    .inc();
1086                // Not enough progress has been made yet. Keep running periodic sync.
1087                // Do not update last commit state yet.
1088                // Run periodic sync.
1089                return true;
1090            } else {
1091                metrics
1092                    .synchronizer_periodic_sync_decision
1093                    .with_label_values(&["false", "commit_catchup::end"])
1094                    .inc();
1095                // Enough progress has been made. Disable commit sync failover and reset last commit state.
1096                self.last_changed_commit_index = current_commit_index;
1097                self.last_commit_change_time = now;
1098                self.commit_sync_failover = false;
1099                // Skip periodic sync because of commit lag.
1100                return false;
1101            }
1102        }
1103
1104        // The node is commit lagging and not in commit sync failover yet.
1105        if current_commit_index == self.last_changed_commit_index {
1106            // Enter commit sync failover if not enough progress has been made.
1107            if now.duration_since(self.last_commit_change_time) >= COMMIT_PROGRESS_TIMEOUT {
1108                metrics
1109                    .synchronizer_periodic_sync_decision
1110                    .with_label_values(&["true", "commit_catchup::start"])
1111                    .inc();
1112                self.commit_sync_failover = true;
1113                // Run periodic sync.
1114                return true;
1115            }
1116        } else {
1117            // IMPORTANT: Only update last commit state when commit index is changing.
1118            self.last_changed_commit_index = current_commit_index;
1119            self.last_commit_change_time = now;
1120        }
1121
1122        metrics
1123            .synchronizer_periodic_sync_decision
1124            .with_label_values(&["false", "commit_lag"])
1125            .inc();
1126        false
1127    }
1128
1129    /// Fetches blocks from a random peer using only fetch_after_rounds (no specific missing blocks).
1130    /// Used during commit sync failover to make progress when commit sync is stalled.
1131    async fn fetch_blocks_with_fetch_after_rounds(
1132        context: Arc<Context>,
1133        inflight_blocks: Arc<InflightBlocksMap>,
1134        network_client: Arc<SynchronizerClient<VC, OC>>,
1135        dag_state: Arc<RwLock<DagState>>,
1136        peers_pool: Arc<PeersPool>,
1137    ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1138        let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1139
1140        // Pick a random peer (excluding self).
1141        // Get available peers from the PeersPool
1142        let mut peers = peers_pool.get_known_peers();
1143
1144        // TODO: in the future it would be possible, temporarily, for an Observer node to not have peers to fetch from.
1145        // We should change this assertion to allow for this case.
1146        assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1147
1148        if cfg!(not(test)) {
1149            peers.shuffle(&mut ThreadRng::default());
1150        }
1151
1152        let peer = peers.first().unwrap().clone();
1153
1154        let response = timeout(
1155            FETCH_REQUEST_TIMEOUT,
1156            network_client.fetch_blocks(
1157                peer.clone(),
1158                vec![],
1159                fetch_after_rounds,
1160                false,
1161                FETCH_REQUEST_TIMEOUT,
1162            ),
1163        )
1164        .await;
1165
1166        let serialized_blocks = match response {
1167            Ok(Ok(blocks)) => blocks,
1168            Ok(Err(err)) => {
1169                debug!("Failed to fetch blocks with fetch_after_rounds from peer {peer}: {err}");
1170                return vec![];
1171            }
1172            Err(_) => {
1173                debug!("Timed out fetching blocks with fetch_after_rounds from peer {peer}");
1174                return vec![];
1175            }
1176        };
1177
1178        let blocks_guard = BlocksGuard {
1179            map: inflight_blocks,
1180            block_refs: BTreeSet::new(),
1181            peer: peer.clone(),
1182        };
1183
1184        vec![(blocks_guard, serialized_blocks, peer)]
1185    }
1186
1187    /// Fetches the `missing_blocks` from peers. Requests the same number of authorities with missing blocks from each peer.
1188    /// Each response from peer can contain the requested blocks, and additional blocks from the last accepted round for
1189    /// authorities with missing blocks.
1190    /// Each element of the vector is a tuple which contains the requested missing block refs, the returned blocks and
1191    /// the peer.
1192    async fn fetch_blocks_from_peers(
1193        context: Arc<Context>,
1194        inflight_blocks: Arc<InflightBlocksMap>,
1195        network_client: Arc<SynchronizerClient<VC, OC>>,
1196        missing_blocks: BTreeSet<BlockRef>,
1197        dag_state: Arc<RwLock<DagState>>,
1198        peers_pool: Arc<PeersPool>,
1199    ) -> Vec<(BlocksGuard, Vec<Bytes>, PeerId)> {
1200        // Preliminary truncation of missing blocks to fetch. Since each peer can have different
1201        // number of missing blocks and the fetching is batched by peer, so keep more than max_blocks_per_fetch
1202        // per peer on average.
1203        let missing_blocks = missing_blocks
1204            .into_iter()
1205            .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_fetch)
1206            .collect::<Vec<_>>();
1207
1208        // Maps authorities to the missing blocks they have.
1209        let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
1210        for block_ref in &missing_blocks {
1211            authorities
1212                .entry(block_ref.author)
1213                .or_default()
1214                .push(*block_ref);
1215        }
1216
1217        // Get known peers from the PeersPool
1218        let mut peers = peers_pool.get_known_peers();
1219
1220        // Distribute the same number of authorities into each peer to sync.
1221        // Use the number of known peers from the pool, capped at MAX_PERIODIC_SYNC_PEERS
1222        // TODO: in the future it would be possible, temporarily, for an Observer node to not have peers to fetch from.
1223        // We should change this assertion to allow for this case.
1224        assert!(!peers.is_empty(), "No known peers to fetch blocks from");
1225
1226        let num_authorities_per_peer = authorities
1227            .len()
1228            .div_ceil(peers.len().min(MAX_PERIODIC_SYNC_PEERS));
1229
1230        // Update metrics related to missing blocks.
1231        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
1232        for (authority, blocks) in &authorities {
1233            missing_blocks_per_authority[*authority] += blocks.len();
1234        }
1235        for (missing, (_, authority)) in missing_blocks_per_authority
1236            .into_iter()
1237            .zip_debug_eq(context.committee.authorities())
1238        {
1239            context
1240                .metrics
1241                .node_metrics
1242                .synchronizer_missing_blocks_by_authority
1243                .with_label_values(&[&authority.hostname])
1244                .inc_by(missing as u64);
1245            context
1246                .metrics
1247                .node_metrics
1248                .synchronizer_current_missing_blocks_by_authority
1249                .with_label_values(&[&authority.hostname])
1250                .set(missing as i64);
1251        }
1252
1253        // TODO: probably inject the RNG to allow unit testing - this is a work around for now.
1254        if cfg!(not(test)) {
1255            // Shuffle the peers
1256            peers.shuffle(&mut ThreadRng::default());
1257        }
1258
1259        let mut peers = peers.into_iter();
1260        let mut request_futures = FuturesUnordered::new();
1261
1262        // Shuffle the authorities for each request.
1263        let mut authorities = authorities.into_values().collect::<Vec<_>>();
1264        if cfg!(not(test)) {
1265            // Shuffle the authorities
1266            authorities.shuffle(&mut ThreadRng::default());
1267        }
1268
1269        let fetch_after_rounds = Self::get_fetch_after_rounds(&context, dag_state.clone());
1270
1271        // Send the fetch requests
1272        for batch in authorities.chunks(num_authorities_per_peer) {
1273            let Some(peer) = peers.next() else {
1274                debug_fatal!("No more peers left to fetch blocks!");
1275                break;
1276            };
1277            let peer_name = peer.hostname(&context);
1278            // Fetch from the lowest round missing blocks to ensure progress.
1279            // This may reduce efficiency and increase the chance of duplicated data transfer in edge cases.
1280            let block_refs = batch
1281                .iter()
1282                .flatten()
1283                .cloned()
1284                .collect::<BTreeSet<_>>()
1285                .into_iter()
1286                .take(context.parameters.max_blocks_per_fetch)
1287                .collect::<BTreeSet<_>>();
1288
1289            // lock the blocks to be fetched. If no lock can be acquired for any of the blocks then don't bother
1290            if let Some(blocks_guard) =
1291                inflight_blocks.lock_blocks(block_refs.clone(), peer.clone())
1292            {
1293                info!(
1294                    "Periodic sync of {} missing blocks from peer {} {:?}: {}",
1295                    peer_name.as_str(),
1296                    block_refs.len(),
1297                    peer,
1298                    block_refs
1299                        .iter()
1300                        .map(|b| b.to_string())
1301                        .collect::<Vec<_>>()
1302                        .join(", ")
1303                );
1304                request_futures.push(Self::fetch_blocks_request(
1305                    network_client.clone(),
1306                    peer,
1307                    blocks_guard,
1308                    fetch_after_rounds.clone(),
1309                    false,
1310                    FETCH_REQUEST_TIMEOUT,
1311                    1,
1312                ));
1313            }
1314        }
1315
1316        let mut results = Vec::new();
1317        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);
1318
1319        tokio::pin!(fetcher_timeout);
1320
1321        loop {
1322            tokio::select! {
1323                Some((response, blocks_guard, _retries, peer, fetch_after_rounds)) = request_futures.next() => {
1324                    match response {
1325                        Ok(fetched_blocks) => {
1326                            results.push((blocks_guard, fetched_blocks, peer));
1327
1328                            // no more pending requests are left, just break the loop
1329                            if request_futures.is_empty() {
1330                                break;
1331                            }
1332                        },
1333                        Err(_) => {
1334                            let peer_name = peer.labelname(&context);
1335                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_name.as_str(), "periodic"]).inc();
1336                            // try again if there is any peer left
1337                            if let Some(next_peer) = peers.next() {
1338                                // do best effort to lock guards. If we can't lock then don't bother at this run.
1339                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer.clone()) {
1340                                    info!(
1341                                        "Retrying syncing {} missing blocks from peer {:?}: {}",
1342                                        blocks_guard.block_refs.len(),
1343                                        next_peer,
1344                                        blocks_guard.block_refs
1345                                            .iter()
1346                                            .map(|b| b.to_string())
1347                                            .collect::<Vec<_>>()
1348                                            .join(", ")
1349                                    );
1350                                    request_futures.push(Self::fetch_blocks_request(
1351                                        network_client.clone(),
1352                                        next_peer,
1353                                        blocks_guard,
1354                                        fetch_after_rounds,
1355                                        false,
1356                                        FETCH_REQUEST_TIMEOUT,
1357                                        1,
1358                                    ));
1359                                } else {
1360                                    debug!("Couldn't acquire locks to fetch blocks from peer {:?}.", next_peer)
1361                                }
1362                            } else {
1363                                debug!("No more peers left to fetch blocks");
1364                            }
1365                        }
1366                    }
1367                },
1368                _ = &mut fetcher_timeout => {
1369                    debug!("Timed out while fetching missing blocks");
1370                    break;
1371                }
1372            }
1373        }
1374
1375        results
1376    }
1377}
1378
1379#[cfg(test)]
1380mod tests {
1381    use std::{
1382        collections::{BTreeMap, BTreeSet},
1383        sync::Arc,
1384        time::Duration,
1385    };
1386
1387    use async_trait::async_trait;
1388    use bytes::Bytes;
1389    use consensus_config::{AuthorityIndex, Parameters};
1390    use consensus_types::block::{BlockDigest, BlockRef, Round};
1391    use mysten_metrics::monitored_mpsc;
1392    use parking_lot::RwLock;
1393    use tokio::{sync::Mutex, time::sleep};
1394
1395    use crate::commit::{CommitVote, TrustedCommit};
1396    use crate::{
1397        CommitDigest, CommitIndex,
1398        block::{TestBlock, VerifiedBlock},
1399        block_verifier::NoopBlockVerifier,
1400        commit_vote_monitor::CommitVoteMonitor,
1401        context::Context,
1402        core_thread::CoreThreadDispatcher,
1403        dag_state::DagState,
1404        error::{ConsensusError, ConsensusResult},
1405        network::{
1406            BlockStream, ObserverNetworkClient, PeerId, SynchronizerClient, ValidatorNetworkClient,
1407        },
1408        storage::mem_store::MemStore,
1409        synchronizer::{
1410            COMMIT_PROGRESS_TIMEOUT, FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT,
1411            InflightBlocksMap, Synchronizer,
1412        },
1413    };
1414    use crate::{
1415        authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1416        peers_pool::PeersPool, round_tracker::RoundTracker,
1417        transaction_vote_tracker::TransactionVoteTracker,
1418    };
1419
1420    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
1421    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
1422    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
1423    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);
1424
1425    #[derive(Default)]
1426    struct MockNetworkClient {
1427        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
1428        fetch_latest_blocks_requests:
1429            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
1430    }
1431
1432    impl MockNetworkClient {
1433        async fn stub_fetch_blocks(
1434            &self,
1435            blocks: Vec<VerifiedBlock>,
1436            peer: AuthorityIndex,
1437            latency: Option<Duration>,
1438        ) {
1439            let mut lock = self.fetch_blocks_requests.lock().await;
1440            let block_refs = blocks
1441                .iter()
1442                .map(|block| block.reference())
1443                .collect::<Vec<_>>();
1444            lock.insert((block_refs, peer), (blocks, latency));
1445        }
1446
1447        async fn stub_fetch_blocks_for_key(
1448            &self,
1449            key_refs: Vec<BlockRef>,
1450            response_blocks: Vec<VerifiedBlock>,
1451            peer: AuthorityIndex,
1452            latency: Option<Duration>,
1453        ) {
1454            let mut lock = self.fetch_blocks_requests.lock().await;
1455            lock.insert((key_refs, peer), (response_blocks, latency));
1456        }
1457
1458        async fn stub_fetch_latest_blocks(
1459            &self,
1460            blocks: Vec<VerifiedBlock>,
1461            peer: AuthorityIndex,
1462            authorities: Vec<AuthorityIndex>,
1463            latency: Option<Duration>,
1464        ) {
1465            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1466            lock.entry((peer, authorities))
1467                .or_default()
1468                .push((blocks, latency));
1469        }
1470
1471        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1472            let lock = self.fetch_latest_blocks_requests.lock().await;
1473            lock.len()
1474        }
1475    }
1476
1477    #[async_trait]
1478    impl ValidatorNetworkClient for MockNetworkClient {
1479        async fn subscribe_blocks(
1480            &self,
1481            _peer: AuthorityIndex,
1482            _last_received: Round,
1483            _timeout: Duration,
1484        ) -> ConsensusResult<BlockStream> {
1485            unimplemented!("subscribe_blocks not implemented in mock")
1486        }
1487
1488        async fn fetch_blocks(
1489            &self,
1490            peer: AuthorityIndex,
1491            block_refs: Vec<BlockRef>,
1492            _fetch_after_rounds: Vec<Round>,
1493            _fetch_missing_ancestors: bool,
1494            _timeout: Duration,
1495        ) -> ConsensusResult<Vec<Bytes>> {
1496            let mut lock = self.fetch_blocks_requests.lock().await;
1497            let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
1498                panic!(
1499                    "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
1500                    block_refs, peer, lock
1501                );
1502            });
1503
1504            let serialised = response
1505                .0
1506                .into_iter()
1507                .map(|block| block.serialized().clone())
1508                .collect::<Vec<_>>();
1509
1510            drop(lock);
1511
1512            if let Some(latency) = response.1 {
1513                sleep(latency).await;
1514            }
1515
1516            Ok(serialised)
1517        }
1518
1519        async fn fetch_commits(
1520            &self,
1521            _peer: AuthorityIndex,
1522            _commit_range: crate::commit::CommitRange,
1523            _timeout: Duration,
1524        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1525            unimplemented!("fetch_commits not implemented in mock")
1526        }
1527
1528        async fn fetch_latest_blocks(
1529            &self,
1530            peer: AuthorityIndex,
1531            authorities: Vec<AuthorityIndex>,
1532            _timeout: Duration,
1533        ) -> ConsensusResult<Vec<Bytes>> {
1534            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1535            let mut responses = lock
1536                .remove(&(peer, authorities.clone()))
1537                .expect("Unexpected fetch blocks request made");
1538
1539            let response = responses.remove(0);
1540            let serialised = response
1541                .0
1542                .into_iter()
1543                .map(|block| block.serialized().clone())
1544                .collect::<Vec<_>>();
1545
1546            if !responses.is_empty() {
1547                lock.insert((peer, authorities), responses);
1548            }
1549
1550            drop(lock);
1551
1552            if let Some(latency) = response.1 {
1553                sleep(latency).await;
1554            }
1555
1556            Ok(serialised)
1557        }
1558
1559        async fn get_latest_rounds(
1560            &self,
1561            _peer: AuthorityIndex,
1562            _timeout: Duration,
1563        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1564            unimplemented!("get_latest_rounds not implemented in mock")
1565        }
1566
1567        #[cfg(test)]
1568        async fn send_block(
1569            &self,
1570            _peer: AuthorityIndex,
1571            _block: &VerifiedBlock,
1572            _timeout: Duration,
1573        ) -> ConsensusResult<()> {
1574            unimplemented!("send_block not implemented in mock")
1575        }
1576    }
1577
1578    #[async_trait]
1579    impl ObserverNetworkClient for MockNetworkClient {
1580        async fn stream_blocks(
1581            &self,
1582            _peer: crate::network::PeerId,
1583            _highest_round_per_authority: Vec<u64>,
1584            _timeout: Duration,
1585        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1586            unimplemented!("stream_blocks not implemented in mock")
1587        }
1588
1589        async fn fetch_blocks(
1590            &self,
1591            _peer: crate::network::PeerId,
1592            _block_refs: Vec<BlockRef>,
1593            _fetch_after_rounds: Vec<Round>,
1594            _fetch_missing_ancestors: bool,
1595            _timeout: Duration,
1596        ) -> ConsensusResult<Vec<Bytes>> {
1597            unimplemented!("Observer fetch_blocks not implemented in mock")
1598        }
1599
1600        async fn fetch_commits(
1601            &self,
1602            _peer: crate::network::PeerId,
1603            _commit_range: crate::commit::CommitRange,
1604            _timeout: Duration,
1605        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1606            unimplemented!("Observer fetch_commits not implemented in mock")
1607        }
1608    }
1609
1610    #[test]
1611    fn test_inflight_blocks_map() {
1612        // GIVEN
1613        let map = InflightBlocksMap::new();
1614        let some_block_refs = [
1615            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1616            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1617            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1618            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1619        ];
1620        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1621
1622        // Lock & unlock blocks
1623        {
1624            let mut all_guards = Vec::new();
1625
1626            // Try to acquire the block locks for authorities 1 & 2
1627            for i in 1..=2 {
1628                let authority = AuthorityIndex::new_for_test(i);
1629                let peer = PeerId::Validator(authority);
1630
1631                let guard = map.lock_blocks(missing_block_refs.clone(), peer.clone());
1632                let guard = guard.expect("Guard should be created");
1633                assert_eq!(guard.block_refs.len(), 4);
1634
1635                all_guards.push(guard);
1636
1637                // trying to acquire any of them again will not succeed
1638                let guard = map.lock_blocks(missing_block_refs.clone(), peer);
1639                assert!(guard.is_none());
1640            }
1641
1642            // Trying to acquire for authority 3 it will fail - as we have maxed out the number of allowed peers
1643            let authority_3 = AuthorityIndex::new_for_test(3);
1644            let peer_3 = PeerId::Validator(authority_3);
1645
1646            let guard = map.lock_blocks(missing_block_refs.clone(), peer_3.clone());
1647            assert!(guard.is_none());
1648
1649            // Explicitly drop the guard of authority 1 and try for authority 3 again - it will now succeed
1650            drop(all_guards.remove(0));
1651
1652            let guard = map.lock_blocks(missing_block_refs.clone(), peer_3);
1653            let guard = guard.expect("Guard should be successfully acquired");
1654
1655            assert_eq!(guard.block_refs, missing_block_refs);
1656
1657            // Dropping all guards should unlock on the block refs
1658            drop(guard);
1659            drop(all_guards);
1660
1661            assert_eq!(map.num_of_locked_blocks(), 0);
1662        }
1663
1664        // Swap locks
1665        {
1666            // acquire a lock for authority 1
1667            let authority_1 = AuthorityIndex::new_for_test(1);
1668            let peer_1 = PeerId::Validator(authority_1);
1669            let guard = map.lock_blocks(missing_block_refs.clone(), peer_1).unwrap();
1670
1671            // Now swap the locks for authority 2
1672            let authority_2 = AuthorityIndex::new_for_test(2);
1673            let peer_2 = PeerId::Validator(authority_2);
1674            let guard = map.swap_locks(guard, peer_2);
1675
1676            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1677        }
1678    }
1679
1680    #[tokio::test]
1681    async fn successful_fetch_blocks_from_peer() {
1682        // GIVEN
1683        let (context, _) = Context::new_for_test(4);
1684        let context = Arc::new(context);
1685        let block_verifier = Arc::new(NoopBlockVerifier {});
1686        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1687        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1688        let mock_client = Arc::new(MockNetworkClient::default());
1689        let store = Arc::new(MemStore::new());
1690        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1691        let transaction_vote_tracker =
1692            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1693        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1694
1695        let network_client = Arc::new(SynchronizerClient::new(
1696            context.clone(),
1697            Some(mock_client.clone()),
1698            Some(mock_client.clone()),
1699        ));
1700        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1701        let handle = Synchronizer::start(
1702            network_client,
1703            context.clone(),
1704            core_dispatcher.clone(),
1705            commit_vote_monitor,
1706            block_verifier,
1707            transaction_vote_tracker,
1708            round_tracker,
1709            dag_state,
1710            peers_pool.clone(),
1711            false,
1712        );
1713
1714        // Create some test blocks
1715        let expected_blocks = (0..10)
1716            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1717            .collect::<Vec<_>>();
1718        let missing_blocks = expected_blocks
1719            .iter()
1720            .map(|block| block.reference())
1721            .collect::<BTreeSet<_>>();
1722
1723        // AND stub the fetch_blocks request from peer 1
1724        let peer = AuthorityIndex::new_for_test(1);
1725        mock_client
1726            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1727            .await;
1728
1729        // WHEN request missing blocks from peer 1
1730        assert!(
1731            handle
1732                .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1733                .await
1734                .is_ok()
1735        );
1736
1737        // Wait a little bit until those have been added in core
1738        sleep(Duration::from_millis(1_000)).await;
1739
1740        // THEN ensure those ended up in Core
1741        let added_blocks = core_dispatcher.get_add_blocks().await;
1742        assert_eq!(added_blocks, expected_blocks);
1743    }
1744
1745    #[tokio::test]
1746    async fn saturate_fetch_blocks_from_peer() {
1747        // GIVEN
1748        let (context, _) = Context::new_for_test(4);
1749        let context = Arc::new(context);
1750        let block_verifier = Arc::new(NoopBlockVerifier {});
1751        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1752        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1753        let mock_client = Arc::new(MockNetworkClient::default());
1754        let store = Arc::new(MemStore::new());
1755        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1756        let transaction_vote_tracker =
1757            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1758        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1759
1760        let network_client = Arc::new(SynchronizerClient::new(
1761            context.clone(),
1762            Some(mock_client.clone()),
1763            Some(mock_client.clone()),
1764        ));
1765        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1766        let handle = Synchronizer::start(
1767            network_client,
1768            context.clone(),
1769            core_dispatcher.clone(),
1770            commit_vote_monitor,
1771            block_verifier,
1772            transaction_vote_tracker,
1773            round_tracker,
1774            dag_state,
1775            peers_pool.clone(),
1776            false,
1777        );
1778
1779        // Create some test blocks
1780        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1781            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1782            .collect::<Vec<_>>();
1783
1784        // Now start sending requests to fetch blocks by trying to saturate peer 1 task
1785        let peer = AuthorityIndex::new_for_test(1);
1786        let mut iter = expected_blocks.iter().peekable();
1787        while let Some(block) = iter.next() {
1788            // stub the fetch_blocks request from peer 1 and give some high response latency so requests
1789            // can start blocking the peer task.
1790            mock_client
1791                .stub_fetch_blocks(
1792                    vec![block.clone()],
1793                    peer,
1794                    Some(Duration::from_millis(5_000)),
1795                )
1796                .await;
1797
1798            let mut missing_blocks = BTreeSet::new();
1799            missing_blocks.insert(block.reference());
1800
1801            // WHEN requesting to fetch the blocks, it should not succeed for the last request and get
1802            // an error with "saturated" synchronizer
1803            if iter.peek().is_none() {
1804                match handle
1805                    .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1806                    .await
1807                {
1808                    Err(ConsensusError::SynchronizerSaturated(peer_str)) => {
1809                        assert_eq!(peer_str, format!("{:?}", PeerId::Validator(peer)));
1810                    }
1811                    _ => panic!("A saturated synchronizer error was expected"),
1812                }
1813            } else {
1814                assert!(
1815                    handle
1816                        .fetch_blocks(missing_blocks, PeerId::Validator(peer))
1817                        .await
1818                        .is_ok()
1819                );
1820            }
1821        }
1822    }
1823
1824    #[tokio::test(flavor = "current_thread", start_paused = true)]
1825    async fn synchronizer_periodic_task_fetch_blocks() {
1826        // GIVEN
1827        let (context, _) = Context::new_for_test(4);
1828        let context = Arc::new(context);
1829        let block_verifier = Arc::new(NoopBlockVerifier {});
1830        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1831        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1832        let mock_client = Arc::new(MockNetworkClient::default());
1833        let store = Arc::new(MemStore::new());
1834        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1835        let transaction_vote_tracker =
1836            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1837        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1838
1839        // Create some test blocks
1840        let expected_blocks = (0..10)
1841            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1842            .collect::<Vec<_>>();
1843        let missing_blocks = expected_blocks
1844            .iter()
1845            .map(|block| block.reference())
1846            .collect::<BTreeSet<_>>();
1847
1848        // AND stub the missing blocks
1849        core_dispatcher
1850            .stub_missing_blocks(missing_blocks.clone())
1851            .await;
1852
1853        // AND stub the requests for authority 1 & 2
1854        // Make the first authority timeout, so the second will be called. "We" are authority = 0, so
1855        // we are skipped anyways.
1856        mock_client
1857            .stub_fetch_blocks(
1858                expected_blocks.clone(),
1859                AuthorityIndex::new_for_test(1),
1860                Some(FETCH_REQUEST_TIMEOUT),
1861            )
1862            .await;
1863        mock_client
1864            .stub_fetch_blocks(
1865                expected_blocks.clone(),
1866                AuthorityIndex::new_for_test(2),
1867                None,
1868            )
1869            .await;
1870
1871        // WHEN start the synchronizer and wait for a couple of seconds
1872        let network_client = Arc::new(SynchronizerClient::new(
1873            context.clone(),
1874            Some(mock_client.clone()),
1875            Some(mock_client.clone()),
1876        ));
1877        let peers_pool = Arc::new(PeersPool::new(context.clone()));
1878        let _handle = Synchronizer::start(
1879            network_client,
1880            context.clone(),
1881            core_dispatcher.clone(),
1882            commit_vote_monitor,
1883            block_verifier,
1884            transaction_vote_tracker,
1885            round_tracker,
1886            dag_state,
1887            peers_pool.clone(),
1888            false,
1889        );
1890
1891        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1892
1893        // THEN the missing blocks should now be fetched and added to core
1894        let added_blocks = core_dispatcher.get_add_blocks().await;
1895        assert_eq!(added_blocks, expected_blocks);
1896
1897        // AND missing blocks should have been consumed by the stub
1898        assert!(
1899            core_dispatcher
1900                .get_missing_blocks()
1901                .await
1902                .unwrap()
1903                .is_empty()
1904        );
1905    }
1906
    /// Checks that periodic missing-block sync is skipped while the local commit index lags
    /// far behind the quorum commit index, and resumes once the local commit index catches up.
    ///
    /// Runs on a paused clock, so the relative order of stubs, `sleep`s and dag-state
    /// mutations below is what determines which periodic-sync ticks observe the lag.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // AND stub some missing blocks. The highest accepted round is 0. Create blocks that are above the sync threshold.
        // The blocks span rounds [2 * commit_sync_batch_size, 3 * commit_sync_batch_size), all from authority 0.
        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
        let stub_blocks = (sync_missing_block_round_threshold * 2
            ..sync_missing_block_round_threshold * 3)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = stub_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authority 1 & 2
        // Make the first authority timeout, so the second will be called. "We" are authority = 0, so
        // we are skipped anyway. Stub all blocks since the full set is sent in one request.
        // Only the first max_blocks_per_sync blocks will be processed by process_fetched_blocks.
        mock_client
            .stub_fetch_blocks(
                stub_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
            .await;
        // What core is expected to receive once sync runs: the first `max_blocks_per_sync` stubs.
        let mut expected_blocks = stub_blocks
            .iter()
            .take(context.parameters.max_blocks_per_sync)
            .cloned()
            .collect::<Vec<_>>();

        // Now create some blocks to simulate a commit lag
        // Every authority votes for `commit_index` in a block at `round`, which puts the quorum
        // commit index well ahead of our local commit index (still 0 at this point).
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        // Pass them through the commit vote monitor - so now there will be a big commit lag to prevent
        // the scheduled synchronizer from running
        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        // WHEN start the synchronizer and wait for a couple of seconds where normally the synchronizer should have kicked in.
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let _handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );

        // Wait long enough for periodic sync to have run, but stay under COMMIT_PROGRESS_TIMEOUT
        // to avoid triggering commit sync failover.
        sleep(COMMIT_PROGRESS_TIMEOUT / 2).await;

        // Since we should be in commit lag mode none of the missed blocks should have been fetched - hence nothing should be
        // sent to core for processing.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, vec![]);

        // AND advance now the local commit index by adding a new commit that matches the commit index
        // of quorum
        // Commits 1..=commit_index are written directly into the dag state, so the local commit
        // index equals the quorum commit index and the lag disappears.
        {
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // Now stub again the missing blocks to fetch the exact same ones.
        // (The earlier stub is presumably consumed by the periodic task's reads - TODO confirm
        // against MockCoreThreadDispatcher::get_missing_blocks.)
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched and added to core
        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        // Compare order-insensitively: sort both sides by block reference.
        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }
2037
2038    #[tokio::test(flavor = "current_thread", start_paused = true)]
2039    async fn synchronizer_periodic_sync_resumes_when_commit_sync_stalled() {
2040        // GIVEN
2041        let (context, _) = Context::new_for_test(4);
2042        let context = Arc::new(context);
2043        let block_verifier = Arc::new(NoopBlockVerifier {});
2044        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2045        let mock_client = Arc::new(MockNetworkClient::default());
2046        let store = Arc::new(MemStore::new());
2047        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2048        let transaction_vote_tracker =
2049            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2050        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2051        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2052
2053        // AND create missing blocks to fetch
2054        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
2055        let stub_blocks = (sync_missing_block_round_threshold * 2
2056            ..sync_missing_block_round_threshold * 3)
2057            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2058            .collect::<Vec<_>>();
2059        let missing_blocks = stub_blocks
2060            .iter()
2061            .map(|block| block.reference())
2062            .collect::<BTreeSet<_>>();
2063        core_dispatcher
2064            .stub_missing_blocks(missing_blocks.clone())
2065            .await;
2066
2067        // AND stub the fetch responses for authority 1 & 2.
2068        // Stub all blocks since the full set is sent in one request.
2069        mock_client
2070            .stub_fetch_blocks(
2071                stub_blocks.clone(),
2072                AuthorityIndex::new_for_test(1),
2073                Some(FETCH_REQUEST_TIMEOUT),
2074            )
2075            .await;
2076        mock_client
2077            .stub_fetch_blocks(stub_blocks.clone(), AuthorityIndex::new_for_test(2), None)
2078            .await;
2079        let mut expected_blocks = stub_blocks
2080            .iter()
2081            .take(context.parameters.max_blocks_per_sync)
2082            .cloned()
2083            .collect::<Vec<_>>();
2084
2085        // AND create commit lag by observing high commit votes
2086        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
2087        let commit_index: CommitIndex = round - 1;
2088        let blocks = (0..4)
2089            .map(|authority| {
2090                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
2091                let block = TestBlock::new(round, authority)
2092                    .set_commit_votes(commit_votes)
2093                    .build();
2094                VerifiedBlock::new_for_test(block)
2095            })
2096            .collect::<Vec<_>>();
2097        for block in blocks {
2098            commit_vote_monitor.observe_block(&block);
2099        }
2100
2101        // WHEN start the synchronizer
2102        let network_client = Arc::new(SynchronizerClient::new(
2103            context.clone(),
2104            Some(mock_client.clone()),
2105            Some(mock_client.clone()),
2106        ));
2107        let peers_pool = Arc::new(PeersPool::new(context.clone()));
2108        let _handle = Synchronizer::start(
2109            network_client,
2110            context.clone(),
2111            core_dispatcher.clone(),
2112            commit_vote_monitor.clone(),
2113            block_verifier,
2114            transaction_vote_tracker,
2115            round_tracker,
2116            dag_state.clone(),
2117            peers_pool.clone(),
2118            false,
2119        );
2120
2121        // Wait just under COMMIT_PROGRESS_TIMEOUT — sync should be skipped (commit lagging).
2122        sleep(COMMIT_PROGRESS_TIMEOUT - Duration::from_millis(100)).await;
2123        let added_blocks = core_dispatcher.get_add_blocks().await;
2124        assert_eq!(added_blocks, vec![]);
2125
2126        // Re-stub missing blocks (consumed by earlier get_missing_blocks calls).
2127        core_dispatcher
2128            .stub_missing_blocks(missing_blocks.clone())
2129            .await;
2130        // Stub the failover fetch (empty block_refs key, peer 1) before the stall triggers.
2131        // Use latency to prevent immediate completion and re-triggering.
2132        mock_client
2133            .stub_fetch_blocks_for_key(
2134                vec![],
2135                expected_blocks.clone(),
2136                AuthorityIndex::new_for_test(1),
2137                Some(Duration::from_millis(500)),
2138            )
2139            .await;
2140
2141        // Now sleep past the stall trigger + time for the fetch to complete.
2142        sleep(Duration::from_millis(200) + FETCH_REQUEST_TIMEOUT).await;
2143
2144        let mut added_blocks = core_dispatcher.get_add_blocks().await;
2145        assert!(
2146            !added_blocks.is_empty(),
2147            "Expected periodic sync to resume after commit sync stall"
2148        );
2149        added_blocks.sort_by_key(|block| block.reference());
2150        expected_blocks.sort_by_key(|block| block.reference());
2151        assert_eq!(added_blocks, expected_blocks);
2152
2153        // AND advance commit index enough to reach commit sync batch size from stall start.
2154        // This should resolve the failover and skip periodic sync again.
2155        core_dispatcher.get_add_blocks().await;
2156        {
2157            let current = dag_state.read().last_commit_index();
2158            let mut d = dag_state.write();
2159            // Advance well past the recovery threshold.
2160            for index in (current + 1)..=(current + context.parameters.commit_sync_batch_size) {
2161                let commit =
2162                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
2163                d.add_commit(commit);
2164            }
2165        }
2166
2167        // Re-stub missing blocks and fetch response for another round of checking
2168        core_dispatcher
2169            .stub_missing_blocks(missing_blocks.clone())
2170            .await;
2171        mock_client
2172            .stub_fetch_blocks(
2173                stub_blocks
2174                    .iter()
2175                    .take(context.parameters.max_blocks_per_sync)
2176                    .cloned()
2177                    .collect::<Vec<_>>(),
2178                AuthorityIndex::new_for_test(2),
2179                None,
2180            )
2181            .await;
2182
2183        // Wait for a sync cycle — periodic sync should be skipped again since stall resolved
2184        // but commit is still lagging
2185        sleep(2 * FETCH_REQUEST_TIMEOUT).await;
2186        let added_blocks = core_dispatcher.get_add_blocks().await;
2187        assert_eq!(
2188            added_blocks,
2189            vec![],
2190            "Expected periodic sync to be skipped after stall resolved"
2191        );
2192    }
2193
2194    #[tokio::test(flavor = "current_thread", start_paused = true)]
2195    async fn synchronizer_fetch_own_last_block() {
2196        // GIVEN
2197        let (context, _) = Context::new_for_test(4);
2198        let context = Arc::new(context.with_parameters(Parameters {
2199            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
2200            ..Default::default()
2201        }));
2202        let block_verifier = Arc::new(NoopBlockVerifier {});
2203        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2204        let mock_client = Arc::new(MockNetworkClient::default());
2205        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2206        let store = Arc::new(MemStore::new());
2207        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2208        let transaction_vote_tracker =
2209            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2210        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2211        let our_index = AuthorityIndex::new_for_test(0);
2212
2213        // Create some test blocks
2214        let mut expected_blocks = (9..=10)
2215            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
2216            .collect::<Vec<_>>();
2217
2218        // Now set different latest blocks for the peers
2219        // For peer 1 we give the block of round 10 (highest)
2220        let block_1 = expected_blocks.pop().unwrap();
2221        mock_client
2222            .stub_fetch_latest_blocks(
2223                vec![block_1.clone()],
2224                AuthorityIndex::new_for_test(1),
2225                vec![our_index],
2226                None,
2227            )
2228            .await;
2229        mock_client
2230            .stub_fetch_latest_blocks(
2231                vec![block_1],
2232                AuthorityIndex::new_for_test(1),
2233                vec![our_index],
2234                None,
2235            )
2236            .await;
2237
2238        // For peer 2 we give the block of round 9
2239        let block_2 = expected_blocks.pop().unwrap();
2240        mock_client
2241            .stub_fetch_latest_blocks(
2242                vec![block_2.clone()],
2243                AuthorityIndex::new_for_test(2),
2244                vec![our_index],
2245                Some(Duration::from_secs(10)),
2246            )
2247            .await;
2248        mock_client
2249            .stub_fetch_latest_blocks(
2250                vec![block_2],
2251                AuthorityIndex::new_for_test(2),
2252                vec![our_index],
2253                None,
2254            )
2255            .await;
2256
2257        // For peer 3 we don't give any block - and it should return an empty vector
2258        mock_client
2259            .stub_fetch_latest_blocks(
2260                vec![],
2261                AuthorityIndex::new_for_test(3),
2262                vec![our_index],
2263                Some(Duration::from_secs(10)),
2264            )
2265            .await;
2266        mock_client
2267            .stub_fetch_latest_blocks(
2268                vec![],
2269                AuthorityIndex::new_for_test(3),
2270                vec![our_index],
2271                None,
2272            )
2273            .await;
2274
2275        // WHEN start the synchronizer and wait for a couple of seconds
2276        let network_client = Arc::new(SynchronizerClient::new(
2277            context.clone(),
2278            Some(mock_client.clone()),
2279            Some(mock_client.clone()),
2280        ));
2281        let peers_pool = Arc::new(PeersPool::new(context.clone()));
2282        let handle = Synchronizer::start(
2283            network_client,
2284            context.clone(),
2285            core_dispatcher.clone(),
2286            commit_vote_monitor,
2287            block_verifier,
2288            transaction_vote_tracker,
2289            round_tracker,
2290            dag_state,
2291            peers_pool.clone(),
2292            true,
2293        );
2294
2295        // Wait at least for the timeout time
2296        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;
2297
2298        // Assert that core has been called to set the min propose round
2299        assert_eq!(
2300            core_dispatcher.get_last_own_proposed_round().await,
2301            vec![10]
2302        );
2303
2304        // Ensure that all the requests have been called
2305        assert_eq!(mock_client.fetch_latest_blocks_pending_calls().await, 0);
2306
2307        // And we got one retry
2308        assert_eq!(
2309            context
2310                .metrics
2311                .node_metrics
2312                .sync_last_known_own_block_retries
2313                .get(),
2314            1
2315        );
2316
2317        // Ensure that no panic occurred
2318        if let Err(err) = handle.stop().await
2319            && err.is_panic()
2320        {
2321            std::panic::resume_unwind(err.into_panic());
2322        }
2323    }
2324
2325    #[tokio::test]
2326    async fn test_process_fetched_blocks() {
2327        // GIVEN
2328        let (context, _) = Context::new_for_test(4);
2329        let context = Arc::new(context);
2330        let block_verifier = Arc::new(NoopBlockVerifier {});
2331        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
2332        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
2333        let store = Arc::new(MemStore::new());
2334        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
2335        let transaction_vote_tracker =
2336            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
2337        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
2338        let (commands_sender, _commands_receiver) =
2339            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);
2340
2341        // Create input test blocks:
2342        // - Authority 0 block at round 60.
2343        // - Authority 1 blocks from round 30 to 60.
2344        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
2345        expected_blocks.extend(
2346            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
2347        );
2348        assert_eq!(
2349            expected_blocks.len(),
2350            context.parameters.max_blocks_per_sync
2351        );
2352
2353        let expected_serialized_blocks = expected_blocks
2354            .iter()
2355            .map(|b| b.serialized().clone())
2356            .collect::<Vec<_>>();
2357
2358        let expected_block_refs = expected_blocks
2359            .iter()
2360            .map(|b| b.reference())
2361            .collect::<BTreeSet<_>>();
2362
2363        // GIVEN peer to fetch blocks from
2364        let peer_index = AuthorityIndex::new_for_test(2);
2365        let peer = PeerId::Validator(peer_index);
2366
2367        // Create blocks_guard
2368        let inflight_blocks_map = InflightBlocksMap::new();
2369        let blocks_guard = inflight_blocks_map
2370            .lock_blocks(expected_block_refs.clone(), peer.clone())
2371            .expect("Failed to lock blocks");
2372
2373        assert_eq!(
2374            inflight_blocks_map.num_of_locked_blocks(),
2375            expected_block_refs.len()
2376        );
2377
2378        // Create a Synchronizer
2379        let result = Synchronizer::<
2380            NoopBlockVerifier,
2381            MockCoreThreadDispatcher,
2382            MockNetworkClient,
2383            MockNetworkClient,
2384        >::process_fetched_blocks(
2385            expected_serialized_blocks,
2386            peer,
2387            blocks_guard, // The guard is consumed here
2388            core_dispatcher.clone(),
2389            block_verifier,
2390            transaction_vote_tracker,
2391            commit_vote_monitor,
2392            context.clone(),
2393            commands_sender,
2394            round_tracker,
2395            "test",
2396        )
2397        .await;
2398
2399        // THEN
2400        assert!(result.is_ok());
2401
2402        // Check blocks were sent to core
2403        let added_blocks = core_dispatcher.get_add_blocks().await;
2404        assert_eq!(
2405            added_blocks
2406                .iter()
2407                .map(|b| b.reference())
2408                .collect::<BTreeSet<_>>(),
2409            expected_block_refs,
2410        );
2411
2412        // Check blocks were unlocked
2413        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
2414    }
2415}