consensus_core/
commit_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! CommitSyncer implements efficient synchronization of committed data.
5//!
//! During the operation of a committee of authorities for consensus, one or more authorities
//! can fall behind the quorum in their received and accepted blocks. This can happen due to
//! network disruptions, host crashes, or other reasons. Authorities that fall behind need to
//! catch up to the quorum to be able to vote on the latest leaders, so efficient synchronization
//! is necessary to minimize the impact of temporary disruptions and maintain smooth operation of
//! the network.
//!
12//! CommitSyncer achieves efficient synchronization by relying on the following: when blocks
13//! are included in commits with >= 2f+1 certifiers by stake, these blocks must have passed
14//! verifications on some honest validators, so re-verifying them is unnecessary. In fact, the
15//! quorum certified commits themselves can be trusted to be sent to Sui directly, but for
16//! simplicity this is not done. Blocks from trusted commits still go through Core and committer.
17//!
18//! Another way CommitSyncer improves the efficiency of synchronization is parallel fetching:
19//! commits have a simple dependency graph (linear), so it is easy to fetch ranges of commits
20//! in parallel.
21//!
//! Commit synchronization is an expensive operation, involving the transfer of large amounts of
//! data over the network, and it is not on the critical path of block processing. So the
//! heuristics for synchronization, including triggers and retries, should be chosen to favor
//! throughput and efficient resource usage over faster reactions.
26
27use std::{
28    collections::{BTreeMap, BTreeSet},
29    sync::Arc,
30    time::Duration,
31};
32
33use bytes::Bytes;
34use consensus_types::block::BlockRef;
35use futures::{StreamExt as _, stream::FuturesOrdered};
36use itertools::Itertools as _;
37use mysten_common::ZipDebugEqIteratorExt;
38use mysten_metrics::spawn_logged_monitored_task;
39use parking_lot::RwLock;
40use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
41use tokio::{
42    runtime::Handle,
43    sync::oneshot,
44    task::{JoinHandle, JoinSet},
45    time::{MissedTickBehavior, sleep},
46};
47use tracing::{debug, info, warn};
48
49use crate::{
50    CommitConsumerMonitor, CommitIndex,
51    block::{BlockAPI, ExtendedBlock, SignedBlock, VerifiedBlock},
52    block_verifier::BlockVerifier,
53    commit::{
54        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
55        CommitRef, TrustedCommit,
56    },
57    commit_vote_monitor::CommitVoteMonitor,
58    context::Context,
59    core_thread::CoreThreadDispatcher,
60    dag_state::DagState,
61    error::{ConsensusError, ConsensusResult},
62    network::{CommitSyncerClient, ObserverNetworkClient, PeerId, ValidatorNetworkClient},
63    peers_pool::PeersPool,
64    round_tracker::RoundTracker,
65    stake_aggregator::{QuorumThreshold, StakeAggregator},
66    transaction_vote_tracker::TransactionVoteTracker,
67};
68
69// Handle to stop the CommitSyncer loop.
70pub(crate) struct CommitSyncerHandle {
71    schedule_task: JoinHandle<()>,
72    tx_shutdown: oneshot::Sender<()>,
73}
74
75impl CommitSyncerHandle {
76    pub(crate) async fn stop(self) {
77        let _ = self.tx_shutdown.send(());
78        // Do not abort schedule task, which waits for fetches to shut down.
79        if let Err(e) = self.schedule_task.await
80            && e.is_panic()
81        {
82            std::panic::resume_unwind(e.into_panic());
83        }
84    }
85}
86
87pub(crate) struct CommitSyncer<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
88    // States shared by scheduler and fetch tasks.
89
90    // Shared components wrapper.
91    inner: Arc<Inner<VC, OC>>,
92
93    // States only used by the scheduler.
94
95    // Inflight requests to fetch commits from different authorities.
96    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
97    // Additional ranges of commits to fetch.
98    pending_fetches: BTreeSet<CommitRange>,
99    // Fetched commits and blocks by commit range.
100    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
101    // Highest commit index among inflight and pending fetches.
102    // Used to determine the start of new ranges to be fetched.
103    highest_scheduled_index: Option<CommitIndex>,
104    // Highest index among fetched commits, after commits and blocks are verified.
105    // Used for metrics.
106    highest_fetched_commit_index: CommitIndex,
107    // The commit index that is the max of highest local commit index and commit index inflight to Core.
108    // Used to determine if fetched blocks can be sent to Core without gaps.
109    synced_commit_index: CommitIndex,
110}
111
112impl<VC, OC> CommitSyncer<VC, OC>
113where
114    VC: ValidatorNetworkClient,
115    OC: ObserverNetworkClient,
116{
117    pub(crate) fn new(
118        context: Arc<Context>,
119        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
120        commit_vote_monitor: Arc<CommitVoteMonitor>,
121        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
122        block_verifier: Arc<dyn BlockVerifier>,
123        transaction_vote_tracker: TransactionVoteTracker,
124        round_tracker: Arc<RwLock<RoundTracker>>,
125        network_client: Arc<CommitSyncerClient<VC, OC>>,
126        dag_state: Arc<RwLock<DagState>>,
127        peers_pool: Arc<PeersPool>,
128    ) -> Self {
129        let inner = Arc::new(Inner {
130            context,
131            core_thread_dispatcher,
132            commit_vote_monitor,
133            commit_consumer_monitor,
134            block_verifier,
135            transaction_vote_tracker,
136            round_tracker,
137            network_client,
138            dag_state,
139            peers_pool,
140        });
141        let synced_commit_index = inner.dag_state.read().last_commit_index();
142        CommitSyncer {
143            inner,
144            inflight_fetches: JoinSet::new(),
145            pending_fetches: BTreeSet::new(),
146            fetched_ranges: BTreeMap::new(),
147            highest_scheduled_index: None,
148            highest_fetched_commit_index: 0,
149            synced_commit_index,
150        }
151    }
152
153    pub(crate) fn start(self) -> CommitSyncerHandle {
154        let (tx_shutdown, rx_shutdown) = oneshot::channel();
155        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
156        CommitSyncerHandle {
157            schedule_task,
158            tx_shutdown,
159        }
160    }
161
162    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
163        let mut interval = tokio::time::interval(Duration::from_secs(2));
164        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
165
166        loop {
167            tokio::select! {
168                // Periodically, schedule new fetches if the node is falling behind.
169                _ = interval.tick() => {
170                    self.try_schedule_once();
171                }
172                // Handles results from fetch tasks.
173                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
174                    if let Err(e) = result {
175                        if e.is_panic() {
176                            std::panic::resume_unwind(e.into_panic());
177                        }
178                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
179                        // If any fetch is cancelled or panicked, try to shutdown and exit the loop.
180                        self.inflight_fetches.shutdown().await;
181                        return;
182                    }
183                    let (target_end, commits) = result.unwrap();
184                    self.handle_fetch_result(target_end, commits).await;
185                }
186                _ = &mut rx_shutdown => {
187                    // Shutdown requested.
188                    info!("CommitSyncer shutting down ...");
189                    self.inflight_fetches.shutdown().await;
190                    return;
191                }
192            }
193
194            self.try_start_fetches();
195        }
196    }
197
198    fn try_schedule_once(&mut self) {
199        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
200        let local_commit_index = self.inner.dag_state.read().last_commit_index();
201        let metrics = &self.inner.context.metrics.node_metrics;
202        metrics
203            .commit_sync_quorum_index
204            .set(quorum_commit_index as i64);
205        metrics
206            .commit_sync_local_index
207            .set(local_commit_index as i64);
208        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
209        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
210        // Update synced_commit_index periodically to make sure it is no smaller than
211        // local commit index.
212        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
213        let unhandled_commits_threshold = self.unhandled_commits_threshold();
214        info!(
215            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
216            self.synced_commit_index,
217            highest_handled_index,
218            highest_scheduled_index,
219            quorum_commit_index,
220            unhandled_commits_threshold,
221        );
222
223        // TODO: cleanup inflight fetches that are no longer needed.
224        let fetch_after_index = self
225            .synced_commit_index
226            .max(self.highest_scheduled_index.unwrap_or(0));
227        // When the node is falling behind, schedule pending fetches which will be executed on later.
228        for prev_end in (fetch_after_index..=quorum_commit_index)
229            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
230        {
231            // Create range with inclusive start and end.
232            let range_start = prev_end + 1;
233            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
234            // Commit range is not fetched when [range_start, range_end] contains less number of commits
235            // than the target batch size. This is to avoid the cost of processing more and smaller batches.
236            // Block broadcast, subscription and synchronization will help the node catchup.
237            if quorum_commit_index < range_end {
238                break;
239            }
240            // Pause scheduling new fetches when handling of commits is lagging.
241            if highest_handled_index + unhandled_commits_threshold < range_end {
242                warn!(
243                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
244                    highest_handled_index, highest_scheduled_index
245                );
246                break;
247            }
248            self.pending_fetches
249                .insert((range_start..=range_end).into());
250            // quorum_commit_index should be non-decreasing, so highest_scheduled_index should not
251            // decrease either.
252            self.highest_scheduled_index = Some(range_end);
253        }
254    }
255
256    async fn handle_fetch_result(
257        &mut self,
258        target_end: CommitIndex,
259        certified_commits: CertifiedCommits,
260    ) {
261        assert!(!certified_commits.commits().is_empty());
262
263        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
264            .commits()
265            .iter()
266            .fold((0, 0), |(blocks, bytes), c| {
267                (
268                    blocks + c.blocks().len(),
269                    bytes
270                        + c.blocks()
271                            .iter()
272                            .map(|b| b.serialized().len())
273                            .sum::<usize>() as u64,
274                )
275            });
276
277        let metrics = &self.inner.context.metrics.node_metrics;
278        metrics
279            .commit_sync_fetched_commits
280            .inc_by(certified_commits.commits().len() as u64);
281        metrics
282            .commit_sync_fetched_blocks
283            .inc_by(total_blocks_fetched as u64);
284        metrics
285            .commit_sync_total_fetched_blocks_size
286            .inc_by(total_blocks_size_bytes);
287
288        let (commit_start, commit_end) = (
289            certified_commits.commits().first().unwrap().index(),
290            certified_commits.commits().last().unwrap().index(),
291        );
292        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
293        metrics
294            .commit_sync_highest_fetched_index
295            .set(self.highest_fetched_commit_index as i64);
296
297        // Allow returning partial results, and try fetching the rest separately.
298        if commit_end < target_end {
299            self.pending_fetches
300                .insert((commit_end + 1..=target_end).into());
301        }
302        // Make sure synced_commit_index is up to date.
303        self.synced_commit_index = self
304            .synced_commit_index
305            .max(self.inner.dag_state.read().last_commit_index());
306        // Only add new blocks if at least some of them are not already synced.
307        if self.synced_commit_index < commit_end {
308            self.fetched_ranges
309                .insert((commit_start..=commit_end).into(), certified_commits);
310        }
311        // Try to process as many fetched blocks as possible.
312        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
313            // Only pop fetched_ranges if there is no gap with blocks already synced.
314            // Note: start, end and synced_commit_index are all inclusive.
315            let (fetched_commit_range, commits) =
316                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
317                    self.fetched_ranges.pop_first().unwrap()
318                } else {
319                    // Found gap between earliest fetched block and latest synced block,
320                    // so not sending additional blocks to Core.
321                    metrics.commit_sync_gap_on_processing.inc();
322                    break;
323                };
324            // Avoid sending to Core a whole batch of already synced blocks.
325            if fetched_commit_range.end() <= self.synced_commit_index {
326                continue;
327            }
328
329            debug!(
330                "Fetched blocks for commit range {:?}: {}",
331                fetched_commit_range,
332                commits
333                    .commits()
334                    .iter()
335                    .flat_map(|c| c.blocks())
336                    .map(|b| b.reference().to_string())
337                    .join(","),
338            );
339
340            // If core thread cannot handle the incoming blocks, it is ok to block here
341            // to slow down the commit syncer.
342            match self
343                .inner
344                .core_thread_dispatcher
345                .add_certified_commits(commits)
346                .await
347            {
348                // Missing ancestors are possible from certification blocks, but
349                // it is unnecessary to try to sync their causal history. If they are required
350                // for the progress of the DAG, they will be included in a future commit.
351                Ok(missing) => {
352                    if !missing.is_empty() {
353                        info!(
354                            "Certification blocks have missing ancestors: {} for commit range {:?}",
355                            missing.iter().map(|b| b.to_string()).join(","),
356                            fetched_commit_range,
357                        );
358                    }
359                    for block_ref in missing {
360                        let hostname = &self
361                            .inner
362                            .context
363                            .committee
364                            .authority(block_ref.author)
365                            .hostname;
366                        metrics
367                            .commit_sync_fetch_missing_blocks
368                            .with_label_values(&[hostname])
369                            .inc();
370                    }
371                }
372                Err(e) => {
373                    info!("Failed to add blocks, shutting down: {}", e);
374                    return;
375                }
376            };
377
378            // Once commits and blocks are sent to Core, ratchet up synced_commit_index
379            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
380        }
381
382        metrics
383            .commit_sync_inflight_fetches
384            .set(self.inflight_fetches.len() as i64);
385        metrics
386            .commit_sync_pending_fetches
387            .set(self.pending_fetches.len() as i64);
388        metrics
389            .commit_sync_highest_synced_index
390            .set(self.synced_commit_index as i64);
391    }
392
393    fn try_start_fetches(&mut self) {
394        // Cap parallel fetches based on configured limit and known peers, to avoid overloading the network.
395        // Also when there are too many fetched blocks that cannot be sent to Core before an earlier fetch
396        // has not finished, reduce parallelism so the earlier fetch can retry on a better host and succeed.
397        // For validators, use committee size for the calculation. For observers, don't apply the 2/3 limit.
398        let known_peers_count = self.inner.peers_pool.get_known_peers().len();
399        let target_parallel_fetches = if self.inner.context.is_validator() {
400            self.inner
401                .context
402                .parameters
403                .commit_sync_parallel_fetches
404                .min(known_peers_count * 2 / 3)
405                .min(
406                    self.inner
407                        .context
408                        .parameters
409                        .commit_sync_batches_ahead
410                        .saturating_sub(self.fetched_ranges.len()),
411                )
412                .max(1)
413        } else {
414            // For observers, currently the node is probably connected only to another peer. In the future probably more observer peers might be available
415            // to sync from. That's why we do not cap the number of parallel fetches by the number of known peers.
416            self.inner
417                .context
418                .parameters
419                .commit_sync_parallel_fetches
420                .min(
421                    self.inner
422                        .context
423                        .parameters
424                        .commit_sync_batches_ahead
425                        .saturating_sub(self.fetched_ranges.len()),
426                )
427                .max(1)
428        };
429        // Start new fetches if there are pending batches and available slots.
430        loop {
431            if self.inflight_fetches.len() >= target_parallel_fetches {
432                break;
433            }
434            let Some(commit_range) = self.pending_fetches.pop_first() else {
435                break;
436            };
437            self.inflight_fetches
438                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
439        }
440
441        let metrics = &self.inner.context.metrics.node_metrics;
442        metrics
443            .commit_sync_inflight_fetches
444            .set(self.inflight_fetches.len() as i64);
445        metrics
446            .commit_sync_pending_fetches
447            .set(self.pending_fetches.len() as i64);
448        metrics
449            .commit_sync_highest_synced_index
450            .set(self.synced_commit_index as i64);
451    }
452
453    // Retries fetching commits and blocks from available authorities, until a request succeeds
454    // where at least a prefix of the commit range is fetched.
455    // Returns the fetched commits and blocks referenced by the commits.
456    async fn fetch_loop(
457        inner: Arc<Inner<VC, OC>>,
458        commit_range: CommitRange,
459    ) -> (CommitIndex, CertifiedCommits) {
460        let base_timeout = inner.context.parameters.commit_sync_request_timeout;
461        // Max per-request timeout will be base timeout times a multiplier.
462        // At the extreme, this means there will be 120s timeout to fetch max_blocks_per_fetch blocks.
463        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
464        // timeout * max number of targets should be reasonably small, so the
465        // system can adjust to slow network or large data sizes quickly.
466        const MAX_NUM_TARGETS: usize = 24;
467        let mut timeout_multiplier = 0;
468        let _timer = inner
469            .context
470            .metrics
471            .node_metrics
472            .commit_sync_fetch_loop_latency
473            .start_timer();
474        info!("Starting to fetch commits in {commit_range:?} ...",);
475        loop {
476            // Attempt to fetch commits and blocks through min(available peers count, MAX_NUM_TARGETS) peers.
477            let mut target_peers = inner.peers_pool.get_known_peers();
478            target_peers.shuffle(&mut ThreadRng::default());
479            target_peers.truncate(MAX_NUM_TARGETS);
480            // Increase timeout multiplier for each loop until MAX_TIMEOUT_MULTIPLIER.
481            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
482            let request_timeout = base_timeout * timeout_multiplier;
483            // Give enough overall timeout for fetching commits and blocks.
484            // - Timeout for fetching commits and commit certifying blocks.
485            // - Timeout for fetching blocks referenced by the commits.
486            // - Time spent on pipelining requests to fetch blocks.
487            // - Another headroom to allow fetch_once() to timeout gracefully if possible.
488            let fetch_timeout = request_timeout * 4;
489            // Try fetching from selected target peers.
490            for peer in target_peers {
491                match tokio::time::timeout(
492                    fetch_timeout,
493                    Self::fetch_once(
494                        inner.clone(),
495                        peer.clone(),
496                        commit_range.clone(),
497                        request_timeout,
498                    ),
499                )
500                .await
501                {
502                    Ok(Ok(commits)) => {
503                        info!("Finished fetching commits in {commit_range:?}",);
504                        return (commit_range.end(), commits);
505                    }
506                    Ok(Err(e)) => {
507                        warn!(
508                            "Failed to fetch {commit_range:?} from {}: {}",
509                            peer.hostname(&inner.context),
510                            e
511                        );
512                        inner
513                            .context
514                            .metrics
515                            .node_metrics
516                            .commit_sync_fetch_once_errors
517                            .with_label_values(&[peer.labelname(&inner.context).as_str(), e.name()])
518                            .inc();
519                    }
520                    Err(_) => {
521                        warn!(
522                            "Timed out fetching {commit_range:?} from {}",
523                            peer.hostname(&inner.context)
524                        );
525                        inner
526                            .context
527                            .metrics
528                            .node_metrics
529                            .commit_sync_fetch_once_errors
530                            .with_label_values(&[
531                                peer.labelname(&inner.context).as_str(),
532                                "FetchTimeout",
533                            ])
534                            .inc();
535                    }
536                }
537            }
538            // Avoid busy looping, by waiting for a while before retrying.
539            sleep(base_timeout).await;
540        }
541    }
542
543    // Fetches commits and blocks from a single peer. At a high level, first the commits are
544    // fetched and verified. After that, blocks referenced in the certified commits are fetched
545    // and sent to Core for processing.
546    async fn fetch_once(
547        inner: Arc<Inner<VC, OC>>,
548        target_peer: PeerId,
549        commit_range: CommitRange,
550        timeout: Duration,
551    ) -> ConsensusResult<CertifiedCommits> {
552        let _timer = inner
553            .context
554            .metrics
555            .node_metrics
556            .commit_sync_fetch_once_latency
557            .start_timer();
558
559        // 0. Probe the target to check reachability before committing to the full fetch.
560        // This skips unreachable and slow peers quickly.
561        let probe_timeout = inner.context.parameters.commit_sync_probe_timeout;
562        inner
563            .network_client
564            .probe_connectivity(target_peer.clone(), probe_timeout)
565            .await?;
566
567        // 1. Fetch commits in the commit range from the target authority.
568        let (serialized_commits, serialized_blocks) = inner
569            .network_client
570            .fetch_commits(target_peer.clone(), commit_range.clone(), timeout)
571            .await?;
572
573        // 2. Verify the response contains blocks that can certify the last returned commit,
574        // and the returned commits are chained by digests, so earlier commits are certified
575        // as well.
576        let (commits, vote_blocks) = Handle::current()
577            .spawn_blocking({
578                let inner = inner.clone();
579                let peer = target_peer.clone();
580                move || {
581                    inner.verify_commits(peer, commit_range, serialized_commits, serialized_blocks)
582                }
583            })
584            .await
585            .expect("Spawn blocking should not fail")?;
586
587        // 3. Fetch blocks referenced by the commits, from the same peer where commits are fetched.
588        let mut block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
589        block_refs.sort();
590        let num_chunks = block_refs
591            .len()
592            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
593            as u32;
594        let mut requests: FuturesOrdered<_> = block_refs
595            .chunks(inner.context.parameters.max_blocks_per_fetch)
596            .enumerate()
597            .map(|(i, request_block_refs)| {
598                let inner = inner.clone();
599                let peer = target_peer.clone();
600                async move {
601                    // 4. Send out pipelined fetch requests to avoid overloading the target authority.
602                    sleep(timeout * i as u32 / num_chunks).await;
603                    // TODO: add some retries.
604                    let serialized_blocks = inner
605                        .network_client
606                        .fetch_blocks(
607                            peer.clone(),
608                            request_block_refs.to_vec(),
609                            vec![],
610                            false,
611                            timeout,
612                        )
613                        .await?;
614                    // 5. Verify the same number of blocks are returned as requested.
615                    if request_block_refs.len() != serialized_blocks.len() {
616                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
617                            peer,
618                            requested: request_block_refs.len(),
619                            received: serialized_blocks.len(),
620                        });
621                    }
622                    // 6. Verify returned blocks have valid formats.
623                    let signed_blocks = serialized_blocks
624                        .iter()
625                        .map(|serialized| {
626                            let block: SignedBlock = bcs::from_bytes(serialized)
627                                .map_err(ConsensusError::MalformedBlock)?;
628                            Ok(block)
629                        })
630                        .collect::<ConsensusResult<Vec<_>>>()?;
631                    // 7. Verify the returned blocks match the requested block refs.
632                    // If they do match, the returned blocks can be considered verified as well.
633                    let mut blocks = Vec::new();
634                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
635                        .iter()
636                        .zip_debug_eq(signed_blocks.into_iter())
637                        .zip_debug_eq(serialized_blocks.into_iter())
638                    {
639                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
640                        let received_block_ref = BlockRef::new(
641                            signed_block.round(),
642                            signed_block.author(),
643                            signed_block_digest,
644                        );
645                        if *requested_block_ref != received_block_ref {
646                            return Err(ConsensusError::UnexpectedBlockForCommit {
647                                peer,
648                                requested: *requested_block_ref,
649                                received: received_block_ref,
650                            });
651                        }
652                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
653                    }
654                    Ok(blocks)
655                }
656            })
657            .collect();
658
659        let mut fetched_blocks = BTreeMap::new();
660        while let Some(result) = requests.next().await {
661            for block in result? {
662                fetched_blocks.insert(block.reference(), block);
663            }
664        }
665
666        // 8. Check if the block timestamps are lower than current time - this is for metrics only.
667        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
668            let now_ms = inner.context.clock.timestamp_utc_ms();
669            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
670            if forward_drift == 0 {
671                continue;
672            };
673            // Extract hostname based on peer type
674            inner
675                .context
676                .metrics
677                .node_metrics
678                .block_timestamp_drift_ms
679                .with_label_values(&[
680                    target_peer.labelname(&inner.context).as_str(),
681                    "commit_syncer",
682                ])
683                .inc_by(forward_drift);
684        }
685
686        // 9. Now create certified commits by assigning the blocks to each commit.
687        let mut certified_commits = Vec::new();
688        for commit in &commits {
689            let blocks = commit
690                .blocks()
691                .iter()
692                .map(|block_ref| {
693                    fetched_blocks
694                        .remove(block_ref)
695                        .expect("Block should exist")
696                })
697                .collect::<Vec<_>>();
698            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
699        }
700
701        // 10. Add blocks in certified commits to the transaction vote tracker.
702        for commit in &certified_commits {
703            for block in commit.blocks() {
704                // Only account for reject votes in the block, since they may vote on uncommitted
705                // blocks or transactions. It is unnecessary to vote on the committed blocks
706                // themselves.
707                if inner.context.protocol_config.transaction_voting_enabled() {
708                    inner
709                        .transaction_vote_tracker
710                        .add_voted_blocks(vec![(block.clone(), vec![])]);
711                }
712            }
713        }
714
715        // 11. Record commit votes from the fetched blocks.
716        for commit in &certified_commits {
717            for block in commit.blocks() {
718                inner.commit_vote_monitor.observe_block(block);
719            }
720        }
721        for block in &vote_blocks {
722            inner.commit_vote_monitor.observe_block(block);
723        }
724
725        // 12. Update round tracker from the fetched blocks. For fetched blocks,
726        // excluded_ancestors are not available so we use an empty vector.
727        {
728            let mut tracker = inner.round_tracker.write();
729            // Update from commit blocks
730            for commit in &certified_commits {
731                for block in commit.blocks() {
732                    tracker.update_from_verified_block(&ExtendedBlock {
733                        block: block.clone(),
734                        excluded_ancestors: vec![],
735                    });
736                }
737            }
738            // Update from vote blocks
739            for block in &vote_blocks {
740                tracker.update_from_verified_block(&ExtendedBlock {
741                    block: block.clone(),
742                    excluded_ancestors: vec![],
743                });
744            }
745        }
746
747        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
748    }
749
750    fn unhandled_commits_threshold(&self) -> CommitIndex {
751        self.inner.context.parameters.commit_sync_batch_size
752            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
753    }
754
755    #[cfg(test)]
756    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
757        self.pending_fetches.clone()
758    }
759
760    #[cfg(test)]
761    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
762        self.fetched_ranges.clone()
763    }
764
765    #[cfg(test)]
766    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
767        self.highest_scheduled_index
768    }
769
770    #[cfg(test)]
771    fn highest_fetched_commit_index(&self) -> CommitIndex {
772        self.highest_fetched_commit_index
773    }
774
775    #[cfg(test)]
776    fn synced_commit_index(&self) -> CommitIndex {
777        self.synced_commit_index
778    }
779}
780
/// Dependencies and shared state of `CommitSyncer`, reached through `self.inner`.
///
/// NOTE(review): presumably held behind an `Arc` so that fetch tasks can share it — TODO confirm
/// against the `CommitSyncer` definition.
struct Inner<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
    /// Shared consensus context. The committee, parameters, protocol config, clock and metrics
    /// are read from it.
    context: Arc<Context>,
    /// Dispatcher to the core thread. Presumably used to hand fetched commits to Core; not used
    /// in this part of the file — TODO confirm.
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    /// Observes commit votes carried by blocks; observed votes drive fetch scheduling.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Tracks the highest commit handled by the consumer; scheduling pauses when too far ahead.
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    /// Fully verifies vote blocks received together with commits.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Receives verified blocks (and their reject votes) when transaction voting is enabled.
    transaction_vote_tracker: TransactionVoteTracker,
    /// Updated from fetched commit blocks and vote blocks.
    round_tracker: Arc<RwLock<RoundTracker>>,
    /// Client for fetching from validator and/or observer peers — presumably; not used in
    /// this part of the file.
    network_client: Arc<CommitSyncerClient<VC, OC>>,
    /// Local DAG state; not used in this part of the file — TODO confirm its role.
    dag_state: Arc<RwLock<DagState>>,
    /// Pool of known peers that commit fetches can target.
    peers_pool: Arc<PeersPool>,
}
793
794impl<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> Inner<VC, OC> {
795    /// Verifies the commits and also certifies them using the provided vote blocks for the last commit. The
796    /// method returns the trusted commits and the votes as verified blocks.
797    fn verify_commits(
798        &self,
799        peer: PeerId,
800        commit_range: CommitRange,
801        serialized_commits: Vec<Bytes>,
802        serialized_vote_blocks: Vec<Bytes>,
803    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
804        // Parse and verify commits.
805        let mut commits = Vec::new();
806        for serialized in &serialized_commits {
807            let commit: Commit =
808                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
809            let digest = TrustedCommit::compute_digest(serialized);
810            if commits.is_empty() {
811                // start is inclusive, so first commit must be at the start index.
812                if commit.index() != commit_range.start() {
813                    return Err(ConsensusError::UnexpectedStartCommit {
814                        peer,
815                        start: commit_range.start(),
816                        commit: Box::new(commit),
817                    });
818                }
819            } else {
820                // Verify next commit increments index and references the previous digest.
821                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
822                    commits.last().unwrap();
823                if commit.index() != last_commit.index() + 1
824                    || &commit.previous_digest() != last_commit_digest
825                {
826                    return Err(ConsensusError::UnexpectedCommitSequence {
827                        peer,
828                        prev_commit: Box::new(last_commit.clone()),
829                        curr_commit: Box::new(commit),
830                    });
831                }
832            }
833            // Do not process more commits past the end index.
834            if commit.index() > commit_range.end() {
835                break;
836            }
837            commits.push((digest, commit));
838        }
839        let Some((end_commit_digest, end_commit)) = commits.last() else {
840            return Err(ConsensusError::NoCommitReceived { peer });
841        };
842
843        // Parse and verify blocks. Then accumulate votes on the end commit.
844        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
845        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
846        let mut vote_blocks = Vec::new();
847        for serialized in serialized_vote_blocks {
848            let block: SignedBlock =
849                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
850            // Only block signatures need to be verified, to verify commit votes.
851            // But the blocks will be sent to Core, so they need to be fully verified.
852            let (block, reject_transaction_votes) =
853                self.block_verifier.verify_and_vote(block, serialized)?;
854            if self.context.protocol_config.transaction_voting_enabled() {
855                self.transaction_vote_tracker
856                    .add_voted_blocks(vec![(block.clone(), reject_transaction_votes)]);
857            }
858            for vote in block.commit_votes() {
859                if *vote == end_commit_ref {
860                    stake_aggregator.add(block.author(), &self.context.committee);
861                }
862            }
863            vote_blocks.push(block);
864        }
865
866        // Check if the end commit has enough votes.
867        if !stake_aggregator.reached_threshold(&self.context.committee) {
868            return Err(ConsensusError::NotEnoughCommitVotes {
869                stake: stake_aggregator.stake(),
870                peer,
871                commit: Box::new(end_commit.clone()),
872            });
873        }
874
875        let trusted_commits = commits
876            .into_iter()
877            .zip_debug_eq(serialized_commits)
878            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
879            .collect();
880        Ok((trusted_commits, vote_blocks))
881    }
882}
883
// Unit tests for `CommitSyncer` scheduling and fetch start-up. Every method of the fake network
// client panics via `unimplemented!`, and the tests never await fetch results. They only inspect
// `CommitSyncer` state after `try_schedule_once` / `try_start_fetches`.
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, NetworkKeyPair, Parameters};
    use consensus_types::block::{BlockRef, Round};
    use mysten_common::ZipDebugEqIteratorExt;
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef,
        block::{TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, CommitSyncerClient, ObserverNetworkClient, ValidatorNetworkClient},
        peers_pool::{PeerService, PeersPool},
        round_tracker::RoundTracker,
        storage::mem_store::MemStore,
        transaction_vote_tracker::TransactionVoteTracker,
    };

    /// Network client stub: every method is `unimplemented!` and panics if it is ever called.
    #[derive(Default)]
    struct FakeNetworkClient {}

    // Validator-side network API; never expected to be invoked by these tests.
    #[async_trait::async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }

        #[cfg(test)]
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }
    }

    // Observer-side network API; never expected to be invoked by these tests.
    #[async_trait::async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// An observer node with a single registered validator peer schedules one batch and starts
    /// exactly one in-flight fetch.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_observer_node_basic() {
        // Test basic Observer node behavior for commit syncing
        // An Observer node should be able to sync from both validator and observer peers

        // Create an Observer node context (own_index = MAX)
        let (mut context, _) = Context::new_for_test(4);
        context.own_index = AuthorityIndex::MAX; // Mark this as an observer node
        context.parameters = Parameters {
            commit_sync_batch_size: 5,
            commit_sync_batches_ahead: 10,
            commit_sync_parallel_fetches: 5,
            max_blocks_per_fetch: 5,
            ..context.parameters
        };
        let context = Arc::new(context);

        // Setup observer node commit syncer
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(CommitSyncerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // Create PeersPool - Observer typically connects to one validator
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        // Register the validator peer that the observer connects to
        peers_pool
            .register_validator(
                AuthorityIndex::new_for_test(0),
                vec![PeerService::Validator, PeerService::Observer],
            )
            .unwrap();

        let mut commit_syncer = CommitSyncer::new(
            context.clone(),
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            network_client,
            dag_state,
            peers_pool.clone(),
        );

        // Verify this is recognized as an observer
        assert!(!context.is_validator(), "Should be an observer node");

        // Simulate the observer seeing commits from its connected validator
        // (authorities 0-2 each vote for commit 5 in a round 10 block).
        for i in 0..3 {
            let test_block = TestBlock::new(10, i)
                .set_commit_votes(vec![CommitRef::new(5, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Observer should be able to schedule commit fetches
        // (commits 1..=5 fit in a single batch of size 5).
        commit_syncer.try_schedule_once();
        assert_eq!(commit_syncer.pending_fetches().len(), 1);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(5));

        // Start fetches - observer should be able to fetch from its single peer
        commit_syncer.try_start_fetches();
        assert_eq!(
            commit_syncer.inflight_fetches.len(),
            1,
            "Should start fetch from single peer"
        );
    }

    /// An observer node with several registered peers (three validators and one observer)
    /// schedules multiple batches and never exceeds the configured parallelism.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_observer_with_multiple_peers() {
        // Test Observer node behavior when connected to multiple peers
        // This simulates an observer that can sync from multiple sources

        let (mut context, _) = Context::new_for_test(4);
        context.own_index = AuthorityIndex::MAX; // Observer node
        context.parameters = Parameters {
            commit_sync_batch_size: 5,
            commit_sync_batches_ahead: 10,
            commit_sync_parallel_fetches: 8, // Allow more parallelism
            max_blocks_per_fetch: 5,
            ..context.parameters
        };
        let context = Arc::new(context);

        // Setup
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(CommitSyncerClient::new(
            context.clone(),
            Some(Arc::new(FakeNetworkClient::default())),
            Some(Arc::new(FakeNetworkClient::default())),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));

        // Create PeersPool with multiple peers (validators and another observer)
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        // Register multiple validator peers
        peers_pool
            .register_validator(
                AuthorityIndex::new_for_test(0),
                vec![PeerService::Validator, PeerService::Observer],
            )
            .unwrap();
        peers_pool
            .register_validator(
                AuthorityIndex::new_for_test(1),
                vec![PeerService::Validator, PeerService::Observer],
            )
            .unwrap();
        peers_pool
            .register_validator(
                AuthorityIndex::new_for_test(2),
                vec![PeerService::Validator, PeerService::Observer],
            )
            .unwrap();

        // Now register another observer peer (simulating observer-to-observer sync)
        let observer_peer = NetworkKeyPair::generate(&mut rand::thread_rng()).public();
        peers_pool.register_observer(observer_peer);

        let mut commit_syncer = CommitSyncer::new(
            context.clone(),
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            network_client,
            dag_state,
            peers_pool.clone(),
        );

        // Simulate heavy commit load that requires parallel fetching
        // (three of the four authorities vote for commit 50 in a round 100 block).
        for i in 0..3 {
            let test_block = TestBlock::new(100, i)
                .set_commit_votes(vec![CommitRef::new(50, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        commit_syncer.try_schedule_once();

        // Should schedule multiple batches
        let pending_fetches = commit_syncer.pending_fetches().len();
        assert!(pending_fetches > 0, "Should schedule fetches");

        // Start fetches - observer should utilize multiple peers in parallel
        commit_syncer.try_start_fetches();

        // Observer with 4 peers (3 validators + 1 observer) should be able to
        // fetch from multiple peers in parallel, not limited by 2/3 rule
        let inflight = commit_syncer.inflight_fetches.len();
        let known_peers = peers_pool.get_known_peers().len();

        assert_eq!(known_peers, 4, "Should have 3 validators + 1 observer peer");

        // Observer should be able to use full parallelism up to configured limit
        // Not restricted by the validator's 2/3 limitation
        let max_parallel = context
            .parameters
            .commit_sync_parallel_fetches
            .min(pending_fetches)
            .min(context.parameters.commit_sync_batches_ahead);

        // This only checks an upper bound on the number of in-flight fetches.
        assert!(
            inflight <= max_parallel,
            "Observer should respect configured parallelism limit: {} <= {}",
            inflight,
            max_parallel
        );

        // Verify observer can potentially use more parallelism than a validator would
        // A validator with 4 peers would be limited to 4 * 2/3 = 2 parallel fetches
        // But an observer can use more
        // NOTE(review): `max_parallel` is computed only from configuration and `pending_fetches`.
        // When `pending_fetches >= 3`, min(8, pending_fetches, 10) >= 3 > 2, so this assertion
        // always holds and does not exercise `CommitSyncer`. Asserting on `inflight` would test
        // the stated claim — TODO confirm the expected in-flight count before tightening.
        if pending_fetches >= 3 {
            assert!(
                max_parallel > 2,
                "Observer should be able to use more parallelism than validator's 2/3 limit"
            );
        }
    }

    /// A validator schedules contiguous batches as commit votes advance. It stops at the unhandled
    /// commits threshold and resumes once the consumer reports handled commits.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // SETUP
        let (context, _) = Context::new_for_test(4);
        // Use smaller batches and fetch limits for testing.
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let mock_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(CommitSyncerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_vote_tracker,
            round_tracker,
            network_client,
            dag_state,
            peers_pool,
        );

        // Check initial state.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 15 blocks voting for commit 10 from authorities 0 to 2 in CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled after seeing progress of other validators.
        commit_syncer.try_schedule_once();

        // Verify state: two batches of size 5 cover commits 1..=10.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 40 blocks voting for commit 35 from authorities 0 to 2 in CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled until the unhandled commits threshold.
        commit_syncer.try_schedule_once();

        // Verify commit syncer is paused after scheduling 15 commits to index 25.
        // Threshold = commit_sync_batch_size (5) * commit_sync_batches_ahead (5).
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // Indicate commit index 25 is consumed, and try to schedule again.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        // Verify commit syncer schedules fetches up to index 35.
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // Verify contiguous ranges are scheduled: starts 1, 6, ..., 31, each spanning 5 commits.
        for (range, start) in pending_fetches.iter().zip_debug_eq((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}