consensus_core/
commit_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! CommitSyncer implements efficient synchronization of committed data.
5//!
6//! During the operation of a committee of authorities for consensus, one or more authorities
7//! can fall behind the quorum in their received and accepted blocks. This can happen due to
//! network disruptions, host crashes, or other reasons. Authorities that fall behind need to catch up to
9//! the quorum to be able to vote on the latest leaders. So efficient synchronization is necessary
10//! to minimize the impact of temporary disruptions and maintain smooth operations of the network.
11//!  
12//! CommitSyncer achieves efficient synchronization by relying on the following: when blocks
13//! are included in commits with >= 2f+1 certifiers by stake, these blocks must have passed
14//! verifications on some honest validators, so re-verifying them is unnecessary. In fact, the
15//! quorum certified commits themselves can be trusted to be sent to Sui directly, but for
16//! simplicity this is not done. Blocks from trusted commits still go through Core and committer.
17//!
18//! Another way CommitSyncer improves the efficiency of synchronization is parallel fetching:
19//! commits have a simple dependency graph (linear), so it is easy to fetch ranges of commits
20//! in parallel.
21//!
//! Commit synchronization is an expensive operation, involving transferring large amounts of data via
23//! the network. And it is not on the critical path of block processing. So the heuristics for
24//! synchronization, including triggers and retries, should be chosen to favor throughput and
25//! efficient resource usage, over faster reactions.
26
27use std::{
28    collections::{BTreeMap, BTreeSet},
29    sync::Arc,
30    time::Duration,
31};
32
33use bytes::Bytes;
34use consensus_config::AuthorityIndex;
35use consensus_types::block::BlockRef;
36use futures::{StreamExt as _, stream::FuturesOrdered};
37use itertools::Itertools as _;
38use mysten_metrics::spawn_logged_monitored_task;
39use parking_lot::RwLock;
40use rand::{prelude::SliceRandom as _, rngs::ThreadRng};
41use tokio::{
42    runtime::Handle,
43    sync::oneshot,
44    task::{JoinHandle, JoinSet},
45    time::{MissedTickBehavior, sleep},
46};
47use tracing::{debug, info, warn};
48
49use crate::{
50    CommitConsumerMonitor, CommitIndex,
51    block::{BlockAPI, ExtendedBlock, SignedBlock, VerifiedBlock},
52    block_verifier::BlockVerifier,
53    commit::{
54        CertifiedCommit, CertifiedCommits, Commit, CommitAPI as _, CommitDigest, CommitRange,
55        CommitRef, TrustedCommit,
56    },
57    commit_vote_monitor::CommitVoteMonitor,
58    context::Context,
59    core_thread::CoreThreadDispatcher,
60    dag_state::DagState,
61    error::{ConsensusError, ConsensusResult},
62    network::{CommitSyncerClient, ObserverNetworkClient, ValidatorNetworkClient},
63    round_tracker::RoundTracker,
64    stake_aggregator::{QuorumThreshold, StakeAggregator},
65    transaction_certifier::TransactionCertifier,
66};
67
68// Handle to stop the CommitSyncer loop.
69pub(crate) struct CommitSyncerHandle {
70    schedule_task: JoinHandle<()>,
71    tx_shutdown: oneshot::Sender<()>,
72}
73
74impl CommitSyncerHandle {
75    pub(crate) async fn stop(self) {
76        let _ = self.tx_shutdown.send(());
77        // Do not abort schedule task, which waits for fetches to shut down.
78        if let Err(e) = self.schedule_task.await
79            && e.is_panic()
80        {
81            std::panic::resume_unwind(e.into_panic());
82        }
83    }
84}
85
// Schedules fetches of certified commits (and the blocks they reference) from peers,
// and forwards fetched results to Core in commit-index order without gaps.
pub(crate) struct CommitSyncer<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
    // States shared by scheduler and fetch tasks.

    // Shared components wrapper.
    inner: Arc<Inner<VC, OC>>,

    // States only used by the scheduler.

    // Inflight requests to fetch commits from different authorities.
    // Each task resolves to (target end commit index of the fetch, fetched certified commits);
    // see handle_fetch_result() for how partial results are handled.
    inflight_fetches: JoinSet<(u32, CertifiedCommits)>,
    // Additional ranges of commits to fetch.
    pending_fetches: BTreeSet<CommitRange>,
    // Fetched commits and blocks by commit range, buffered until they can be sent
    // to Core in order.
    fetched_ranges: BTreeMap<CommitRange, CertifiedCommits>,
    // Highest commit index among inflight and pending fetches.
    // Used to determine the start of new ranges to be fetched.
    highest_scheduled_index: Option<CommitIndex>,
    // Highest index among fetched commits, after commits and blocks are verified.
    // Used for metrics.
    highest_fetched_commit_index: CommitIndex,
    // The commit index that is the max of highest local commit index and commit index inflight to Core.
    // Used to determine if fetched blocks can be sent to Core without gaps.
    synced_commit_index: CommitIndex,
}
110
111impl<VC, OC> CommitSyncer<VC, OC>
112where
113    VC: ValidatorNetworkClient,
114    OC: ObserverNetworkClient,
115{
116    pub(crate) fn new(
117        context: Arc<Context>,
118        core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
119        commit_vote_monitor: Arc<CommitVoteMonitor>,
120        commit_consumer_monitor: Arc<CommitConsumerMonitor>,
121        block_verifier: Arc<dyn BlockVerifier>,
122        transaction_certifier: TransactionCertifier,
123        round_tracker: Arc<RwLock<RoundTracker>>,
124        network_client: Arc<CommitSyncerClient<VC, OC>>,
125        dag_state: Arc<RwLock<DagState>>,
126    ) -> Self {
127        let inner = Arc::new(Inner {
128            context,
129            core_thread_dispatcher,
130            commit_vote_monitor,
131            commit_consumer_monitor,
132            block_verifier,
133            transaction_certifier,
134            round_tracker,
135            network_client,
136            dag_state,
137        });
138        let synced_commit_index = inner.dag_state.read().last_commit_index();
139        CommitSyncer {
140            inner,
141            inflight_fetches: JoinSet::new(),
142            pending_fetches: BTreeSet::new(),
143            fetched_ranges: BTreeMap::new(),
144            highest_scheduled_index: None,
145            highest_fetched_commit_index: 0,
146            synced_commit_index,
147        }
148    }
149
150    pub(crate) fn start(self) -> CommitSyncerHandle {
151        let (tx_shutdown, rx_shutdown) = oneshot::channel();
152        let schedule_task = spawn_logged_monitored_task!(self.schedule_loop(rx_shutdown,));
153        CommitSyncerHandle {
154            schedule_task,
155            tx_shutdown,
156        }
157    }
158
    // Main scheduler loop: periodically schedules new fetch ranges, collects results
    // from finished fetch tasks, and starts pending fetches. Exits on shutdown signal
    // or when a fetch task is cancelled; fetch task panics are re-raised here.
    async fn schedule_loop(mut self, mut rx_shutdown: oneshot::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(2));
        // Skip (rather than burst) ticks missed while the loop was busy.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                // Periodically, schedule new fetches if the node is falling behind.
                _ = interval.tick() => {
                    self.try_schedule_once();
                }
                // Handles results from fetch tasks.
                // The guard avoids polling join_next() on an empty JoinSet.
                Some(result) = self.inflight_fetches.join_next(), if !self.inflight_fetches.is_empty() => {
                    if let Err(e) = result {
                        // Propagate panics from fetch tasks to this task.
                        if e.is_panic() {
                            std::panic::resume_unwind(e.into_panic());
                        }
                        warn!("Fetch cancelled. CommitSyncer shutting down: {}", e);
                        // If any fetch is cancelled or panicked, try to shutdown and exit the loop.
                        self.inflight_fetches.shutdown().await;
                        return;
                    }
                    let (target_end, commits) = result.unwrap();
                    self.handle_fetch_result(target_end, commits).await;
                }
                _ = &mut rx_shutdown => {
                    // Shutdown requested.
                    info!("CommitSyncer shutting down ...");
                    self.inflight_fetches.shutdown().await;
                    return;
                }
            }

            // After every event, try to fill available fetch slots from pending ranges.
            self.try_start_fetches();
        }
    }
194
    // Schedules new commit ranges to fetch when the quorum commit index is ahead of
    // what has been synced or already scheduled. Only full batches of size
    // commit_sync_batch_size are scheduled; scheduling pauses when the commit
    // consumer falls too far behind.
    fn try_schedule_once(&mut self) {
        let quorum_commit_index = self.inner.commit_vote_monitor.quorum_commit_index();
        let local_commit_index = self.inner.dag_state.read().last_commit_index();
        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_quorum_index
            .set(quorum_commit_index as i64);
        metrics
            .commit_sync_local_index
            .set(local_commit_index as i64);
        let highest_handled_index = self.inner.commit_consumer_monitor.highest_handled_commit();
        let highest_scheduled_index = self.highest_scheduled_index.unwrap_or(0);
        // Update synced_commit_index periodically to make sure it is no smaller than
        // local commit index.
        self.synced_commit_index = self.synced_commit_index.max(local_commit_index);
        let unhandled_commits_threshold = self.unhandled_commits_threshold();
        info!(
            "Checking to schedule fetches: synced_commit_index={}, highest_handled_index={}, highest_scheduled_index={}, quorum_commit_index={}, unhandled_commits_threshold={}",
            self.synced_commit_index,
            highest_handled_index,
            highest_scheduled_index,
            quorum_commit_index,
            unhandled_commits_threshold,
        );

        // TODO: cleanup inflight fetches that are no longer needed.
        let fetch_after_index = self
            .synced_commit_index
            .max(self.highest_scheduled_index.unwrap_or(0));
        // When the node is falling behind, schedule pending fetches which will be executed later.
        // Each prev_end is the last index of the previous batch; the new batch covers
        // [prev_end + 1, prev_end + batch_size].
        for prev_end in (fetch_after_index..=quorum_commit_index)
            .step_by(self.inner.context.parameters.commit_sync_batch_size as usize)
        {
            // Create range with inclusive start and end.
            let range_start = prev_end + 1;
            let range_end = prev_end + self.inner.context.parameters.commit_sync_batch_size;
            // Commit range is not fetched when [range_start, range_end] contains fewer commits
            // than the target batch size. This is to avoid the cost of processing more and smaller batches.
            // Block broadcast, subscription and synchronization will help the node catchup.
            if quorum_commit_index < range_end {
                break;
            }
            // Pause scheduling new fetches when handling of commits is lagging.
            if highest_handled_index + unhandled_commits_threshold < range_end {
                warn!(
                    "Skip scheduling new commit fetches: consensus handler is lagging. highest_handled_index={}, highest_scheduled_index={}",
                    highest_handled_index, highest_scheduled_index
                );
                break;
            }
            self.pending_fetches
                .insert((range_start..=range_end).into());
            // quorum_commit_index should be non-decreasing, so highest_scheduled_index should not
            // decrease either.
            self.highest_scheduled_index = Some(range_end);
        }
    }
252
    // Processes the result of one fetch task: records metrics, re-queues any
    // unfetched suffix of the target range, buffers the fetched commits, and then
    // sends buffered ranges to Core in order as long as there is no gap with what
    // is already synced. `target_end` is the end index the fetch was asked for;
    // `certified_commits` may cover only a prefix of that range.
    async fn handle_fetch_result(
        &mut self,
        target_end: CommitIndex,
        certified_commits: CertifiedCommits,
    ) {
        // fetch_loop() only returns after a successful, non-empty fetch.
        assert!(!certified_commits.commits().is_empty());

        let (total_blocks_fetched, total_blocks_size_bytes) = certified_commits
            .commits()
            .iter()
            .fold((0, 0), |(blocks, bytes), c| {
                (
                    blocks + c.blocks().len(),
                    bytes
                        + c.blocks()
                            .iter()
                            .map(|b| b.serialized().len())
                            .sum::<usize>() as u64,
                )
            });

        let metrics = &self.inner.context.metrics.node_metrics;
        metrics
            .commit_sync_fetched_commits
            .inc_by(certified_commits.commits().len() as u64);
        metrics
            .commit_sync_fetched_blocks
            .inc_by(total_blocks_fetched as u64);
        metrics
            .commit_sync_total_fetched_blocks_size
            .inc_by(total_blocks_size_bytes);

        // Commits are ordered by index, so first/last give the fetched range bounds.
        let (commit_start, commit_end) = (
            certified_commits.commits().first().unwrap().index(),
            certified_commits.commits().last().unwrap().index(),
        );
        self.highest_fetched_commit_index = self.highest_fetched_commit_index.max(commit_end);
        metrics
            .commit_sync_highest_fetched_index
            .set(self.highest_fetched_commit_index as i64);

        // Allow returning partial results, and try fetching the rest separately.
        if commit_end < target_end {
            self.pending_fetches
                .insert((commit_end + 1..=target_end).into());
        }
        // Make sure synced_commit_index is up to date.
        self.synced_commit_index = self
            .synced_commit_index
            .max(self.inner.dag_state.read().last_commit_index());
        // Only add new blocks if at least some of them are not already synced.
        if self.synced_commit_index < commit_end {
            self.fetched_ranges
                .insert((commit_start..=commit_end).into(), certified_commits);
        }
        // Try to process as many fetched blocks as possible.
        while let Some((fetched_commit_range, _commits)) = self.fetched_ranges.first_key_value() {
            // Only pop fetched_ranges if there is no gap with blocks already synced.
            // Note: start, end and synced_commit_index are all inclusive.
            let (fetched_commit_range, commits) =
                if fetched_commit_range.start() <= self.synced_commit_index + 1 {
                    self.fetched_ranges.pop_first().unwrap()
                } else {
                    // Found gap between earliest fetched block and latest synced block,
                    // so not sending additional blocks to Core.
                    metrics.commit_sync_gap_on_processing.inc();
                    break;
                };
            // Avoid sending to Core a whole batch of already synced blocks.
            if fetched_commit_range.end() <= self.synced_commit_index {
                continue;
            }

            debug!(
                "Fetched blocks for commit range {:?}: {}",
                fetched_commit_range,
                commits
                    .commits()
                    .iter()
                    .flat_map(|c| c.blocks())
                    .map(|b| b.reference().to_string())
                    .join(","),
            );

            // If core thread cannot handle the incoming blocks, it is ok to block here
            // to slow down the commit syncer.
            match self
                .inner
                .core_thread_dispatcher
                .add_certified_commits(commits)
                .await
            {
                // Missing ancestors are possible from certification blocks, but
                // it is unnecessary to try to sync their causal history. If they are required
                // for the progress of the DAG, they will be included in a future commit.
                Ok(missing) => {
                    if !missing.is_empty() {
                        info!(
                            "Certification blocks have missing ancestors: {} for commit range {:?}",
                            missing.iter().map(|b| b.to_string()).join(","),
                            fetched_commit_range,
                        );
                    }
                    for block_ref in missing {
                        let hostname = &self
                            .inner
                            .context
                            .committee
                            .authority(block_ref.author)
                            .hostname;
                        metrics
                            .commit_sync_fetch_missing_blocks
                            .with_label_values(&[hostname])
                            .inc();
                    }
                }
                Err(e) => {
                    // Dispatcher failure is treated as shutdown; the scheduler loop
                    // will also terminate when its tasks stop.
                    info!("Failed to add blocks, shutting down: {}", e);
                    return;
                }
            };

            // Once commits and blocks are sent to Core, ratchet up synced_commit_index
            self.synced_commit_index = self.synced_commit_index.max(fetched_commit_range.end());
        }

        metrics
            .commit_sync_inflight_fetches
            .set(self.inflight_fetches.len() as i64);
        metrics
            .commit_sync_pending_fetches
            .set(self.pending_fetches.len() as i64);
        metrics
            .commit_sync_highest_synced_index
            .set(self.synced_commit_index as i64);
    }
389
390    fn try_start_fetches(&mut self) {
391        // Cap parallel fetches based on configured limit and committee size, to avoid overloading the network.
392        // Also when there are too many fetched blocks that cannot be sent to Core before an earlier fetch
393        // has not finished, reduce parallelism so the earlier fetch can retry on a better host and succeed.
394        let target_parallel_fetches = self
395            .inner
396            .context
397            .parameters
398            .commit_sync_parallel_fetches
399            .min(self.inner.context.committee.size() * 2 / 3)
400            .min(
401                self.inner
402                    .context
403                    .parameters
404                    .commit_sync_batches_ahead
405                    .saturating_sub(self.fetched_ranges.len()),
406            )
407            .max(1);
408        // Start new fetches if there are pending batches and available slots.
409        loop {
410            if self.inflight_fetches.len() >= target_parallel_fetches {
411                break;
412            }
413            let Some(commit_range) = self.pending_fetches.pop_first() else {
414                break;
415            };
416            self.inflight_fetches
417                .spawn(Self::fetch_loop(self.inner.clone(), commit_range));
418        }
419
420        let metrics = &self.inner.context.metrics.node_metrics;
421        metrics
422            .commit_sync_inflight_fetches
423            .set(self.inflight_fetches.len() as i64);
424        metrics
425            .commit_sync_pending_fetches
426            .set(self.pending_fetches.len() as i64);
427        metrics
428            .commit_sync_highest_synced_index
429            .set(self.synced_commit_index as i64);
430    }
431
    // Retries fetching commits and blocks from available authorities, until a request succeeds
    // where at least a prefix of the commit range is fetched.
    // Returns the target end index of the requested range (commit_range.end(), even when only
    // a prefix was fetched) and the fetched commits with the blocks referenced by the commits.
    // Never returns until some fetch succeeds; loops with increasing timeouts otherwise.
    async fn fetch_loop(
        inner: Arc<Inner<VC, OC>>,
        commit_range: CommitRange,
    ) -> (CommitIndex, CertifiedCommits) {
        // Individual request base timeout.
        const TIMEOUT: Duration = Duration::from_secs(10);
        // Max per-request timeout will be base timeout times a multiplier.
        // At the extreme, this means there will be 120s timeout to fetch max_blocks_per_fetch blocks.
        const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
        // timeout * max number of targets should be reasonably small, so the
        // system can adjust to slow network or large data sizes quickly.
        const MAX_NUM_TARGETS: usize = 24;
        let mut timeout_multiplier = 0;
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_loop_latency
            .start_timer();
        info!("Starting to fetch commits in {commit_range:?} ...",);
        loop {
            // Attempt to fetch commits and blocks through min(committee size, MAX_NUM_TARGETS) peers.
            // Targets are all committee members except this node itself, in random order.
            let mut target_authorities = inner
                .context
                .committee
                .authorities()
                .filter_map(|(i, _)| {
                    if i != inner.context.own_index {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect_vec();
            target_authorities.shuffle(&mut ThreadRng::default());
            target_authorities.truncate(MAX_NUM_TARGETS);
            // Increase timeout multiplier for each loop until MAX_TIMEOUT_MULTIPLIER.
            timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
            let request_timeout = TIMEOUT * timeout_multiplier;
            // Give enough overall timeout for fetching commits and blocks.
            // - Timeout for fetching commits and commit certifying blocks.
            // - Timeout for fetching blocks referenced by the commits.
            // - Time spent on pipelining requests to fetch blocks.
            // - Another headroom to allow fetch_once() to timeout gracefully if possible.
            let fetch_timeout = request_timeout * 4;
            // Try fetching from selected target authority.
            for authority in target_authorities {
                match tokio::time::timeout(
                    fetch_timeout,
                    Self::fetch_once(
                        inner.clone(),
                        authority,
                        commit_range.clone(),
                        request_timeout,
                    ),
                )
                .await
                {
                    Ok(Ok(commits)) => {
                        info!("Finished fetching commits in {commit_range:?}",);
                        return (commit_range.end(), commits);
                    }
                    // fetch_once() failed; record the error and try the next authority.
                    Ok(Err(e)) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), e.name()])
                            .inc();
                    }
                    // The overall fetch_timeout elapsed before fetch_once() returned.
                    Err(_) => {
                        let hostname = inner
                            .context
                            .committee
                            .authority(authority)
                            .hostname
                            .clone();
                        warn!("Timed out fetching {commit_range:?} from {authority}",);
                        inner
                            .context
                            .metrics
                            .node_metrics
                            .commit_sync_fetch_once_errors
                            .with_label_values(&[hostname.as_str(), "FetchTimeout"])
                            .inc();
                    }
                }
            }
            // Avoid busy looping, by waiting for a while before retrying.
            sleep(TIMEOUT).await;
        }
    }
535
    // Fetches commits and blocks from a single authority. At a high level, first the commits are
    // fetched and verified. After that, blocks referenced in the certified commits are fetched
    // and assembled into CertifiedCommits, which the caller forwards to Core for processing.
    // `timeout` bounds each individual network request; the caller wraps this whole
    // function in a larger overall timeout.
    async fn fetch_once(
        inner: Arc<Inner<VC, OC>>,
        target_authority: AuthorityIndex,
        commit_range: CommitRange,
        timeout: Duration,
    ) -> ConsensusResult<CertifiedCommits> {
        let _timer = inner
            .context
            .metrics
            .node_metrics
            .commit_sync_fetch_once_latency
            .start_timer();

        // 1. Fetch commits in the commit range from the target authority.
        let (serialized_commits, serialized_blocks) = inner
            .network_client
            .fetch_commits(
                crate::network::PeerId::Validator(target_authority),
                commit_range.clone(),
                timeout,
            )
            .await?;

        // 2. Verify the response contains blocks that can certify the last returned commit,
        // and the returned commits are chained by digests, so earlier commits are certified
        // as well. Verification is CPU heavy, so run it off the async runtime.
        let (commits, vote_blocks) = Handle::current()
            .spawn_blocking({
                let inner = inner.clone();
                move || {
                    inner.verify_commits(
                        target_authority,
                        commit_range,
                        serialized_commits,
                        serialized_blocks,
                    )
                }
            })
            .await
            .expect("Spawn blocking should not fail")?;

        // 3. Fetch blocks referenced by the commits, from the same peer where commits are fetched.
        let mut block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
        block_refs.sort();
        // NOTE: if block_refs is empty, num_chunks is 0 but the division below never
        // runs, because chunks() yields no items.
        let num_chunks = block_refs
            .len()
            .div_ceil(inner.context.parameters.max_blocks_per_fetch)
            as u32;
        let mut requests: FuturesOrdered<_> = block_refs
            .chunks(inner.context.parameters.max_blocks_per_fetch)
            .enumerate()
            .map(|(i, request_block_refs)| {
                let inner = inner.clone();
                async move {
                    // 4. Send out pipelined fetch requests to avoid overloading the target authority.
                    // Chunk i is delayed proportionally so requests are spread over `timeout`.
                    sleep(timeout * i as u32 / num_chunks).await;
                    // TODO: add some retries.
                    let serialized_blocks = inner
                        .network_client
                        .fetch_blocks(
                            crate::network::PeerId::Validator(target_authority),
                            request_block_refs.to_vec(),
                            vec![],
                            false,
                            timeout,
                        )
                        .await?;
                    // 5. Verify the same number of blocks are returned as requested.
                    if request_block_refs.len() != serialized_blocks.len() {
                        return Err(ConsensusError::UnexpectedNumberOfBlocksFetched {
                            authority: target_authority,
                            requested: request_block_refs.len(),
                            received: serialized_blocks.len(),
                        });
                    }
                    // 6. Verify returned blocks have valid formats.
                    let signed_blocks = serialized_blocks
                        .iter()
                        .map(|serialized| {
                            let block: SignedBlock = bcs::from_bytes(serialized)
                                .map_err(ConsensusError::MalformedBlock)?;
                            Ok(block)
                        })
                        .collect::<ConsensusResult<Vec<_>>>()?;
                    // 7. Verify the returned blocks match the requested block refs.
                    // If they do match, the returned blocks can be considered verified as well,
                    // because a block ref includes the digest of the block content.
                    let mut blocks = Vec::new();
                    for ((requested_block_ref, signed_block), serialized) in request_block_refs
                        .iter()
                        .zip(signed_blocks.into_iter())
                        .zip(serialized_blocks.into_iter())
                    {
                        let signed_block_digest = VerifiedBlock::compute_digest(&serialized);
                        let received_block_ref = BlockRef::new(
                            signed_block.round(),
                            signed_block.author(),
                            signed_block_digest,
                        );
                        if *requested_block_ref != received_block_ref {
                            return Err(ConsensusError::UnexpectedBlockForCommit {
                                peer: target_authority,
                                requested: *requested_block_ref,
                                received: received_block_ref,
                            });
                        }
                        blocks.push(VerifiedBlock::new_verified(signed_block, serialized));
                    }
                    Ok(blocks)
                }
            })
            .collect();

        // Collect all fetched blocks, keyed by block ref; fail fast on any chunk error.
        let mut fetched_blocks = BTreeMap::new();
        while let Some(result) = requests.next().await {
            for block in result? {
                fetched_blocks.insert(block.reference(), block);
            }
        }

        // 8. Check if the block timestamps are lower than current time - this is for metrics only.
        for block in fetched_blocks.values().chain(vote_blocks.iter()) {
            let now_ms = inner.context.clock.timestamp_utc_ms();
            let forward_drift = block.timestamp_ms().saturating_sub(now_ms);
            if forward_drift == 0 {
                continue;
            };
            let peer_hostname = &inner.context.committee.authority(target_authority).hostname;
            inner
                .context
                .metrics
                .node_metrics
                .block_timestamp_drift_ms
                .with_label_values(&[peer_hostname.as_str(), "commit_syncer"])
                .inc_by(forward_drift);
        }

        // 9. Now create certified commits by assigning the blocks to each commit.
        // Every commit's referenced blocks must be present in fetched_blocks, since all
        // requested refs were verified to be returned above.
        let mut certified_commits = Vec::new();
        for commit in &commits {
            let blocks = commit
                .blocks()
                .iter()
                .map(|block_ref| {
                    fetched_blocks
                        .remove(block_ref)
                        .expect("Block should exist")
                })
                .collect::<Vec<_>>();
            certified_commits.push(CertifiedCommit::new_certified(commit.clone(), blocks));
        }

        // 10. Add blocks in certified commits to the transaction certifier.
        for commit in &certified_commits {
            for block in commit.blocks() {
                // Only account for reject votes in the block, since they may vote on uncommitted
                // blocks or transactions. It is unnecessary to vote on the committed blocks
                // themselves.
                if inner.context.protocol_config.transaction_voting_enabled() {
                    inner
                        .transaction_certifier
                        .add_voted_blocks(vec![(block.clone(), vec![])]);
                }
            }
        }

        // 11. Record commit votes from the fetched blocks.
        for commit in &certified_commits {
            for block in commit.blocks() {
                inner.commit_vote_monitor.observe_block(block);
            }
        }
        for block in &vote_blocks {
            inner.commit_vote_monitor.observe_block(block);
        }

        // 12. Update round tracker from the fetched blocks. For fetched blocks,
        // excluded_ancestors are not available so we use an empty vector.
        {
            let mut tracker = inner.round_tracker.write();
            // Update from commit blocks
            for commit in &certified_commits {
                for block in commit.blocks() {
                    tracker.update_from_verified_block(&ExtendedBlock {
                        block: block.clone(),
                        excluded_ancestors: vec![],
                    });
                }
            }
            // Update from vote blocks
            for block in &vote_blocks {
                tracker.update_from_verified_block(&ExtendedBlock {
                    block: block.clone(),
                    excluded_ancestors: vec![],
                });
            }
        }

        Ok(CertifiedCommits::new(certified_commits, vote_blocks))
    }
738
739    fn unhandled_commits_threshold(&self) -> CommitIndex {
740        self.inner.context.parameters.commit_sync_batch_size
741            * (self.inner.context.parameters.commit_sync_batches_ahead as u32)
742    }
743
744    #[cfg(test)]
745    fn pending_fetches(&self) -> BTreeSet<CommitRange> {
746        self.pending_fetches.clone()
747    }
748
749    #[cfg(test)]
750    fn fetched_ranges(&self) -> BTreeMap<CommitRange, CertifiedCommits> {
751        self.fetched_ranges.clone()
752    }
753
    #[cfg(test)]
    fn highest_scheduled_index(&self) -> Option<CommitIndex> {
        // Test-only: highest commit index scheduled for fetching so far,
        // or None when no fetch has been scheduled yet.
        self.highest_scheduled_index
    }
758
    #[cfg(test)]
    fn highest_fetched_commit_index(&self) -> CommitIndex {
        // Test-only: highest commit index that has been fetched from peers.
        self.highest_fetched_commit_index
    }
763
    #[cfg(test)]
    fn synced_commit_index(&self) -> CommitIndex {
        // Test-only: highest commit index known to be synced locally.
        self.synced_commit_index
    }
768}
769
/// State shared between the commit syncer's scheduling loop and its fetch tasks.
struct Inner<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> {
    // Committee, parameters, protocol config, metrics and clock.
    context: Arc<Context>,
    // Dispatches work to the core thread; presumably receives fetched certified
    // commits for processing — confirm against the scheduling logic earlier in this file.
    core_thread_dispatcher: Arc<dyn CoreThreadDispatcher>,
    // Records commit votes observed in fetched and vote blocks.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Tracks the highest commit index handled by the commit consumer,
    // used to pause scheduling when too far ahead.
    commit_consumer_monitor: Arc<CommitConsumerMonitor>,
    // Fully verifies vote blocks before they are sent to Core.
    block_verifier: Arc<dyn BlockVerifier>,
    // Receives voted blocks (with reject votes) when transaction voting is enabled.
    transaction_certifier: TransactionCertifier,
    // Updated with rounds from fetched blocks (excluded ancestors unavailable there).
    round_tracker: Arc<RwLock<RoundTracker>>,
    // Client used to fetch commits and blocks from peers.
    network_client: Arc<CommitSyncerClient<VC, OC>>,
    // Local DAG state. NOTE(review): usage not visible in this chunk — see rest of file.
    dag_state: Arc<RwLock<DagState>>,
}
781
impl<VC: ValidatorNetworkClient, OC: ObserverNetworkClient> Inner<VC, OC> {
    /// Verifies the commits and also certifies them using the provided vote blocks for the last commit. The
    /// method returns the trusted commits and the votes as verified blocks.
    ///
    /// Verification proceeds in three steps:
    /// 1. Deserialize each commit and check the sequence starts exactly at
    ///    `commit_range.start()` and is contiguous: every commit increments the index
    ///    by one and references the previous commit's digest.
    /// 2. Fully verify each vote block and accumulate, by stake, its commit votes
    ///    that target the last commit in the sequence.
    /// 3. Require a quorum of votes on the last commit. Certifying the last commit
    ///    transitively certifies all earlier commits via the digest chain.
    ///
    /// Errors identify `peer` as the source of the invalid data.
    fn verify_commits(
        &self,
        peer: AuthorityIndex,
        commit_range: CommitRange,
        serialized_commits: Vec<Bytes>,
        serialized_vote_blocks: Vec<Bytes>,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        // Parse and verify commits.
        let mut commits = Vec::new();
        for serialized in &serialized_commits {
            let commit: Commit =
                bcs::from_bytes(serialized).map_err(ConsensusError::MalformedCommit)?;
            // Digest is computed over the serialized bytes, so it matches what the
            // next commit's previous_digest must reference.
            let digest = TrustedCommit::compute_digest(serialized);
            if commits.is_empty() {
                // start is inclusive, so first commit must be at the start index.
                if commit.index() != commit_range.start() {
                    return Err(ConsensusError::UnexpectedStartCommit {
                        peer,
                        start: commit_range.start(),
                        commit: Box::new(commit),
                    });
                }
            } else {
                // Verify next commit increments index and references the previous digest.
                let (last_commit_digest, last_commit): &(CommitDigest, Commit) =
                    commits.last().unwrap();
                if commit.index() != last_commit.index() + 1
                    || &commit.previous_digest() != last_commit_digest
                {
                    return Err(ConsensusError::UnexpectedCommitSequence {
                        peer,
                        prev_commit: Box::new(last_commit.clone()),
                        curr_commit: Box::new(commit),
                    });
                }
            }
            // Do not process more commits past the end index.
            if commit.index() > commit_range.end() {
                break;
            }
            commits.push((digest, commit));
        }
        // An empty response (or one starting past the end index) is invalid.
        let Some((end_commit_digest, end_commit)) = commits.last() else {
            return Err(ConsensusError::NoCommitReceived { peer });
        };

        // Parse and verify blocks. Then accumulate votes on the end commit.
        let end_commit_ref = CommitRef::new(end_commit.index(), *end_commit_digest);
        let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut vote_blocks = Vec::new();
        for serialized in serialized_vote_blocks {
            let block: SignedBlock =
                bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
            // Only block signatures need to be verified, to verify commit votes.
            // But the blocks will be sent to Core, so they need to be fully verified.
            let (block, reject_transaction_votes) =
                self.block_verifier.verify_and_vote(block, serialized)?;
            if self.context.protocol_config.transaction_voting_enabled() {
                self.transaction_certifier
                    .add_voted_blocks(vec![(block.clone(), reject_transaction_votes)]);
            }
            // Only votes targeting the end commit count toward certification.
            for vote in block.commit_votes() {
                if *vote == end_commit_ref {
                    stake_aggregator.add(block.author(), &self.context.committee);
                }
            }
            vote_blocks.push(block);
        }

        // Check if the end commit has enough votes.
        if !stake_aggregator.reached_threshold(&self.context.committee) {
            return Err(ConsensusError::NotEnoughCommitVotes {
                stake: stake_aggregator.stake(),
                peer,
                commit: Box::new(end_commit.clone()),
            });
        }

        // Pairing with serialized_commits by position is safe: commits were pushed
        // in the same order they were deserialized, and an early break only
        // truncates the tail, which zip then ignores.
        let trusted_commits = commits
            .into_iter()
            .zip(serialized_commits)
            .map(|((_d, c), s)| TrustedCommit::new_trusted(c, s))
            .collect();
        Ok((trusted_commits, vote_blocks))
    }
}
871
872#[cfg(test)]
873mod tests {
874    use std::{sync::Arc, time::Duration};
875
876    use bytes::Bytes;
877    use consensus_config::{AuthorityIndex, Parameters};
878    use consensus_types::block::{BlockRef, Round};
879    use mysten_metrics::monitored_mpsc;
880    use parking_lot::RwLock;
881
882    use crate::{
883        CommitConsumerMonitor, CommitDigest, CommitRef,
884        block::{TestBlock, VerifiedBlock},
885        block_verifier::NoopBlockVerifier,
886        commit::CommitRange,
887        commit_syncer::CommitSyncer,
888        commit_vote_monitor::CommitVoteMonitor,
889        context::Context,
890        core_thread::MockCoreThreadDispatcher,
891        dag_state::DagState,
892        error::ConsensusResult,
893        network::{BlockStream, CommitSyncerClient, ObserverNetworkClient, ValidatorNetworkClient},
894        round_tracker::RoundTracker,
895        storage::mem_store::MemStore,
896        transaction_certifier::TransactionCertifier,
897    };
898
    // Network client stub for tests; every method panics, so the scheduling test
    // below must never actually reach the network.
    #[derive(Default)]
    struct FakeNetworkClient {}
901
    // Stub impl: the scheduling test only exercises local state transitions,
    // so none of these methods should ever be called.
    #[async_trait::async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }

        #[cfg(test)]
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }
    }
960
    // Stub impl for the observer-side client; also never expected to be called.
    #[async_trait::async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _request_stream: crate::network::BlockRequestStream,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
990
    // Exercises CommitSyncer's scheduling state machine without any network I/O:
    // fetches are scheduled when peers are observed ahead, pause at the unhandled
    // commits threshold, and resume once the consumer catches up.
    // start_paused = true freezes tokio's clock so no timers actually elapse.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn commit_syncer_start_and_pause_scheduling() {
        // SETUP
        let (context, _) = Context::new_for_test(4);
        // Use smaller batches and fetch limits for testing.
        // With batch_size = 5 and batches_ahead = 5, the unhandled commits
        // threshold is 25 (asserted below).
        let context = Context {
            own_index: AuthorityIndex::new_for_test(3),
            parameters: Parameters {
                commit_sync_batch_size: 5,
                commit_sync_batches_ahead: 5,
                commit_sync_parallel_fetches: 5,
                max_blocks_per_fetch: 5,
                ..context.parameters
            },
            ..context
        };
        let context = Arc::new(context);
        let block_verifier = Arc::new(NoopBlockVerifier {});
        let core_thread_dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        // All network calls panic; the test must stay on the scheduling path only.
        let mock_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(CommitSyncerClient::new(
            context.clone(),
            Some(mock_client.clone()),
            Some(mock_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let commit_consumer_monitor = Arc::new(CommitConsumerMonitor::new(0, 0));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let mut commit_syncer = CommitSyncer::new(
            context,
            core_thread_dispatcher,
            commit_vote_monitor.clone(),
            commit_consumer_monitor.clone(),
            block_verifier,
            transaction_certifier,
            round_tracker,
            network_client,
            dag_state,
        );

        // Check initial state.
        assert!(commit_syncer.pending_fetches().is_empty());
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert!(commit_syncer.highest_scheduled_index().is_none());
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 15 blocks voting for commit 10 from authorities 0 to 2 in CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(15, i)
                .set_commit_votes(vec![CommitRef::new(10, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled after seeing progress of other validators.
        commit_syncer.try_schedule_once();

        // Verify state.
        // Commits 1..=10 split into two batches of 5, hence 2 pending fetches.
        assert_eq!(commit_syncer.pending_fetches().len(), 2);
        assert!(commit_syncer.fetched_ranges().is_empty());
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(10));
        assert_eq!(commit_syncer.highest_fetched_commit_index(), 0);
        assert_eq!(commit_syncer.synced_commit_index(), 0);

        // Observe round 40 blocks voting for commit 35 from authorities 0 to 2 in CommitVoteMonitor
        for i in 0..3 {
            let test_block = TestBlock::new(40, i)
                .set_commit_votes(vec![CommitRef::new(35, CommitDigest::MIN)])
                .build();
            let block = VerifiedBlock::new_for_test(test_block);
            commit_vote_monitor.observe_block(&block);
        }

        // Fetches should be scheduled until the unhandled commits threshold.
        commit_syncer.try_schedule_once();

        // Verify commit syncer is paused after scheduling 15 commits to index 25.
        // Threshold = batch_size (5) * batches_ahead (5) = 25; nothing has been
        // handled yet, so scheduling stops at index 25 with 5 pending batches.
        assert_eq!(commit_syncer.unhandled_commits_threshold(), 25);
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(25));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 5);

        // Indicate commit index 25 is consumed, and try to schedule again.
        commit_consumer_monitor.set_highest_handled_commit(25);
        commit_syncer.try_schedule_once();

        // Verify commit syncer schedules fetches up to index 35.
        // Two more batches (26-30, 31-35) join the 5 still pending.
        assert_eq!(commit_syncer.highest_scheduled_index(), Some(35));
        let pending_fetches = commit_syncer.pending_fetches();
        assert_eq!(pending_fetches.len(), 7);

        // Verify contiguous ranges are scheduled.
        // Expected ranges: 1-5, 6-10, ..., 31-35.
        for (range, start) in pending_fetches.iter().zip((1..35).step_by(5)) {
            assert_eq!(range.start(), start);
            assert_eq!(range.end(), start + 4);
        }
    }
1100}