consensus_core/
synchronizer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::{
4    collections::{BTreeMap, BTreeSet, HashMap},
5    sync::Arc,
6    time::Duration,
7};
8
9use bytes::Bytes;
10use consensus_config::AuthorityIndex;
11use consensus_types::block::{BlockRef, Round};
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use itertools::Itertools as _;
14use mysten_common::debug_fatal;
15use mysten_metrics::{
16    monitored_future,
17    monitored_mpsc::{Receiver, Sender, channel},
18    monitored_scope,
19};
20use parking_lot::{Mutex, RwLock};
21use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
22use sui_macros::fail_point_async;
23use tap::TapFallible;
24use tokio::{
25    runtime::Handle,
26    sync::{mpsc::error::TrySendError, oneshot},
27    task::{JoinError, JoinSet},
28    time::{Instant, sleep, sleep_until, timeout},
29};
30use tracing::{debug, error, info, trace, warn};
31
32use crate::{
33    BlockAPI,
34    block::{ExtendedBlock, SignedBlock, VerifiedBlock},
35    block_verifier::BlockVerifier,
36    commit_vote_monitor::CommitVoteMonitor,
37    context::Context,
38    dag_state::DagState,
39    error::{ConsensusError, ConsensusResult},
40    network::{ObserverNetworkClient, SynchronizerClient, ValidatorNetworkClient},
41    round_tracker::RoundTracker,
42};
43use crate::{
44    authority_service::COMMIT_LAG_MULTIPLIER, core_thread::CoreThreadDispatcher,
45    transaction_certifier::TransactionCertifier,
46};
47
48/// The number of concurrent fetch blocks requests per authority
49const FETCH_BLOCKS_CONCURRENCY: usize = 5;
50
51/// Timeouts when fetching blocks.
52const FETCH_REQUEST_TIMEOUT: Duration = Duration::from_millis(2_000);
53const FETCH_FROM_PEERS_TIMEOUT: Duration = Duration::from_millis(4_000);
54
55const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
56
57// Max number of peers to request missing blocks concurrently in periodic sync.
58const MAX_PERIODIC_SYNC_PEERS: usize = 3;
59
/// Guard over a set of block refs locked for fetching by a single peer.
/// Dropping the guard releases the locks in the shared `InflightBlocksMap`
/// (see the `Drop` impl below).
struct BlocksGuard {
    // Shared map tracking which peers are currently fetching which blocks.
    map: Arc<InflightBlocksMap>,
    // The block refs this guard holds locks for.
    block_refs: BTreeSet<BlockRef>,
    // The peer assigned to fetch these blocks; acts as the lock owner id.
    peer: AuthorityIndex,
}
65
impl Drop for BlocksGuard {
    /// Releases this guard's locks so the blocks can be assigned for fetching again.
    fn drop(&mut self) {
        self.map.unlock_blocks(&self.block_refs, self.peer);
    }
}
71
// Keeps a mapping between the missing blocks that have been instructed to be fetched and the authorities
// that are currently fetching them. For a block ref there is a maximum number of authorities that can
// concurrently fetch it (`MAX_AUTHORITIES_TO_FETCH_PER_BLOCK`). The authority ids that are currently
// fetching a block are set on the corresponding `BTreeSet` and basically they act as "locks".
struct InflightBlocksMap {
    // Mutex-guarded since the map is shared across the per-peer fetch tasks.
    inner: Mutex<HashMap<BlockRef, BTreeSet<AuthorityIndex>>>,
}
79
80impl InflightBlocksMap {
81    fn new() -> Arc<Self> {
82        Arc::new(Self {
83            inner: Mutex::new(HashMap::new()),
84        })
85    }
86
87    /// Locks the blocks to be fetched for the assigned `peer_index`. We want to avoid re-fetching the
88    /// missing blocks from too many authorities at the same time, thus we limit the concurrency
89    /// per block by attempting to lock per block. If a block is already fetched by the maximum allowed
90    /// number of authorities, then the block ref will not be included in the returned set. The method
91    /// returns all the block refs that have been successfully locked and allowed to be fetched.
92    fn lock_blocks(
93        self: &Arc<Self>,
94        missing_block_refs: BTreeSet<BlockRef>,
95        peer: AuthorityIndex,
96    ) -> Option<BlocksGuard> {
97        let mut blocks = BTreeSet::new();
98        let mut inner = self.inner.lock();
99
100        for block_ref in missing_block_refs {
101            // check that the number of authorities that are already instructed to fetch the block is not
102            // higher than the allowed and the `peer_index` has not already been instructed to do that.
103            let authorities = inner.entry(block_ref).or_default();
104            if authorities.len() < MAX_AUTHORITIES_TO_FETCH_PER_BLOCK
105                && authorities.get(&peer).is_none()
106            {
107                assert!(authorities.insert(peer));
108                blocks.insert(block_ref);
109            }
110        }
111
112        if blocks.is_empty() {
113            None
114        } else {
115            Some(BlocksGuard {
116                map: self.clone(),
117                block_refs: blocks,
118                peer,
119            })
120        }
121    }
122
123    /// Unlocks the provided block references for the given `peer`. The unlocking is strict, meaning that
124    /// if this method is called for a specific block ref and peer more times than the corresponding lock
125    /// has been called, it will panic.
126    fn unlock_blocks(self: &Arc<Self>, block_refs: &BTreeSet<BlockRef>, peer: AuthorityIndex) {
127        // Now mark all the blocks as fetched from the map
128        let mut blocks_to_fetch = self.inner.lock();
129        for block_ref in block_refs {
130            let authorities = blocks_to_fetch
131                .get_mut(block_ref)
132                .expect("Should have found a non empty map");
133
134            assert!(authorities.remove(&peer), "Peer index should be present!");
135
136            // if the last one then just clean up
137            if authorities.is_empty() {
138                blocks_to_fetch.remove(block_ref);
139            }
140        }
141    }
142
143    /// Drops the provided `blocks_guard` which will force to unlock the blocks, and lock now again the
144    /// referenced block refs. The swap is best effort and there is no guarantee that the `peer` will
145    /// be able to acquire the new locks.
146    fn swap_locks(
147        self: &Arc<Self>,
148        blocks_guard: BlocksGuard,
149        peer: AuthorityIndex,
150    ) -> Option<BlocksGuard> {
151        let block_refs = blocks_guard.block_refs.clone();
152
153        // Explicitly drop the guard
154        drop(blocks_guard);
155
156        // Now create new guard
157        self.lock_blocks(block_refs, peer)
158    }
159
160    #[cfg(test)]
161    fn num_of_locked_blocks(self: &Arc<Self>) -> usize {
162        let inner = self.inner.lock();
163        inner.len()
164    }
165}
166
/// Commands handled by the synchronizer's main loop (`Synchronizer::run`).
enum Command {
    /// Fetch the given missing blocks from the specified peer. The outcome of
    /// scheduling the fetch is reported back through the `result` channel.
    FetchBlocks {
        missing_block_refs: BTreeSet<BlockRef>,
        peer_index: AuthorityIndex,
        result: oneshot::Sender<Result<(), ConsensusError>>,
    },
    /// Fetch this node's own last proposed block from peers (amnesia recovery).
    FetchOwnLastBlock,
    /// Reset the periodic fetch scheduler's timer so it runs sooner.
    KickOffScheduler,
}
176
/// Handle to a running `Synchronizer`: submits commands and stops its tasks.
pub(crate) struct SynchronizerHandle {
    // Channel into the synchronizer's main command loop.
    commands_sender: Sender<Command>,
    // All tasks spawned by the synchronizer; used for shutdown via `stop`.
    tasks: tokio::sync::Mutex<JoinSet<()>>,
}
181
182impl SynchronizerHandle {
183    /// Explicitly asks from the synchronizer to fetch the blocks - provided the block_refs set - from
184    /// the peer authority.
185    pub(crate) async fn fetch_blocks(
186        &self,
187        missing_block_refs: BTreeSet<BlockRef>,
188        peer_index: AuthorityIndex,
189    ) -> ConsensusResult<()> {
190        let (sender, receiver) = oneshot::channel();
191        self.commands_sender
192            .send(Command::FetchBlocks {
193                missing_block_refs,
194                peer_index,
195                result: sender,
196            })
197            .await
198            .map_err(|_err| ConsensusError::Shutdown)?;
199        receiver.await.map_err(|_err| ConsensusError::Shutdown)?
200    }
201
202    pub(crate) async fn stop(&self) -> Result<(), JoinError> {
203        let mut tasks = self.tasks.lock().await;
204        tasks.abort_all();
205        while let Some(result) = tasks.join_next().await {
206            result?
207        }
208        Ok(())
209    }
210}
211
/// `Synchronizer` oversees live block synchronization, crucial for node progress. Live synchronization
/// refers to the process of retrieving missing blocks, particularly those essential for advancing a node
/// when data from only a few rounds is absent. If a node significantly lags behind the network,
/// `commit_syncer` handles fetching missing blocks via a more efficient approach. `Synchronizer`
/// aims for swift catch-up employing two mechanisms:
///
/// 1. Explicitly requesting missing blocks from designated authorities via the "block send" path.
///    This includes attempting to fetch any missing ancestors necessary for processing a received block.
///    Such requests prioritize the block author, maximizing the chance of prompt retrieval.
///    A locking mechanism allows concurrent requests for missing blocks from up to two authorities
///    simultaneously, enhancing the chances of timely retrieval. Notably, if additional missing blocks
///    arise during block processing, requests to the same authority are deferred to the scheduler.
///
/// 2. Periodically requesting missing blocks via a scheduler. This primarily serves to retrieve
///    missing blocks that were not ancestors of a received block via the "block send" path.
///    The scheduler operates on either a fixed periodic basis or is triggered immediately
///    after explicit fetches described in (1), ensuring continued block retrieval if gaps persist.
///
/// Additionally to the above, the synchronizer can synchronize and fetch the last own proposed block
/// from the network peers as best effort approach to recover node from amnesia and avoid making the
/// node equivocate.
pub(crate) struct Synchronizer<
    V: BlockVerifier,
    D: CoreThreadDispatcher,
    VC: ValidatorNetworkClient,
    OC: ObserverNetworkClient,
> {
    context: Arc<Context>,
    // Receives commands submitted through `SynchronizerHandle` and internal senders.
    commands_receiver: Receiver<Command>,
    // One sender per peer authority, feeding that peer's dedicated fetch task.
    fetch_block_senders: BTreeMap<AuthorityIndex, Sender<BlocksGuard>>,
    core_dispatcher: Arc<D>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    dag_state: Arc<RwLock<DagState>>,
    // Holds the (single) periodic missing-blocks fetch task, when one is running.
    fetch_blocks_scheduler_task: JoinSet<()>,
    // Holds the (single) fetch-own-last-block task, when one is running.
    fetch_own_last_block_task: JoinSet<()>,
    network_client: Arc<SynchronizerClient<VC, OC>>,
    block_verifier: Arc<V>,
    transaction_certifier: TransactionCertifier,
    round_tracker: Arc<RwLock<RoundTracker>>,
    // Tracks which peers are fetching which blocks, limiting concurrency per block.
    inflight_blocks_map: Arc<InflightBlocksMap>,
    // Clone of the command sender, used to self-enqueue commands (e.g. KickOffScheduler).
    commands_sender: Sender<Command>,
}
254
255impl<V, D, VC, OC> Synchronizer<V, D, VC, OC>
256where
257    V: BlockVerifier,
258    D: CoreThreadDispatcher,
259    VC: ValidatorNetworkClient,
260    OC: ObserverNetworkClient,
261{
    /// Creates and spawns all synchronizer tasks, returning a handle for submitting
    /// commands and for shutdown.
    ///
    /// Spawns one block-fetching task per peer authority (skipping this node itself)
    /// plus the main command loop. When `sync_last_known_own_block` is true, a
    /// `FetchOwnLastBlock` command is enqueued immediately so the node can recover its
    /// own last proposed block from peers.
    pub(crate) fn start(
        network_client: Arc<SynchronizerClient<VC, OC>>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        block_verifier: Arc<V>,
        transaction_certifier: TransactionCertifier,
        round_tracker: Arc<RwLock<RoundTracker>>,
        dag_state: Arc<RwLock<DagState>>,
        sync_last_known_own_block: bool,
    ) -> Arc<SynchronizerHandle> {
        let (commands_sender, commands_receiver) =
            channel("consensus_synchronizer_commands", 1_000);
        let inflight_blocks_map = InflightBlocksMap::new();

        // Spawn the tasks to fetch the blocks from the others
        let mut fetch_block_senders = BTreeMap::new();
        let mut tasks = JoinSet::new();
        for (index, _) in context.committee.authorities() {
            // Never fetch from ourselves.
            if index == context.own_index {
                continue;
            }
            let (sender, receiver) =
                channel("consensus_synchronizer_fetches", FETCH_BLOCKS_CONCURRENCY);
            let fetch_blocks_from_authority_async = Self::fetch_blocks_from_authority(
                index,
                network_client.clone(),
                block_verifier.clone(),
                transaction_certifier.clone(),
                commit_vote_monitor.clone(),
                context.clone(),
                core_dispatcher.clone(),
                dag_state.clone(),
                receiver,
                commands_sender.clone(),
                round_tracker.clone(),
            );
            tasks.spawn(monitored_future!(fetch_blocks_from_authority_async));
            fetch_block_senders.insert(index, sender);
        }

        let commands_sender_clone = commands_sender.clone();

        if sync_last_known_own_block {
            // The command channel was just created with plenty of capacity, so try_send
            // is expected to succeed here.
            commands_sender
                .try_send(Command::FetchOwnLastBlock)
                .expect("Failed to sync our last block");
        }

        // Spawn the task to listen to the requests & periodic runs
        tasks.spawn(monitored_future!(async move {
            let mut s = Self {
                context,
                commands_receiver,
                fetch_block_senders,
                core_dispatcher,
                commit_vote_monitor,
                fetch_blocks_scheduler_task: JoinSet::new(),
                fetch_own_last_block_task: JoinSet::new(),
                network_client,
                block_verifier,
                transaction_certifier,
                inflight_blocks_map,
                commands_sender: commands_sender_clone,
                dag_state,
                round_tracker,
            };
            s.run().await;
        }));

        Arc::new(SynchronizerHandle {
            commands_sender,
            tasks: tokio::sync::Mutex::new(tasks),
        })
    }
337
    // The main loop to listen for the submitted commands.
    async fn run(&mut self) {
        // We want the synchronizer to run periodically every 200ms to fetch any missing blocks.
        const PERIODIC_FETCH_INTERVAL: Duration = Duration::from_millis(200);
        let scheduler_timeout = sleep_until(Instant::now() + PERIODIC_FETCH_INTERVAL);

        tokio::pin!(scheduler_timeout);

        loop {
            tokio::select! {
                Some(command) = self.commands_receiver.recv() => {
                    match command {
                        Command::FetchBlocks{ missing_block_refs, peer_index, result } => {
                            if peer_index == self.context.own_index {
                                error!("We should never attempt to fetch blocks from our own node");
                                continue;
                            }

                            // Keep only the max allowed blocks to request. Additional missing blocks
                            // will be fetched via periodic sync.
                            // Fetch from the lowest to highest round, to ensure progress.
                            let missing_block_refs = missing_block_refs
                                .into_iter()
                                .take(self.context.parameters.max_blocks_per_sync)
                                .collect();

                            let blocks_guard = self.inflight_blocks_map.lock_blocks(missing_block_refs, peer_index);
                            let Some(blocks_guard) = blocks_guard else {
                                // All requested blocks are already being fetched by the maximum
                                // number of peers; nothing to do for this request.
                                result.send(Ok(())).ok();
                                continue;
                            };

                            // We don't block if the corresponding peer task is saturated - but we rather drop the request. That's ok as the periodic
                            // synchronization task will handle any still missing blocks in next run.
                            let r = self
                                .fetch_block_senders
                                .get(&peer_index)
                                .expect("Fatal error, sender should be present")
                                .try_send(blocks_guard)
                                .map_err(|err| {
                                    match err {
                                        TrySendError::Full(_) => {
                                            let peer_hostname = &self.context.committee.authority(peer_index).hostname;
                                            self.context
                                                .metrics
                                                .node_metrics
                                                .synchronizer_skipped_fetch_requests
                                                .with_label_values(&[peer_hostname])
                                                .inc();
                                            ConsensusError::SynchronizerSaturated(peer_index)
                                        },
                                        TrySendError::Closed(_) => ConsensusError::Shutdown
                                    }
                                });

                            result.send(r).ok();
                        }
                        Command::FetchOwnLastBlock => {
                            // Only start a new task when the previous one has finished.
                            if self.fetch_own_last_block_task.is_empty() {
                                self.start_fetch_own_last_block_task();
                            }
                        }
                        Command::KickOffScheduler => {
                            // just reset the scheduler timeout timer to run immediately if not already running.
                            // If the scheduler is already running then just reduce the remaining time to run.
                            let timeout = if self.fetch_blocks_scheduler_task.is_empty() {
                                Instant::now()
                            } else {
                                Instant::now() + PERIODIC_FETCH_INTERVAL.checked_div(2).unwrap()
                            };

                            // only reset if it is earlier than the next deadline
                            if timeout < scheduler_timeout.deadline() {
                                scheduler_timeout.as_mut().reset(timeout);
                            }
                        }
                    }
                },
                Some(result) = self.fetch_own_last_block_task.join_next(), if !self.fetch_own_last_block_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancellation is expected (e.g. during shutdown); propagate panics,
                            // and treat any other join error as fatal.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch our last block task failed: {e}");
                            }
                        },
                    };
                },
                Some(result) = self.fetch_blocks_scheduler_task.join_next(), if !self.fetch_blocks_scheduler_task.is_empty() => {
                    match result {
                        Ok(()) => {},
                        Err(e) => {
                            // Same policy as above: ignore cancellation, propagate panics.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("fetch blocks scheduler task failed: {e}");
                            }
                        },
                    };
                },
                () = &mut scheduler_timeout => {
                    // we want to start a new task only if the previous one has already finished.
                    // TODO: consider starting backup fetches in parallel, when a fetch takes too long?
                    if self.fetch_blocks_scheduler_task.is_empty()
                        && let Err(err) = self.start_fetch_missing_blocks_task().await {
                            debug!("Core is shutting down, synchronizer is shutting down: {err:?}");
                            return;
                        };

                    // Re-arm the periodic timer for the next run.
                    scheduler_timeout
                        .as_mut()
                        .reset(Instant::now() + PERIODIC_FETCH_INTERVAL);
                }
            }
        }
    }
458
    /// Per-peer fetch task: receives locked block sets (`BlocksGuard`) destined for
    /// `peer_index`, issues fetch requests keeping at most `FETCH_BLOCKS_CONCURRENCY`
    /// in flight, processes successful responses, and retries failed requests up to
    /// `MAX_RETRIES` times before releasing the locks. The task ends when the guard
    /// channel closes and all in-flight requests have completed.
    async fn fetch_blocks_from_authority(
        peer_index: AuthorityIndex,
        network_client: Arc<SynchronizerClient<VC, OC>>,
        block_verifier: Arc<V>,
        transaction_certifier: TransactionCertifier,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        core_dispatcher: Arc<D>,
        dag_state: Arc<RwLock<DagState>>,
        mut receiver: Receiver<BlocksGuard>,
        commands_sender: Sender<Command>,
        round_tracker: Arc<RwLock<RoundTracker>>,
    ) {
        const MAX_RETRIES: u32 = 3;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        let mut requests = FuturesUnordered::new();

        loop {
            tokio::select! {
                Some(blocks_guard) = receiver.recv(), if requests.len() < FETCH_BLOCKS_CONCURRENCY => {
                    // get the highest accepted rounds
                    let highest_rounds = Self::get_highest_accepted_rounds(dag_state.clone(), &context);

                    requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, 1))
                },
                Some((response, blocks_guard, retries, _peer, highest_rounds)) = requests.next() => {
                    match response {
                        Ok(blocks) => {
                            if let Err(err) = Self::process_fetched_blocks(blocks,
                                peer_index,
                                blocks_guard,
                                core_dispatcher.clone(),
                                block_verifier.clone(),
                                transaction_certifier.clone(),
                                commit_vote_monitor.clone(),
                                context.clone(),
                                commands_sender.clone(),
                                round_tracker.clone(),
                                "live"
                            ).await {
                                warn!("Error while processing fetched blocks from peer {peer_index} {peer_hostname}: {err}");
                                context.metrics.node_metrics.synchronizer_process_fetched_failures.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname.as_str(), "live"]).inc();
                            // Retry with the same guard so the locks stay held across attempts.
                            if retries <= MAX_RETRIES {
                                requests.push(Self::fetch_blocks_request(network_client.clone(), peer_index, blocks_guard, highest_rounds, true, FETCH_REQUEST_TIMEOUT, retries))
                            } else {
                                warn!("Max retries {retries} reached while trying to fetch blocks from peer {peer_index} {peer_hostname}.");
                                // we don't necessarily need to do, but dropping the guard here to unlock the blocks
                                drop(blocks_guard);
                            }
                        }
                    }
                },
                else => {
                    // Both the guard channel and the in-flight requests are exhausted.
                    info!("Fetching blocks from authority {peer_index} task will now abort.");
                    break;
                }
            }
        }
    }
522
    /// Processes the requested raw fetched blocks from peer `peer_index`. If no error is returned then
    /// the verified blocks are immediately sent to Core for processing.
    ///
    /// On success this also: records commit votes, updates the round tracker, emits
    /// fetch metrics, releases the block locks (`requested_blocks_guard`), and kicks
    /// off the scheduler if Core reports further missing blocks.
    async fn process_fetched_blocks(
        mut serialized_blocks: Vec<Bytes>,
        peer_index: AuthorityIndex,
        requested_blocks_guard: BlocksGuard,
        core_dispatcher: Arc<D>,
        block_verifier: Arc<V>,
        transaction_certifier: TransactionCertifier,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        context: Arc<Context>,
        commands_sender: Sender<Command>,
        round_tracker: Arc<RwLock<RoundTracker>>,
        sync_method: &str,
    ) -> ConsensusResult<()> {
        if serialized_blocks.is_empty() {
            return Ok(());
        }

        // Limit the number of the returned blocks processed.
        serialized_blocks.truncate(context.parameters.max_blocks_per_sync);

        // Verify all the fetched blocks. Verification is CPU-bound, so it runs on the
        // blocking thread pool.
        let blocks = Handle::current()
            .spawn_blocking({
                let block_verifier = block_verifier.clone();
                let context = context.clone();
                move || {
                    Self::verify_blocks(
                        serialized_blocks,
                        block_verifier,
                        transaction_certifier,
                        &context,
                        peer_index,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // Record commit votes from the verified blocks.
        for block in &blocks {
            commit_vote_monitor.observe_block(block);
        }

        // Update round tracker from the verified blocks. For fetched blocks,
        // excluded_ancestors are not available so we use an empty vector.
        {
            let mut tracker = round_tracker.write();
            for block in &blocks {
                tracker.update_from_verified_block(&ExtendedBlock {
                    block: block.clone(),
                    excluded_ancestors: vec![],
                });
            }
        }

        let metrics = &context.metrics.node_metrics;
        let peer_hostname = &context.committee.authority(peer_index).hostname;
        metrics
            .synchronizer_fetched_blocks_by_peer
            .with_label_values(&[peer_hostname.as_str(), sync_method])
            .inc_by(blocks.len() as u64);
        for block in &blocks {
            let block_hostname = &context.committee.authority(block.author()).hostname;
            metrics
                .synchronizer_fetched_blocks_by_authority
                .with_label_values(&[block_hostname.as_str(), sync_method])
                .inc();
        }

        debug!(
            "Synced {} missing blocks from peer {peer_index} {peer_hostname}: {}",
            blocks.len(),
            blocks.iter().map(|b| b.reference().to_string()).join(", "),
        );

        // Now send them to core for processing. Ignore the returned missing blocks as we don't want
        // this mechanism to keep feedback looping on fetching more blocks. The periodic synchronization
        // will take care of that.
        let missing_blocks = core_dispatcher
            .add_blocks(blocks)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // now release all the locked blocks as they have been fetched, verified & processed
        drop(requested_blocks_guard);

        // kick off immediately the scheduled synchronizer
        if !missing_blocks.is_empty() {
            // do not block here, so we avoid any possible cycles.
            if let Err(TrySendError::Full(_)) = commands_sender.try_send(Command::KickOffScheduler)
            {
                warn!("Commands channel is full")
            }
        }

        context
            .metrics
            .node_metrics
            .missing_blocks_after_fetch_total
            .inc_by(missing_blocks.len() as u64);

        Ok(())
    }
628
629    fn get_highest_accepted_rounds(
630        dag_state: Arc<RwLock<DagState>>,
631        context: &Arc<Context>,
632    ) -> Vec<Round> {
633        let blocks = dag_state
634            .read()
635            .get_last_cached_block_per_authority(Round::MAX);
636        assert_eq!(blocks.len(), context.committee.size());
637
638        blocks
639            .into_iter()
640            .map(|(block, _)| block.round())
641            .collect::<Vec<_>>()
642    }
643
644    fn verify_blocks(
645        serialized_blocks: Vec<Bytes>,
646        block_verifier: Arc<V>,
647        transaction_certifier: TransactionCertifier,
648        context: &Context,
649        peer_index: AuthorityIndex,
650    ) -> ConsensusResult<Vec<VerifiedBlock>> {
651        let mut verified_blocks = Vec::new();
652        let mut voted_blocks = Vec::new();
653        for serialized_block in serialized_blocks {
654            let signed_block: SignedBlock =
655                bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
656
657            // TODO: cache received and verified block refs to avoid duplicated work.
658            let (verified_block, reject_txn_votes) = block_verifier
659                .verify_and_vote(signed_block, serialized_block)
660                .tap_err(|e| {
661                    let hostname = context.committee.authority(peer_index).hostname.clone();
662                    context
663                        .metrics
664                        .node_metrics
665                        .invalid_blocks
666                        .with_label_values(&[hostname.as_str(), "synchronizer", e.clone().name()])
667                        .inc();
668                    info!("Invalid block received from {}: {}", peer_index, e);
669                })?;
670
671            // TODO: improve efficiency, maybe suspend and continue processing the block asynchronously.
672            let now = context.clock.timestamp_utc_ms();
673            let drift = verified_block.timestamp_ms().saturating_sub(now);
674            if drift > 0 {
675                let peer_hostname = &context
676                    .committee
677                    .authority(verified_block.author())
678                    .hostname;
679                context
680                    .metrics
681                    .node_metrics
682                    .block_timestamp_drift_ms
683                    .with_label_values(&[peer_hostname.as_str(), "synchronizer"])
684                    .inc_by(drift);
685
686                trace!(
687                    "Synced block {} timestamp {} is in the future (now={}).",
688                    verified_block.reference(),
689                    verified_block.timestamp_ms(),
690                    now
691                );
692            }
693
694            verified_blocks.push(verified_block.clone());
695            voted_blocks.push((verified_block, reject_txn_votes));
696        }
697
698        if context.protocol_config.mysticeti_fastpath() {
699            transaction_certifier.add_voted_blocks(voted_blocks);
700        }
701
702        Ok(verified_blocks)
703    }
704
    /// Issues a single fetch-blocks request to `peer` and returns the response together
    /// with the blocks guard, the (possibly incremented) retry count, the peer and the
    /// highest rounds used — everything the caller needs to either retry or release
    /// the locks.
    ///
    /// On a network error or timeout, sleeps until `request_timeout` has elapsed from
    /// the start of the request, pacing retries; when the request itself timed out,
    /// that sleep is effectively a no-op.
    async fn fetch_blocks_request(
        network_client: Arc<SynchronizerClient<VC, OC>>,
        peer: AuthorityIndex,
        blocks_guard: BlocksGuard,
        highest_rounds: Vec<Round>,
        breadth_first: bool,
        request_timeout: Duration,
        mut retries: u32,
    ) -> (
        ConsensusResult<Vec<Bytes>>,
        BlocksGuard,
        u32,
        AuthorityIndex,
        Vec<Round>,
    ) {
        use crate::network::PeerId;
        let start = Instant::now();
        let resp = timeout(
            request_timeout,
            network_client.fetch_blocks(
                PeerId::Validator(peer),
                blocks_guard
                    .block_refs
                    .clone()
                    .into_iter()
                    .collect::<Vec<_>>(),
                highest_rounds.clone().into_iter().collect::<Vec<_>>(),
                breadth_first,
                request_timeout,
            ),
        )
        .await;

        fail_point_async!("consensus-delay");

        let resp = match resp {
            Ok(Err(err)) => {
                // Add a delay before retrying - if that is needed. If request has timed out then eventually
                // this will be a no-op.
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(err)
            } // network error
            Err(err) => {
                // timeout
                sleep_until(start + request_timeout).await;
                retries += 1;
                Err(ConsensusError::NetworkRequestTimeout(err.to_string()))
            }
            Ok(result) => result,
        };
        (resp, blocks_guard, retries, peer, highest_rounds)
    }
758
    /// Spawns a background task that discovers the highest round of a block this
    /// authority previously proposed, by asking every peer for our own last block.
    ///
    /// The task loops until peers holding validity stake (f+1) have returned
    /// verifiable results, retrying failed peers and backing off between rounds,
    /// then reports the highest observed round to Core via
    /// `set_last_known_proposed_round`. With a single-node committee it short
    /// circuits to the locally stored last proposed block.
    fn start_fetch_own_last_block_task(&mut self) {
        // Delay before re-asking an individual peer that returned an error.
        const FETCH_OWN_BLOCK_RETRY_DELAY: Duration = Duration::from_millis(1_000);
        // Cap for the exponential backoff between full retry rounds below.
        const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

        let context = self.context.clone();
        let dag_state = self.dag_state.clone();
        let network_client = self.network_client.clone();
        let block_verifier = self.block_verifier.clone();
        let core_dispatcher = self.core_dispatcher.clone();

        self.fetch_own_last_block_task
            .spawn(monitored_future!(async move {
                let _scope = monitored_scope("FetchOwnLastBlockTask");

                // Requests our own latest block from `authority_index`, after an
                // optional delay (used to pace retries against the same peer).
                let fetch_own_block = |authority_index: AuthorityIndex, fetch_own_block_delay: Duration| {
                    let network_client_cloned = network_client.clone();
                    let own_index = context.own_index;
                    async move {
                        sleep(fetch_own_block_delay).await;
                        let r = network_client_cloned.fetch_latest_blocks(authority_index, vec![own_index], FETCH_REQUEST_TIMEOUT).await;
                        (r, authority_index)
                    }
                };


                // Deserializes and verifies the blocks a peer returned; rejects any
                // block not authored by this node.
                let process_blocks = |blocks: Vec<Bytes>, authority_index: AuthorityIndex| -> ConsensusResult<Vec<VerifiedBlock>> {
                    let mut result = Vec::new();
                    for serialized_block in blocks {
                        let signed_block = bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
                        let (verified_block, _) = block_verifier.verify_and_vote(signed_block, serialized_block).tap_err(|err|{
                            let hostname = context.committee.authority(authority_index).hostname.clone();
                            context
                                .metrics
                                .node_metrics
                                .invalid_blocks
                                .with_label_values(&[hostname.as_str(), "synchronizer_own_block", err.clone().name()])
                                .inc();
                            warn!("Invalid block received from {}: {}", authority_index, err);
                        })?;

                        if verified_block.author() != context.own_index {
                            return Err(ConsensusError::UnexpectedLastOwnBlock { index: authority_index, block_ref: verified_block.reference()});
                        }
                        result.push(verified_block);
                    }
                    Ok(result)
                };

                // Get the highest of all the results. Retry until at least `f+1` results have been gathered.
                let mut highest_round;
                let mut retries = 0;
                let mut retry_delay_step = Duration::from_millis(500);
                'main:loop {
                    if context.committee.size() == 1 {
                        highest_round = dag_state.read().get_last_proposed_block().round();
                        info!("Only one node in the network, will not try fetching own last block from peers.");
                        break 'main;
                    }

                    // Stake and highest round are recomputed from scratch each round.
                    let mut total_stake = 0;
                    highest_round = 0;

                    // Ask all the other peers about our last block
                    let mut results = FuturesUnordered::new();

                    for (authority_index, _authority) in context.committee.authorities() {
                        if authority_index != context.own_index {
                            results.push(fetch_own_block(authority_index, Duration::from_millis(0)));
                        }
                    }

                    // Gather the results but wait to timeout as well
                    let timer = sleep_until(Instant::now() + context.parameters.sync_last_known_own_block_timeout);
                    tokio::pin!(timer);

                    'inner: loop {
                        tokio::select! {
                            result = results.next() => {
                                let Some((result, authority_index)) = result else {
                                    // All peer futures resolved for this round.
                                    break 'inner;
                                };
                                match result {
                                    Ok(result) => {
                                        match process_blocks(result, authority_index) {
                                            Ok(blocks) => {
                                                let max_round = blocks.into_iter().map(|b|b.round()).max().unwrap_or(0);
                                                highest_round = highest_round.max(max_round);

                                                total_stake += context.committee.stake(authority_index);
                                            },
                                            Err(err) => {
                                                // Invalid responses do not count towards stake.
                                                warn!("Invalid result returned from {authority_index} while fetching last own block: {err}");
                                            }
                                        }
                                    },
                                    Err(err) => {
                                        // Network failure: re-enqueue the same peer with a retry delay.
                                        warn!("Error {err} while fetching our own block from peer {authority_index}. Will retry.");
                                        results.push(fetch_own_block(authority_index, FETCH_OWN_BLOCK_RETRY_DELAY));
                                    }
                                }
                            },
                            () = &mut timer => {
                                info!("Timeout while trying to sync our own last block from peers");
                                break 'inner;
                            }
                        }
                    }

                    // Request at least f+1 stake to have replied back.
                    if context.committee.reached_validity(total_stake) {
                        info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
                        break 'main;
                    }

                    retries += 1;
                    context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
                    warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

                    sleep(retry_delay_step).await;

                    // Exponential backoff (x1.5) bounded by MAX_RETRY_DELAY_STEP.
                    retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
                    retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
                }

                // Update the Core with the highest detected round
                context.metrics.node_metrics.last_known_own_block_round.set(highest_round as i64);

                if let Err(err) = core_dispatcher.set_last_known_proposed_round(highest_round) {
                    warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
                }
            }));
    }
891
892    async fn start_fetch_missing_blocks_task(&mut self) -> ConsensusResult<()> {
893        if self.context.committee.size() == 1 {
894            trace!(
895                "Only one node in the network, will not try fetching missing blocks from peers."
896            );
897            return Ok(());
898        }
899
900        let missing_blocks = self
901            .core_dispatcher
902            .get_missing_blocks()
903            .await
904            .map_err(|_err| ConsensusError::Shutdown)?;
905
906        // No reason to kick off the scheduler if there are no missing blocks to fetch
907        if missing_blocks.is_empty() {
908            return Ok(());
909        }
910
911        let context = self.context.clone();
912        let network_client = self.network_client.clone();
913        let block_verifier = self.block_verifier.clone();
914        let transaction_certifier = self.transaction_certifier.clone();
915        let commit_vote_monitor = self.commit_vote_monitor.clone();
916        let core_dispatcher = self.core_dispatcher.clone();
917        let blocks_to_fetch = self.inflight_blocks_map.clone();
918        let commands_sender = self.commands_sender.clone();
919        let dag_state = self.dag_state.clone();
920        let round_tracker = self.round_tracker.clone();
921
922        // If we are commit lagging, then we don't want to enable the scheduler. As the node is sycnhronizing via the commit syncer, the certified commits
923        // will bring all the necessary blocks to run the commits. As the commits are certified, we are guaranteed that all the necessary causal history is present.
924        if self.is_commit_lagging() {
925            return Ok(());
926        }
927
928        self.fetch_blocks_scheduler_task
929            .spawn(monitored_future!(async move {
930                let _scope = monitored_scope("FetchMissingBlocksScheduler");
931                context
932                    .metrics
933                    .node_metrics
934                    .fetch_blocks_scheduler_inflight
935                    .inc();
936                let total_requested = missing_blocks.len();
937
938                fail_point_async!("consensus-delay");
939                // Fetch blocks from peers
940                let results = Self::fetch_blocks_from_authorities(
941                    context.clone(),
942                    blocks_to_fetch.clone(),
943                    network_client,
944                    missing_blocks,
945                    dag_state,
946                )
947                .await;
948                context
949                    .metrics
950                    .node_metrics
951                    .fetch_blocks_scheduler_inflight
952                    .dec();
953                if results.is_empty() {
954                    return;
955                }
956
957                // Now process the returned results
958                let mut total_fetched = 0;
959                for (blocks_guard, fetched_blocks, peer) in results {
960                    total_fetched += fetched_blocks.len();
961
962                    if let Err(err) = Self::process_fetched_blocks(
963                        fetched_blocks,
964                        peer,
965                        blocks_guard,
966                        core_dispatcher.clone(),
967                        block_verifier.clone(),
968                        transaction_certifier.clone(),
969                        commit_vote_monitor.clone(),
970                        context.clone(),
971                        commands_sender.clone(),
972                        round_tracker.clone(),
973                        "periodic",
974                    )
975                    .await
976                    {
977                        warn!(
978                            "Error occurred while processing fetched blocks from peer {peer}: {err}"
979                        );
980                        context
981                            .metrics
982                            .node_metrics
983                            .synchronizer_process_fetched_failures
984                            .with_label_values(&[
985                                context.committee.authority(peer).hostname.as_str(),
986                                "periodic",
987                            ])
988                            .inc();
989                    }
990                }
991
992                debug!(
993                    "Total blocks requested to fetch: {}, total fetched: {}",
994                    total_requested, total_fetched
995                );
996            }));
997        Ok(())
998    }
999
1000    fn is_commit_lagging(&self) -> bool {
1001        let last_commit_index = self.dag_state.read().last_commit_index();
1002        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
1003        let commit_threshold = last_commit_index
1004            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER;
1005        commit_threshold < quorum_commit_index
1006    }
1007
    /// Fetches the `missing_blocks` from peers. Requests the same number of authorities with missing blocks from each peer.
    /// Each response from peer can contain the requested blocks, and additional blocks from the last accepted round for
    /// authorities with missing blocks.
    /// Each element of the vector is a tuple which contains the requested missing block refs, the returned blocks and
    /// the peer authority index.
    ///
    /// Peers and authority batches are shuffled (outside tests) to spread load, and
    /// failed requests are retried once per remaining peer until
    /// `FETCH_FROM_PEERS_TIMEOUT` elapses.
    async fn fetch_blocks_from_authorities(
        context: Arc<Context>,
        inflight_blocks: Arc<InflightBlocksMap>,
        network_client: Arc<SynchronizerClient<VC, OC>>,
        missing_blocks: BTreeSet<BlockRef>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Vec<(BlocksGuard, Vec<Bytes>, AuthorityIndex)> {
        // Preliminary truncation of missing blocks to fetch. Since each peer can have different
        // number of missing blocks and the fetching is batched by peer, so keep more than max_blocks_per_sync
        // per peer on average.
        let missing_blocks = missing_blocks
            .into_iter()
            .take(2 * MAX_PERIODIC_SYNC_PEERS * context.parameters.max_blocks_per_sync)
            .collect::<Vec<_>>();

        // Maps authorities to the missing blocks they have.
        let mut authorities = BTreeMap::<AuthorityIndex, Vec<BlockRef>>::new();
        for block_ref in &missing_blocks {
            authorities
                .entry(block_ref.author)
                .or_default()
                .push(*block_ref);
        }
        // Distribute the same number of authorities into each peer to sync.
        // When running this function, context.committee.size() is always greater than 1.
        let num_authorities_per_peer = authorities
            .len()
            .div_ceil((context.committee.size() - 1).min(MAX_PERIODIC_SYNC_PEERS));

        // Update metrics related to missing blocks.
        let mut missing_blocks_per_authority = vec![0; context.committee.size()];
        for (authority, blocks) in &authorities {
            missing_blocks_per_authority[*authority] += blocks.len();
        }
        for (missing, (_, authority)) in missing_blocks_per_authority
            .into_iter()
            .zip(context.committee.authorities())
        {
            context
                .metrics
                .node_metrics
                .synchronizer_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .inc_by(missing as u64);
            context
                .metrics
                .node_metrics
                .synchronizer_current_missing_blocks_by_authority
                .with_label_values(&[&authority.hostname])
                .set(missing as i64);
        }

        // Candidate peers to fetch from: everyone but ourselves.
        let mut peers = context
            .committee
            .authorities()
            .filter_map(|(peer_index, _)| (peer_index != context.own_index).then_some(peer_index))
            .collect::<Vec<_>>();

        // TODO: probably inject the RNG to allow unit testing - this is a work around for now.
        if cfg!(not(test)) {
            // Shuffle the peers
            peers.shuffle(&mut ThreadRng::default());
        }

        let mut peers = peers.into_iter();
        let mut request_futures = FuturesUnordered::new();

        let highest_rounds = Self::get_highest_accepted_rounds(dag_state, &context);

        // Shuffle the authorities for each request.
        let mut authorities = authorities.into_values().collect::<Vec<_>>();
        if cfg!(not(test)) {
            // Shuffle the authorities
            authorities.shuffle(&mut ThreadRng::default());
        }

        // Send the fetch requests
        for batch in authorities.chunks(num_authorities_per_peer) {
            let Some(peer) = peers.next() else {
                debug_fatal!("No more peers left to fetch blocks!");
                break;
            };
            let peer_hostname = &context.committee.authority(peer).hostname;
            // Fetch from the lowest round missing blocks to ensure progress.
            // This may reduce efficiency and increase the chance of duplicated data transfer in edge cases.
            let block_refs = batch
                .iter()
                .flatten()
                .cloned()
                .collect::<BTreeSet<_>>()
                .into_iter()
                .take(context.parameters.max_blocks_per_sync)
                .collect::<BTreeSet<_>>();

            // lock the blocks to be fetched. If no lock can be acquired for any of the blocks then don't bother
            if let Some(blocks_guard) = inflight_blocks.lock_blocks(block_refs.clone(), peer) {
                info!(
                    "Periodic sync of {} missing blocks from peer {} {}: {}",
                    block_refs.len(),
                    peer,
                    peer_hostname,
                    block_refs
                        .iter()
                        .map(|b| b.to_string())
                        .collect::<Vec<_>>()
                        .join(", ")
                );
                request_futures.push(Self::fetch_blocks_request(
                    network_client.clone(),
                    peer,
                    blocks_guard,
                    highest_rounds.clone(),
                    false,
                    FETCH_REQUEST_TIMEOUT,
                    1,
                ));
            }
        }

        // Collect responses until all requests resolve or the overall timeout fires.
        let mut results = Vec::new();
        let fetcher_timeout = sleep(FETCH_FROM_PEERS_TIMEOUT);

        tokio::pin!(fetcher_timeout);

        loop {
            tokio::select! {
                Some((response, blocks_guard, _retries, peer_index, highest_rounds)) = request_futures.next() => {
                    let peer_hostname = &context.committee.authority(peer_index).hostname;
                    match response {
                        Ok(fetched_blocks) => {
                            results.push((blocks_guard, fetched_blocks, peer_index));

                            // no more pending requests are left, just break the loop
                            if request_futures.is_empty() {
                                break;
                            }
                        },
                        Err(_) => {
                            context.metrics.node_metrics.synchronizer_fetch_failures.with_label_values(&[peer_hostname.as_str(), "periodic"]).inc();
                            // try again if there is any peer left
                            if let Some(next_peer) = peers.next() {
                                // do best effort to lock guards. If we can't lock then don't bother at this run.
                                if let Some(blocks_guard) = inflight_blocks.swap_locks(blocks_guard, next_peer) {
                                    info!(
                                        "Retrying syncing {} missing blocks from peer {}: {}",
                                        blocks_guard.block_refs.len(),
                                        peer_hostname,
                                        blocks_guard.block_refs
                                            .iter()
                                            .map(|b| b.to_string())
                                            .collect::<Vec<_>>()
                                            .join(", ")
                                    );
                                    request_futures.push(Self::fetch_blocks_request(
                                        network_client.clone(),
                                        next_peer,
                                        blocks_guard,
                                        highest_rounds,
                                        false,
                                        FETCH_REQUEST_TIMEOUT,
                                        1,
                                    ));
                                } else {
                                    debug!("Couldn't acquire locks to fetch blocks from peer {next_peer}.")
                                }
                            } else {
                                debug!("No more peers left to fetch blocks");
                            }
                        }
                    }
                },
                _ = &mut fetcher_timeout => {
                    debug!("Timed out while fetching missing blocks");
                    break;
                }
            }
        }

        results
    }
1193}
1194
1195#[cfg(test)]
1196mod tests {
1197    use std::{
1198        collections::{BTreeMap, BTreeSet},
1199        sync::Arc,
1200        time::Duration,
1201    };
1202
1203    use async_trait::async_trait;
1204    use bytes::Bytes;
1205    use consensus_config::{AuthorityIndex, Parameters};
1206    use consensus_types::block::{BlockDigest, BlockRef, Round};
1207    use mysten_metrics::monitored_mpsc;
1208    use parking_lot::RwLock;
1209    use tokio::{sync::Mutex, time::sleep};
1210
1211    use crate::commit::{CommitVote, TrustedCommit};
1212    use crate::{
1213        CommitDigest, CommitIndex,
1214        block::{TestBlock, VerifiedBlock},
1215        block_verifier::NoopBlockVerifier,
1216        commit_vote_monitor::CommitVoteMonitor,
1217        context::Context,
1218        core_thread::CoreThreadDispatcher,
1219        dag_state::DagState,
1220        error::{ConsensusError, ConsensusResult},
1221        network::{BlockStream, ObserverNetworkClient, SynchronizerClient, ValidatorNetworkClient},
1222        storage::mem_store::MemStore,
1223        synchronizer::{
1224            FETCH_BLOCKS_CONCURRENCY, FETCH_REQUEST_TIMEOUT, InflightBlocksMap, Synchronizer,
1225        },
1226    };
1227    use crate::{
1228        authority_service::COMMIT_LAG_MULTIPLIER, core_thread::MockCoreThreadDispatcher,
1229        round_tracker::RoundTracker, transaction_certifier::TransactionCertifier,
1230    };
1231
    // Key and response shapes used by `MockNetworkClient` to stub network calls.
    type FetchRequestKey = (Vec<BlockRef>, AuthorityIndex);
    type FetchRequestResponse = (Vec<VerifiedBlock>, Option<Duration>);
    type FetchLatestBlockKey = (AuthorityIndex, Vec<AuthorityIndex>);
    type FetchLatestBlockResponse = (Vec<VerifiedBlock>, Option<Duration>);

    /// Test double for the network client: serves pre-stubbed responses to
    /// `fetch_blocks` and `fetch_latest_blocks`, panicking on unexpected requests.
    #[derive(Default)]
    struct MockNetworkClient {
        // Stubbed responses keyed by the exact (block refs, peer) requested.
        fetch_blocks_requests: Mutex<BTreeMap<FetchRequestKey, FetchRequestResponse>>,
        // FIFO queues of stubbed responses keyed by (peer, authorities asked about).
        fetch_latest_blocks_requests:
            Mutex<BTreeMap<FetchLatestBlockKey, Vec<FetchLatestBlockResponse>>>,
    }
1243
1244    impl MockNetworkClient {
1245        async fn stub_fetch_blocks(
1246            &self,
1247            blocks: Vec<VerifiedBlock>,
1248            peer: AuthorityIndex,
1249            latency: Option<Duration>,
1250        ) {
1251            let mut lock = self.fetch_blocks_requests.lock().await;
1252            let block_refs = blocks
1253                .iter()
1254                .map(|block| block.reference())
1255                .collect::<Vec<_>>();
1256            lock.insert((block_refs, peer), (blocks, latency));
1257        }
1258
1259        async fn stub_fetch_latest_blocks(
1260            &self,
1261            blocks: Vec<VerifiedBlock>,
1262            peer: AuthorityIndex,
1263            authorities: Vec<AuthorityIndex>,
1264            latency: Option<Duration>,
1265        ) {
1266            let mut lock = self.fetch_latest_blocks_requests.lock().await;
1267            lock.entry((peer, authorities))
1268                .or_default()
1269                .push((blocks, latency));
1270        }
1271
1272        async fn fetch_latest_blocks_pending_calls(&self) -> usize {
1273            let lock = self.fetch_latest_blocks_requests.lock().await;
1274            lock.len()
1275        }
1276    }
1277
    #[async_trait]
    impl ValidatorNetworkClient for MockNetworkClient {
        // Only `fetch_blocks` and `fetch_latest_blocks` are exercised by these
        // tests; the remaining trait methods panic if called.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("subscribe_blocks not implemented in mock")
        }

        // Pops the stub registered for exactly (block_refs, peer); panics when no
        // matching stub exists so tests fail loudly on unexpected requests.
        async fn fetch_blocks(
            &self,
            peer: AuthorityIndex,
            block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            let mut lock = self.fetch_blocks_requests.lock().await;
            let response = lock.remove(&(block_refs.clone(), peer)).unwrap_or_else(|| {
                panic!(
                    "Unexpected fetch blocks request made: {:?} {}. Current lock: {:?}",
                    block_refs, peer, lock
                );
            });

            let serialised = response
                .0
                .into_iter()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            // Release the lock before simulating latency so other requests proceed.
            drop(lock);

            if let Some(latency) = response.1 {
                sleep(latency).await;
            }

            Ok(serialised)
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: crate::commit::CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("fetch_commits not implemented in mock")
        }

        // Pops the oldest stub queued for (peer, authorities); remaining stubs for
        // the same key are re-inserted to be served by later calls.
        async fn fetch_latest_blocks(
            &self,
            peer: AuthorityIndex,
            authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            let mut lock = self.fetch_latest_blocks_requests.lock().await;
            let mut responses = lock
                .remove(&(peer, authorities.clone()))
                .expect("Unexpected fetch blocks request made");

            let response = responses.remove(0);
            let serialised = response
                .0
                .into_iter()
                .map(|block| block.serialized().clone())
                .collect::<Vec<_>>();

            if !responses.is_empty() {
                lock.insert((peer, authorities), responses);
            }

            // Release the lock before simulating latency so other requests proceed.
            drop(lock);

            if let Some(latency) = response.1 {
                sleep(latency).await;
            }

            Ok(serialised)
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("get_latest_rounds not implemented in mock")
        }

        #[cfg(test)]
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("send_block not implemented in mock")
        }
    }
1378
    #[async_trait]
    impl ObserverNetworkClient for MockNetworkClient {
        // The observer-side API is not exercised by these tests; every method
        // panics if called so an accidental use is caught immediately.
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _request_stream: crate::network::BlockRequestStream,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("stream_blocks not implemented in mock")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Observer fetch_blocks not implemented in mock")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: crate::commit::CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Observer fetch_commits not implemented in mock")
        }
    }
1408
1409    #[test]
1410    fn test_inflight_blocks_map() {
1411        // GIVEN
1412        let map = InflightBlocksMap::new();
1413        let some_block_refs = [
1414            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1415            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
1416            BlockRef::new(12, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
1417            BlockRef::new(15, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
1418        ];
1419        let missing_block_refs = some_block_refs.iter().cloned().collect::<BTreeSet<_>>();
1420
1421        // Lock & unlock blocks
1422        {
1423            let mut all_guards = Vec::new();
1424
1425            // Try to acquire the block locks for authorities 1 & 2
1426            for i in 1..=2 {
1427                let authority = AuthorityIndex::new_for_test(i);
1428
1429                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1430                let guard = guard.expect("Guard should be created");
1431                assert_eq!(guard.block_refs.len(), 4);
1432
1433                all_guards.push(guard);
1434
1435                // trying to acquire any of them again will not succeed
1436                let guard = map.lock_blocks(missing_block_refs.clone(), authority);
1437                assert!(guard.is_none());
1438            }
1439
1440            // Trying to acquire for authority 3 it will fail - as we have maxed out the number of allowed peers
1441            let authority_3 = AuthorityIndex::new_for_test(3);
1442
1443            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1444            assert!(guard.is_none());
1445
1446            // Explicitly drop the guard of authority 1 and try for authority 3 again - it will now succeed
1447            drop(all_guards.remove(0));
1448
1449            let guard = map.lock_blocks(missing_block_refs.clone(), authority_3);
1450            let guard = guard.expect("Guard should be successfully acquired");
1451
1452            assert_eq!(guard.block_refs, missing_block_refs);
1453
1454            // Dropping all guards should unlock on the block refs
1455            drop(guard);
1456            drop(all_guards);
1457
1458            assert_eq!(map.num_of_locked_blocks(), 0);
1459        }
1460
1461        // Swap locks
1462        {
1463            // acquire a lock for authority 1
1464            let authority_1 = AuthorityIndex::new_for_test(1);
1465            let guard = map
1466                .lock_blocks(missing_block_refs.clone(), authority_1)
1467                .unwrap();
1468
1469            // Now swap the locks for authority 2
1470            let authority_2 = AuthorityIndex::new_for_test(2);
1471            let guard = map.swap_locks(guard, authority_2);
1472
1473            assert_eq!(guard.unwrap().block_refs, missing_block_refs);
1474        }
1475    }
1476
1477    #[tokio::test]
1478    async fn successful_fetch_blocks_from_peer() {
1479        // GIVEN
1480        let (context, _) = Context::new_for_test(4);
1481        let context = Arc::new(context);
1482        let block_verifier = Arc::new(NoopBlockVerifier {});
1483        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1484        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1485        let mock_client = Arc::new(MockNetworkClient::default());
1486        let (blocks_sender, _blocks_receiver) =
1487            monitored_mpsc::unbounded_channel("consensus_block_output");
1488        let store = Arc::new(MemStore::new());
1489        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1490        let transaction_certifier = TransactionCertifier::new(
1491            context.clone(),
1492            block_verifier.clone(),
1493            dag_state.clone(),
1494            blocks_sender,
1495        );
1496        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1497
1498        let network_client = Arc::new(SynchronizerClient::new(
1499            context.clone(),
1500            Some(mock_client.clone()),
1501            Some(mock_client.clone()),
1502        ));
1503        let handle = Synchronizer::start(
1504            network_client,
1505            context,
1506            core_dispatcher.clone(),
1507            commit_vote_monitor,
1508            block_verifier,
1509            transaction_certifier,
1510            round_tracker,
1511            dag_state,
1512            false,
1513        );
1514
1515        // Create some test blocks
1516        let expected_blocks = (0..10)
1517            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
1518            .collect::<Vec<_>>();
1519        let missing_blocks = expected_blocks
1520            .iter()
1521            .map(|block| block.reference())
1522            .collect::<BTreeSet<_>>();
1523
1524        // AND stub the fetch_blocks request from peer 1
1525        let peer = AuthorityIndex::new_for_test(1);
1526        mock_client
1527            .stub_fetch_blocks(expected_blocks.clone(), peer, None)
1528            .await;
1529
1530        // WHEN request missing blocks from peer 1
1531        assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1532
1533        // Wait a little bit until those have been added in core
1534        sleep(Duration::from_millis(1_000)).await;
1535
1536        // THEN ensure those ended up in Core
1537        let added_blocks = core_dispatcher.get_add_blocks().await;
1538        assert_eq!(added_blocks, expected_blocks);
1539    }
1540
1541    #[tokio::test]
1542    async fn saturate_fetch_blocks_from_peer() {
1543        // GIVEN
1544        let (context, _) = Context::new_for_test(4);
1545        let context = Arc::new(context);
1546        let block_verifier = Arc::new(NoopBlockVerifier {});
1547        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1548        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
1549        let mock_client = Arc::new(MockNetworkClient::default());
1550        let (blocks_sender, _blocks_receiver) =
1551            monitored_mpsc::unbounded_channel("consensus_block_output");
1552        let store = Arc::new(MemStore::new());
1553        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
1554        let transaction_certifier = TransactionCertifier::new(
1555            context.clone(),
1556            block_verifier.clone(),
1557            dag_state.clone(),
1558            blocks_sender,
1559        );
1560        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1561
1562        let network_client = Arc::new(SynchronizerClient::new(
1563            context.clone(),
1564            Some(mock_client.clone()),
1565            Some(mock_client.clone()),
1566        ));
1567        let handle = Synchronizer::start(
1568            network_client,
1569            context,
1570            core_dispatcher.clone(),
1571            commit_vote_monitor,
1572            block_verifier,
1573            transaction_certifier,
1574            round_tracker,
1575            dag_state,
1576            false,
1577        );
1578
1579        // Create some test blocks
1580        let expected_blocks = (0..=2 * FETCH_BLOCKS_CONCURRENCY)
1581            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round as Round, 0).build()))
1582            .collect::<Vec<_>>();
1583
1584        // Now start sending requests to fetch blocks by trying to saturate peer 1 task
1585        let peer = AuthorityIndex::new_for_test(1);
1586        let mut iter = expected_blocks.iter().peekable();
1587        while let Some(block) = iter.next() {
1588            // stub the fetch_blocks request from peer 1 and give some high response latency so requests
1589            // can start blocking the peer task.
1590            mock_client
1591                .stub_fetch_blocks(
1592                    vec![block.clone()],
1593                    peer,
1594                    Some(Duration::from_millis(5_000)),
1595                )
1596                .await;
1597
1598            let mut missing_blocks = BTreeSet::new();
1599            missing_blocks.insert(block.reference());
1600
1601            // WHEN requesting to fetch the blocks, it should not succeed for the last request and get
1602            // an error with "saturated" synchronizer
1603            if iter.peek().is_none() {
1604                match handle.fetch_blocks(missing_blocks, peer).await {
1605                    Err(ConsensusError::SynchronizerSaturated(index)) => {
1606                        assert_eq!(index, peer);
1607                    }
1608                    _ => panic!("A saturated synchronizer error was expected"),
1609                }
1610            } else {
1611                assert!(handle.fetch_blocks(missing_blocks, peer).await.is_ok());
1612            }
1613        }
1614    }
1615
    /// Verifies the periodic sync loop: blocks reported missing by core are fetched
    /// in the background, failing over from a timing-out peer to the next one.
    /// Runs on a paused tokio clock, so the sleeps below are virtual time.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_fetch_blocks() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // Create some test blocks
        let expected_blocks = (0..10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = expected_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();

        // AND stub the missing blocks so the periodic task has something to fetch
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authority 1 & 2
        // Make the first authority timeout, so the second will be called. "We" are authority = 0, so
        // we are skipped anyways.
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // WHEN start the synchronizer and wait for a couple of seconds
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let _handle = Synchronizer::start(
            network_client,
            context,
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_certifier,
            round_tracker,
            dag_state,
            false,
        );

        // Advance (virtual) time past the first peer's timeout so the failover
        // to authority 2 can complete.
        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched and added to core
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, expected_blocks);

        // AND missing blocks should have been consumed by the stub
        assert!(
            core_dispatcher
                .get_missing_blocks()
                .await
                .unwrap()
                .is_empty()
        );
    }
1702
    /// Verifies that the periodic sync task is suppressed while the local commit
    /// index lags far behind the quorum's, and resumes fetching once the local
    /// DAG catches up. Runs on a paused tokio clock.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_periodic_task_when_commit_lagging_gets_disabled() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // AND stub some missing blocks. The highest accepted round is 0. Create blocks that are above the sync threshold.
        let sync_missing_block_round_threshold = context.parameters.commit_sync_batch_size;
        let stub_blocks = (sync_missing_block_round_threshold * 2
            ..sync_missing_block_round_threshold * 3)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();
        let missing_blocks = stub_blocks
            .iter()
            .map(|block| block.reference())
            .collect::<BTreeSet<_>>();
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        // AND stub the requests for authority 1 & 2
        // Make the first authority timeout, so the second will be called. "We" are authority = 0, so
        // we are skipped anyways.
        // Only up to `max_blocks_per_sync` of the stubbed blocks are expected per fetch.
        let mut expected_blocks = stub_blocks
            .iter()
            .take(context.parameters.max_blocks_per_sync)
            .cloned()
            .collect::<Vec<_>>();
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(1),
                Some(FETCH_REQUEST_TIMEOUT),
            )
            .await;
        mock_client
            .stub_fetch_blocks(
                expected_blocks.clone(),
                AuthorityIndex::new_for_test(2),
                None,
            )
            .await;

        // Now create some blocks to simulate a commit lag
        let round = context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER * 2;
        let commit_index: CommitIndex = round - 1;
        let blocks = (0..4)
            .map(|authority| {
                let commit_votes = vec![CommitVote::new(commit_index, CommitDigest::MIN)];
                let block = TestBlock::new(round, authority)
                    .set_commit_votes(commit_votes)
                    .build();

                VerifiedBlock::new_for_test(block)
            })
            .collect::<Vec<_>>();

        // Pass them through the commit vote monitor - so now there will be a big commit lag to prevent
        // the scheduled synchronizer from running
        for block in blocks {
            commit_vote_monitor.observe_block(&block);
        }

        // WHEN start the synchronizer and wait for a couple of seconds where normally the synchronizer should have kicked in.
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let _handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier,
            transaction_certifier,
            round_tracker,
            dag_state.clone(),
            false,
        );

        sleep(4 * FETCH_REQUEST_TIMEOUT).await;

        // Since we should be in commit lag mode none of the missed blocks should have been fetched - hence nothing should be
        // sent to core for processing.
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(added_blocks, vec![]);

        // AND advance now the local commit index by adding a new commit that matches the commit index
        // of quorum
        {
            let mut d = dag_state.write();
            for index in 1..=commit_index {
                let commit =
                    TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);

                d.add_commit(commit);
            }

            // Sanity check: local commit index now matches what the quorum voted for.
            assert_eq!(
                d.last_commit_index(),
                commit_vote_monitor.quorum_commit_index()
            );
        }

        // Now stub again the missing blocks to fetch the exact same ones.
        core_dispatcher
            .stub_missing_blocks(missing_blocks.clone())
            .await;

        sleep(2 * FETCH_REQUEST_TIMEOUT).await;

        // THEN the missing blocks should now be fetched and added to core
        let mut added_blocks = core_dispatcher.get_add_blocks().await;

        // Sort both sides so the comparison is order-independent.
        added_blocks.sort_by_key(|block| block.reference());
        expected_blocks.sort_by_key(|block| block.reference());

        assert_eq!(added_blocks, expected_blocks);
    }
1838
    /// Verifies fetching our own last proposed block from peers on startup
    /// (`sync_last_known_own_block = true`): the highest round returned by any
    /// peer (10, from peer 1) becomes the min propose round, and slow peers force
    /// exactly one retry — hence each peer's request is stubbed twice below.
    /// Runs on a paused tokio clock.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn synchronizer_fetch_own_last_block() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        // Short timeout so the retry path triggers quickly under virtual time.
        let context = Arc::new(context.with_parameters(Parameters {
            sync_last_known_own_block_timeout: Duration::from_millis(2_000),
            ..Default::default()
        }));
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(MockNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let our_index = AuthorityIndex::new_for_test(0);

        // Create some test blocks
        let mut expected_blocks = (9..=10)
            .map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 0).build()))
            .collect::<Vec<_>>();

        // Now set different latest blocks for the peers
        // For peer 1 we give the block of round 10 (highest)
        let block_1 = expected_blocks.pop().unwrap();
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_1.clone()],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;
        // Second stub serves the retry round of requests.
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_1],
                AuthorityIndex::new_for_test(1),
                vec![our_index],
                None,
            )
            .await;

        // For peer 2 we give the block of round 9
        // The first response is delayed past the sync timeout to force a retry.
        let block_2 = expected_blocks.pop().unwrap();
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_2.clone()],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        mock_client
            .stub_fetch_latest_blocks(
                vec![block_2],
                AuthorityIndex::new_for_test(2),
                vec![our_index],
                None,
            )
            .await;

        // For peer 3 we don't give any block - and it should return an empty vector
        mock_client
            .stub_fetch_latest_blocks(
                vec![],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                Some(Duration::from_secs(10)),
            )
            .await;
        mock_client
            .stub_fetch_latest_blocks(
                vec![],
                AuthorityIndex::new_for_test(3),
                vec![our_index],
                None,
            )
            .await;

        // WHEN start the synchronizer (with `sync_last_known_own_block` enabled) and
        // wait for a couple of seconds
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let handle = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor,
            block_verifier,
            transaction_certifier,
            round_tracker,
            dag_state,
            true,
        );

        // Wait at least for the timeout time
        sleep(context.parameters.sync_last_known_own_block_timeout * 2).await;

        // Assert that core has been called to set the min propose round
        assert_eq!(
            core_dispatcher.get_last_own_proposed_round().await,
            vec![10]
        );

        // Ensure that all the requests have been called
        assert_eq!(mock_client.fetch_latest_blocks_pending_calls().await, 0);

        // And we got one retry
        assert_eq!(
            context
                .metrics
                .node_metrics
                .sync_last_known_own_block_retries
                .get(),
            1
        );

        // Ensure that no panic occurred
        if let Err(err) = handle.stop().await
            && err.is_panic()
        {
            std::panic::resume_unwind(err.into_panic());
        }
    }
1973
    /// Calls `Synchronizer::process_fetched_blocks` directly (bypassing the fetch
    /// loop): serialized blocks returned by a peer must be verified, forwarded to
    /// core, and their in-flight locks released when the consumed guard drops.
    #[tokio::test]
    async fn test_process_fetched_blocks() {
        // GIVEN
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let (commands_sender, _commands_receiver) =
            monitored_mpsc::channel("consensus_synchronizer_commands", 1000);

        // Create input test blocks:
        // - Authority 0 block at round 60.
        // - Authority 1 blocks from round 30 to 60.
        let mut expected_blocks = vec![VerifiedBlock::new_for_test(TestBlock::new(60, 0).build())];
        expected_blocks.extend(
            (30..=60).map(|round| VerifiedBlock::new_for_test(TestBlock::new(round, 1).build())),
        );
        // Sanity check: the input exactly fills one sync batch.
        assert_eq!(
            expected_blocks.len(),
            context.parameters.max_blocks_per_sync
        );

        // The function under test takes serialized bytes, as received off the wire.
        let expected_serialized_blocks = expected_blocks
            .iter()
            .map(|b| b.serialized().clone())
            .collect::<Vec<_>>();

        let expected_block_refs = expected_blocks
            .iter()
            .map(|b| b.reference())
            .collect::<BTreeSet<_>>();

        // GIVEN peer to fetch blocks from
        let peer_index = AuthorityIndex::new_for_test(2);

        // Create blocks_guard, marking all refs as in flight to that peer
        let inflight_blocks_map = InflightBlocksMap::new();
        let blocks_guard = inflight_blocks_map
            .lock_blocks(expected_block_refs.clone(), peer_index)
            .expect("Failed to lock blocks");

        assert_eq!(
            inflight_blocks_map.num_of_locked_blocks(),
            expected_block_refs.len()
        );

        // WHEN processing the fetched blocks (fully-qualified call because the
        // associated fn is generic over the verifier/dispatcher/client types)
        let result = Synchronizer::<
            NoopBlockVerifier,
            MockCoreThreadDispatcher,
            MockNetworkClient,
            MockNetworkClient,
        >::process_fetched_blocks(
            expected_serialized_blocks,
            peer_index,
            blocks_guard, // The guard is consumed here
            core_dispatcher.clone(),
            block_verifier,
            transaction_certifier,
            commit_vote_monitor,
            context.clone(),
            commands_sender,
            round_tracker,
            "test",
        )
        .await;

        // THEN
        assert!(result.is_ok());

        // Check blocks were sent to core
        let added_blocks = core_dispatcher.get_add_blocks().await;
        assert_eq!(
            added_blocks
                .iter()
                .map(|b| b.reference())
                .collect::<BTreeSet<_>>(),
            expected_block_refs,
        );

        // Check blocks were unlocked once the consumed guard was dropped
        assert_eq!(inflight_blocks_map.num_of_locked_blocks(), 0);
    }
2069}