consensus_core/commit_syncer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! CommitSyncer implements efficient synchronization of committed data.
//!
//! During the operation of a committee of authorities for consensus, one or more authorities
//! can fall behind the quorum in their received and accepted blocks. This can happen due to
//! network disruptions, host crashes, or other reasons. Authorities that fall behind need to
//! catch up to the quorum to be able to vote on the latest leaders. So efficient synchronization
//! is necessary to minimize the impact of temporary disruptions and maintain smooth operation of
//! the network.
//!
//! CommitSyncer achieves efficient synchronization by relying on the following: when blocks
//! are included in commits with >= 2f+1 certifiers by stake, these blocks must have passed
//! verification on some honest validators, so re-verifying them is unnecessary. In fact, the
//! quorum-certified commits themselves could be trusted and sent to Sui directly, but for
//! simplicity this is not done. Blocks from trusted commits still go through Core and the committer.
//!
//! Another way CommitSyncer improves the efficiency of synchronization is parallel fetching:
//! commits have a simple (linear) dependency graph, so it is easy to fetch ranges of commits
//! in parallel.
//!
//! Commit synchronization is an expensive operation that involves transferring a large amount of
//! data over the network, and it is not on the critical path of block processing. So the heuristics
//! for synchronization, including triggers and retries, should be chosen to favor throughput and
//! efficient resource usage over faster reactions.
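//!
//! # Example (illustrative sketch)
//!
//! CommitSyncer is constructed from the node's shared components and then started; the returned
//! handle stops the sync loop on shutdown. The variable names below are placeholders for
//! components constructed elsewhere in the authority node.
//!
//! ```ignore
//! let commit_syncer = CommitSyncer::new(
//!     context,
//!     core_thread_dispatcher,
//!     commit_vote_monitor,
//!     commit_consumer_monitor,
//!     block_verifier,
//!     transaction_certifier,
//!     network_client,
//!     dag_state,
//! );
//! let handle = commit_syncer.start();
//! // ... on shutdown:
//! handle.stop().await;
//! ```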

use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
    time::Duration,
};

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use consensus_types::block::BlockRef;
use futures::{StreamExt as _, stream::FuturesOrdered};
use itertools::Itertools as _;
use mysten_metrics::spawn_logged_monitored_task;
use parking_lot::RwLock;
use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
use tokio::{
    runtime::Handle,
    sync::oneshot,
    task::{JoinHandle, JoinSet},
    time::{MissedTickBehavior, sleep},
};
use tracing::{debug, info, warn};

use crate::{
    CommitConsumerMonitor, CommitIndex,
    block::{BlockAPI, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{
        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
        CommitRef, TrustedCommit,
    },
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::NetworkClient,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    transaction_certifier::TransactionCertifier,
};

// Handle to stop the CommitSyncer loop.
pub(crate) struct CommitSyncerHandle {
    schedule_task: JoinHandle<()>,
    tx_shutdown: oneshot::Sender<()>,
}

impl CommitSyncerHandle {
    pub(crate) async fn stop(self) {
        let _ = self.tx_shutdown.send(());
        // Do not abort schedule task, which waits for fetches to shut down.
        if let Err(e) = self.schedule_task.await
            && e.is_panic()
        {
            std::panic::resume_unwind(e.into_panic());
        }
    }
}

pub(crate) struct CommitSyncer<C: NetworkClient> {
    // States shared by scheduler and fetch tasks.

    // Shared components wrapper.
    inner: Arc<Inner<C>>,

    // States only used by the scheduler.

    // Inflight requests to fetch commits from different authorities.
    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
    // Additional ranges of commits to fetch.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched commits and blocks by commit range.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // Highest commit index among inflight and pending fetches.
    // Used to determine the start of new ranges to be fetched.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest index among fetched commits, after commits and blocks are verified.
    // Used for metrics.
    highest_fetched_commit_index: CommitIndex,
    // The commit index that is the max of highest local commit index and commit index inflight to Core.
    // Used to determine if fetched blocks can be sent to Core without gaps.
    synced_commit_index: CommitIndex,
}

impl<C: NetworkClient> CommitSyncer<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
        block_verifier: Arc<dyn BlockVerifier>,
        transaction_certifier: TransactionCertifier,
        network_client: Arc<C>,
        dag_state: Arc<RwLock<DagState>>,
    ) -> Self {
        let inner = Arc::new(Inner {
            context,
            core_thread_dispatcher,
            commit_vote_monitor,
            commit_consumer_monitor,
            block_verifier,
            transaction_certifier,
            network_client,
            dag_state,
        });
        let synced_commit_index = inner.dag_state.read().last_commit_index();
        CommitSyncer {
            inner,
            inflight_fetches: JoinSet::new(),
            pending_fetches: BTreeSet::new(),
            fetched_ranges: BTreeMap::new(),
            highest_scheduled_index: None,
            highest_fetched_commit_index: 0,
            synced_commit_index,
        }
    }

    pub(crate) fn start(self) -> CommitSyncerHandle {
        let (tx_shutdown, rx_shutdown) = oneshot::channel();
        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
        CommitSyncerHandle {
            schedule_task,
            tx_shutdown,
        }
    }

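    // Main loop of the CommitSyncer: periodically schedules new fetches, processes results from
    // completed fetch tasks, and exits when a shutdown signal is received or a fetch task fails.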
    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(2));
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // Periodically schedule new fetches if the node is falling behind.
                _ = interval.tick() => {
                    self.try_schedule_once();
                }
                // Handle results from fetch tasks.
                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
                    if let Err(e) = result {
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        }
                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
                        // If any fetch is cancelled or panics, shut down and exit the loop.
                        self.inflight_fetches.shutdown().await;
                        return;
                    }
                    let (target_end, commits) = result.unwrap();
                    self.handle_fetch_result(target_end, commits).await;
                }
                _ = &mut rx_shutdown => {
                    // Shutdown requested.
                    info!("CommitSyncer shutting down ...");
                    self.inflight_fetches.shutdown().await;
                    return;
                }
            }

            self.try_start_fetches();
        }
    }

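    // Compares the quorum commit index against the locally synced commit index and, when the node
    // is behind by at least one full batch, enqueues commit ranges to fetch. Scheduling pauses if
    // the commit consumer is too far behind in handling already fetched commits.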
    fn try_schedule_once(&mut self) {
        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
        let local_commit_index = self.inner.dag_state.read().last_commit_index();
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_quorum_index
            .set(quorum_commit_index as i64);
        metrics
            .commit_sync_local_index
            .set(local_commit_index as i64);
        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
        // Update synced_commit_index periodically to make sure it is no smaller than
        // the local commit index.
        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
        let unhandled_commits_threshold = self.unhandled_commits_threshold();
        info!(
            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
            self.synced_commit_index,
            highest_handled_index,
            highest_scheduled_index,
            quorum_commit_index,
            unhandled_commits_threshold,
        );

        // TODO: cleanup inflight fetches that are no longer needed.
        let fetch_after_index = self
            .synced_commit_index
            .max(self.highest_scheduled_index.unwrap_or(0));
        // When the node is falling behind, schedule pending fetches which will be executed later.
        for prev_end in (fetch_after_index..=quorum_commit_index)
            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
        {
            // Create a range with inclusive start and end.
            let range_start = prev_end + 1;
            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
            // A commit range is not fetched when [range_start, range_end] contains fewer commits
            // than the target batch size. This avoids the cost of processing more and smaller batches.
            // Block broadcast, subscription and synchronization will help the node catch up.
            if quorum_commit_index < range_end {
                break;
            }
            // Pause scheduling new fetches when handling of commits is lagging.
            if highest_handled_index + unhandled_commits_threshold < range_end {
                warn!(
                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
                    highest_handled_index, highest_scheduled_index
                );
                break;
            }
            self.pending_fetches
                .insert((range_start..=range_end).into());
            // quorum_commit_index should be non-decreasing, so highest_scheduled_index should not
            // decrease either.
            self.highest_scheduled_index = Some(range_end);
        }
    }

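    // Processes the result of a completed fetch task: records metrics, re-queues any unfetched
    // suffix of the range, buffers the fetched commits, and forwards contiguous ranges to Core in
    // commit index order.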
    async fn handle_fetch_result(
        &mut self,
        target_end: CommitIndex,
        certified_commits: CertifiedCommits,
    ) {
        assert!(!certified_commits.commits().is_empty());

        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
            .commits()
            .iter()
            .fold((0, 0), |(blocks, bytes), c| {
                (
                    blocks + c.blocks().len(),
                    bytes
                        + c.blocks()
                            .iter()
                            .map(|b| b.serialized().len())
                            .sum::<usize>() as u64,
                )
            });

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_fetched_commits
            .inc_by(certified_commits.commits().len() as u64);
        metrics
            .commit_sync_fetched_blocks
            .inc_by(total_blocks_fetched as u64);
        metrics
            .commit_sync_total_fetched_blocks_size
            .inc_by(total_blocks_size_bytes);

        let (commit_start, commit_end) = (
            certified_commits.commits().first().unwrap().index(),
            certified_commits.commits().last().unwrap().index(),
        );
        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
        metrics
            .commit_sync_highest_fetched_index
            .set(self.highest_fetched_commit_index as i64);

        // Allow returning partial results, and try fetching the rest separately.
        if commit_end < target_end {
            self.pending_fetches
                .insert((commit_end + 1..=target_end).into());
        }
        // Make sure synced_commit_index is up to date.
        self.synced_commit_index = self
            .synced_commit_index
            .max(self.inner.dag_state.read().last_commit_index());
        // Only add new blocks if at least some of them are not already synced.
        if self.synced_commit_index < commit_end {
            self.fetched_ranges
                .insert((commit_start..=commit_end).into(), certified_commits);
        }
        // Try to process as many fetched blocks as possible.
        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
            // Only pop fetched_ranges if there is no gap with blocks already synced.
            // Note: start, end and synced_commit_index are all inclusive.
            let (fetched_commit_range, commits) =
                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                    self.fetched_ranges.pop_first().unwrap()
                } else {
                    // Found a gap between the earliest fetched block and the latest synced block,
                    // so do not send additional blocks to Core.
                    metrics.commit_sync_gap_on_processing.inc();
                    break;
                };
            // Avoid sending to Core a whole batch of already synced blocks.
            if fetched_commit_range.end() <= self.synced_commit_index {
                continue;
            }

            debug!(
                "Fetched blocks for commit range {:?}: {}",
                fetched_commit_range,
                commits
                    .commits()
                    .iter()
                    .flat_map(|c| c.blocks())
                    .map(|b| b.reference().to_string())
                    .join(","),
            );

            // If the core thread cannot handle the incoming blocks, it is ok to block here
            // and slow down the commit syncer.
            match self
                .inner
                .core_thread_dispatcher
                .add_certified_commits(commits)
                .await
            {
                // Missing ancestors are possible from certification blocks, but
                // it is unnecessary to try to sync their causal history. If they are required
                // for the progress of the DAG, they will be included in a future commit.
                Ok(missing) => {
                    if !missing.is_empty() {
                        info!(
                            "Certification blocks have missing ancestors: {} for commit range {:?}",
                            missing.iter().map(|b| b.to_string()).join(","),
                            fetched_commit_range,
                        );
                    }
                    for block_ref in missing {
                        let hostname = &self
                            .inner
                            .context
                            .committee
                            .authority(block_ref.author)
                            .hostname;
                        metrics
                            .commit_sync_fetch_missing_blocks
                            .with_label_values(&[hostname])
                            .inc();
                    }
                }
                Err(e) => {
                    info!("Failed to add blocks, shutting down: {}", e);
                    return;
                }
            };

            // Once commits and blocks are sent to Core, ratchet up synced_commit_index.
            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
        }

        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

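    // Spawns fetch tasks for pending commit ranges, up to the parallel fetch limit computed below.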
    fn try_start_fetches(&mut self) {
        // Cap parallel fetches based on the configured limit and committee size, to avoid overloading the network.
        // Also, when too many fetched blocks cannot be sent to Core because an earlier fetch has not
        // finished, reduce parallelism so the earlier fetch can retry on a better host and succeed.
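        // Illustrative example (numbers are hypothetical): with commit_sync_parallel_fetches = 8,
        // a committee of 10 (so 10 * 2 / 3 = 6), commit_sync_batches_ahead = 16 and 14 ranges
        // already fetched but not yet sent to Core, the target is min(8, 6, 16 - 14) = 2.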
        let target_parallel_fetches = self
            .inner
            .context
            .parameters
            .commit_sync_parallel_fetches
            .min(self.inner.context.committee.size() * 2 / 3)
            .min(
                self.inner
                    .context
                    .parameters
                    .commit_sync_batches_ahead
                    .saturating_sub(self.fetched_ranges.len()),
            )
            .max(1);
        // Start new fetches if there are pending batches and available slots.
        loop {
            if self.inflight_fetches.len() >= target_parallel_fetches {
                break;
            }
            let Some(commit_range) = self.pending_fetches.pop_first() else {
                break;
            };
            self.inflight_fetches
                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
        }

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }

    // Retries fetching commits and blocks from available authorities, until a request succeeds
    // and at least a prefix of the commit range is fetched.
    // Returns the fetched commits and the blocks referenced by the commits.
    async fn fetch_loop(
        inner: Arc<Inner<C>>,
        commit_range: CommitRange,
    ) -> (CommitIndex, CertifiedCommits) {
        // Individual request base timeout.
        const TIMEOUT: Duration = Duration::from_secs(10);
        // The max per-request timeout is the base timeout times a multiplier.
        // At the extreme, this means a 120s timeout to fetch max_blocks_per_fetch blocks.
        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
        // timeout * max number of targets should be reasonably small, so the
        // system can adjust to a slow network or large data sizes quickly.
        const MAX_NUM_TARGETS: usize = 24;
        let mut timeout_multiplier = 0;
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_loop_latency
            .start_timer();
        info!("Starting to fetch commits in {commit_range:?} ...",);
        loop {
            // Attempt to fetch commits and blocks through min(committee size, MAX_NUM_TARGETS) peers.
            let mut target_authorities = inner
                .context
                .committee
                .authorities()
                .filter_map(|(i, _)| {
                    if i != inner.context.own_index {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect_vec();
            target_authorities.shuffle(&mut ThreadRng::default());
            target_authorities.truncate(MAX_NUM_TARGETS);
            // Increase the timeout multiplier on each iteration, up to MAX_TIMEOUT_MULTIPLIER.
            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
            let request_timeout = TIMEOUT * timeout_multiplier;
            // Give enough overall timeout for fetching commits and blocks.
            // - Timeout for fetching commits and commit certifying blocks.
            // - Timeout for fetching blocks referenced by the commits.
            // - Time spent on pipelining requests to fetch blocks.
            // - Another headroom to allow fetch_once() to time out gracefully if possible.
            let fetch_timeout = request_timeout * 4;
            // Try fetching from each selected target authority.
            for authority in target_authorities {
                match tokio::time::timeout(
                    fetch_timeout,
                    Self::fetch_once(
                        inner.clone(),
                        authority,
                        commit_range.clone(),
                        request_timeout,
                    ),
                )
                .await
                {
                    Ok(Ok(commits)) => {
                        info!("Finished fetching commits in {commit_range:?}",);
                        return (commit_range.end(), commits);
                    }
                    Ok(Err(e)) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[&hostname, e.name()])
                            .inc();
                    }
                    Err(_) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Timed out fetching {commit_range:?} from {authority}",);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[&hostname, "FetchTimeout"])
                            .inc();
                    }
                }
            }
            // Avoid busy looping by waiting for a while before retrying.
            sleep(TIMEOUT).await;
        }
    }

    // Fetches commits and blocks from a single authority. At a high level, first the commits are
    // fetched and verified. After that, blocks referenced in the certified commits are fetched,
    // so they can be sent to Core for processing.
    async fn fetch_once(
        inner: Arc<Inner<C>>,
        target_authority: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<CertifiedCommits> {
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_once_latency
            .start_timer();

        // 1. Fetch commits in the commit range from the target authority.
        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(target_authority, commit_range.clone(), timeout)
            .await?;

        // 2. Verify the response contains blocks that can certify the last returned commit,
        // and the returned commits are chained by digests, so earlier commits are certified
        // as well.
        let (commits, vote_blocks) = Handle::current()
            .spawn_blocking({
                let inner = inner.clone();
                move || {
                    inner.verify_commits(
                        target_authority,
                        commit_range,
                        serialized_commits,
                        serialized_blocks,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // 3. Fetch blocks referenced by the commits, from the same peer where commits were fetched.
        let mut block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
        block_refs.sort();
        let num_chunks = block_refs
            .len()
            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
            as u32;
        let mut requests: FuturesOrdered<_> = block_refs
            .chunks(inner.context.parameters.max_blocks_per_fetch)
            .enumerate()
            .map(|(i, request_block_refs)| {
                let inner = inner.clone();
                async move {
                    // 4. Send out pipelined fetch requests to avoid overloading the target authority.
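                    // For example (illustrative numbers), with 3 chunks and a 30s request
                    // timeout, the chunk requests start at 0s, 10s and 20s respectively.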
                    sleep(timeout * i as u32 / num_chunks).await;
                    // TODO: add some retries.
                    let serialized_blocks = inner
                        .network_client
                        .fetch_blocks(
                            target_authority,
                            request_block_refs.to_vec(),
                            vec![],
                            false,
                            timeout,
                        )
                        .await?;
                    // 5. Verify the same number of blocks are returned as requested.
                    if request_block_refs.len() != serialized_blocks.len() {
                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
                            authority: target_authority,
                            requested: request_block_refs.len(),
                            received: serialized_blocks.len(),
                        });
                    }
                    // 6. Verify returned blocks have valid formats.
                    let signed_blocks = serialized_blocks
                        .iter()
                        .map(|serialized| {
                            let block: SignedBlock = bcs::from_bytes(serialized)
                                .map_err(ConsensusError::MalformedBlock)?;
                            Ok(block)
                        })
                        .collect::<ConsensusResult<Vec<_>>>()?;
                    // 7. Verify the returned blocks match the requested block refs.
                    // If they do match, the returned blocks can be considered verified as well.
                    let mut blocks = Vec::new();
                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
                        .iter()
                        .zip(signed_blocks.into_iter())
                        .zip(serialized_blocks.into_iter())
                    {
                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
                        let received_block_ref = BlockRef::new(
                            signed_block.round(),
                            signed_block.author(),
                            signed_block_digest,
                        );
                        if *requested_block_ref != received_block_ref {
                            return Err(ConsensusError::UnexpectedBlockForCommit {
                                peer: target_authority,
                                requested: *requested_block_ref,
                                received: received_block_ref,
                            });
                        }
                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
                    }
                    Ok(blocks)
                }
            })
            .collect();

        let mut fetched_blocks = BTreeMap::new();
        while let Some(result) = requests.next().await {
            for block in result? {
                fetched_blocks.insert(block.reference(), block);
            }
        }

        // 8. Check whether block timestamps are ahead of the current time - this is for metrics only.
        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
            let now_ms = inner.context.clock.timestamp_utc_ms();
            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
            if forward_drift == 0 {
                continue;
            };
            let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
            inner
                .context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname, "commit_syncer"])
                .inc_by(forward_drift);
        }

        // 9. Now create certified commits by assigning the blocks to each commit.
        let mut certified_commits = Vec::new();
        for commit in &commits {
            let blocks = commit
                .blocks()
                .iter()
                .map(|block_ref| {
                    fetched_blocks
                        .remove(block_ref)
                        .expect("Block should exist")
                })
                .collect::<Vec<_>>();
            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
        }

        // 10. Add blocks in certified commits to the transaction certifier.
        for commit in &certified_commits {
            for block in commit.blocks() {
                // Only account for reject votes in the block, since they may vote on uncommitted
                // blocks or transactions. It is unnecessary to vote on the committed blocks
                // themselves.
                if inner.context.protocol_config.mysticeti_fastpath() {
                    inner
                        .transaction_certifier
                        .add_voted_blocks(vec![(block.clone(), vec![])]);
                }
            }
        }

        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
    }

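    // Maximum number of commits that may be scheduled beyond the highest handled commit index
    // before new fetches are paused. For example, with commit_sync_batch_size = 5 and
    // commit_sync_batches_ahead = 5 (the test configuration below), the threshold is 25 commits.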
    fn unhandled_commits_threshold(&self) -> CommitIndex {
        self.inner.context.parameters.commit_sync_batch_size
            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
    }

    #[cfg(test)]
    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
        self.pending_fetches.clone()
    }

    #[cfg(test)]
    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
        self.fetched_ranges.clone()
    }

    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        self.highest_scheduled_index
    }

    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        self.highest_fetched_commit_index
    }

    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        self.synced_commit_index
    }
}

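// Shared state and components used by both the scheduler loop and the fetch tasks.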
struct Inner<C: NetworkClient> {
    context: Arc<Context>,
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    transaction_certifier: TransactionCertifier,
    network_client: Arc<C>,
    dag_state: Arc<RwLock<DagState>>,
}

impl<C: NetworkClient> Inner<C> {
    /// Verifies the commits and also certifies them using the provided vote blocks for the last commit. The
    /// method returns the trusted commits and the votes as verified blocks.
    fn verify_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        serialized_commits: Vec<Bytes>,
        serialized_vote_blocks: Vec<Bytes>,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        // Parse and verify commits.
        let mut commits = Vec::new();
        for serialized in &serialized_commits {
            let commit: Commit =
                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
            let digest = TrustedCommit::compute_digest(serialized);
            if commits.is_empty() {
                // start is inclusive, so first commit must be at the start index.
                if commit.index() != commit_range.start() {
                    return Err(ConsensusError::UnexpectedStartCommit {
                        peer,
                        start: commit_range.start(),
                        commit: Box::new(commit),
                    });
                }
            } else {
                // Verify next commit increments index and references the previous digest.
                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
                    commits.last().unwrap();
                if commit.index() != last_commit.index() + 1
                    || &commit.previous_digest() != last_commit_digest
                {
                    return Err(ConsensusError::UnexpectedCommitSequence {
                        peer,
                        prev_commit: Box::new(last_commit.clone()),
                        curr_commit: Box::new(commit),
                    });
                }
            }
            // Do not process more commits past the end index.
            if commit.index() > commit_range.end() {
                break;
            }
            commits.push((digest, commit));
        }
        let Some((end_commit_digest, end_commit)) = commits.last() else {
            return Err(ConsensusError::NoCommitReceived { peer });
        };

        // Parse and verify blocks. Then accumulate votes on the end commit.
        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut vote_blocks = Vec::new();
        for serialized in serialized_vote_blocks {
            let block: SignedBlock =
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
            // Only block signatures need to be verified, to verify commit votes.
            // But the blocks will be sent to Core, so they need to be fully verified.
            let (block, reject_transaction_votes) =
                self.block_verifier.verify_and_vote(block, serialized)?;
            if self.context.protocol_config.mysticeti_fastpath() {
                self.transaction_certifier
                    .add_voted_blocks(vec![(block.clone(), reject_transaction_votes)]);
            }
            for vote in block.commit_votes() {
                if *vote == end_commit_ref {
                    stake_aggregator.add(block.author(), &self.context.committee);
                }
            }
            vote_blocks.push(block);
        }

        // Check if the end commit has enough votes.
        if !stake_aggregator.reached_threshold(&self.context.committee) {
            return Err(ConsensusError::NotEnoughCommitVotes {
                stake: stake_aggregator.stake(),
                peer,
                commit: Box::new(end_commit.clone()),
            });
        }

        let trusted_commits = commits
            .into_iter()
            .zip(serialized_commits)
            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
            .collect();
        Ok((trusted_commits, vote_blocks))
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use bytes::Bytes;
    use consensus_config::{AuthorityIndex, Parameters};
    use consensus_types::block::{BlockRef, Round};
    use mysten_metrics::monitored_mpsc;
    use parking_lot::RwLock;

    use crate::{
        CommitConsumerMonitor, CommitDigest, CommitRef,
        block::{TestBlock, VerifiedBlock},
        block_verifier::NoopBlockVerifier,
        commit::CommitRange,
        commit_syncer::CommitSyncer,
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::MockCoreThreadDispatcher,
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, NetworkClient},
        storage::mem_store::MemStore,
        transaction_certifier::TransactionCertifier,
    };

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait::async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _serialized_block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // SETUP
        let (context, _) = Context::new_for_test(4);
        // Use smaller batches and fetch limits for testing.
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let network_client = Arc::new(FakeNetworkClient::default());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier =
            TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_certifier,
            network_client,
            dag_state,
        );

        // Check initial state.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 15 blocks voting for commit 10 from authorities 0 to 2 in CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled after seeing progress of other validators.
        commit_syncer.try_schedule_once();

        // Verify state.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 40 blocks voting for commit 35 from authorities 0 to 2 in CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled until the unhandled commits threshold.
        commit_syncer.try_schedule_once();

        // Verify commit syncer is paused after scheduling 15 commits to index 25.
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // Indicate commit index 25 is consumed, and try to schedule again.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        // Verify commit syncer schedules fetches up to index 35.
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // Verify contiguous ranges are scheduled.
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
}