consensus_core/
commit.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Ordering,
6    collections::BTreeMap,
7    fmt::{self, Debug, Display, Formatter},
8    hash::{Hash, Hasher},
9    ops::{Deref, Range, RangeInclusive},
10    sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{AuthorityIndex, DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22    block::{BlockAPI, Slot, VerifiedBlock},
23    context::Context,
24    leader_scoring::ReputationScores,
25    storage::Store,
26};
27
28/// Index of a commit among all consensus commits.
29pub type CommitIndex = u32;
30
31pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;
32
33/// Default wave length for all committers. A longer wave length increases the
34/// chance of committing the leader under asynchrony at the cost of latency in
35/// the common case.
36// TODO: merge DEFAULT_WAVE_LENGTH and MINIMUM_WAVE_LENGTH into a single constant,
37// because we are unlikely to change them via config in the forseeable future.
38pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;
39
40/// We need at least one leader round, one voting round, and one decision round.
41pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;
42
43/// The consensus protocol operates in 'waves'. Each wave is composed of a leader
44/// round, at least one voting round, and one decision round.
45pub(crate) type WaveNumber = u32;
46
47/// [`Commit`] summarizes [`CommittedSubDag`] for storage and network communications.
48///
49/// Validators should be able to reconstruct a sequence of CommittedSubDag from the
50/// corresponding Commit and blocks referenced in the Commit.
51/// A field must meet these requirements to be added to Commit:
52/// - helps with recovery locally and for peers catching up.
53/// - cannot be derived from a sequence of Commits and other persisted values.
54///
55/// For example, transactions in blocks should not be included in Commit, because they can be
56/// retrieved from blocks specified in Commit. Last committed round per authority also should not
57/// be included, because it can be derived from the latest value in storage and the additional
58/// sequence of Commits.
59#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
60#[enum_dispatch(CommitAPI)]
61pub enum Commit {
62    V1(CommitV1),
63}
64
65impl Commit {
66    /// Create a new commit.
67    pub(crate) fn new(
68        index: CommitIndex,
69        previous_digest: CommitDigest,
70        timestamp_ms: BlockTimestampMs,
71        leader: BlockRef,
72        blocks: Vec<BlockRef>,
73    ) -> Self {
74        Commit::V1(CommitV1 {
75            index,
76            previous_digest,
77            timestamp_ms,
78            leader,
79            blocks,
80        })
81    }
82
83    pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
84        let bytes = bcs::to_bytes(self)?;
85        Ok(bytes.into())
86    }
87}
88
89/// Accessors to Commit info.
90#[enum_dispatch]
91pub trait CommitAPI {
92    fn round(&self) -> Round;
93    fn index(&self) -> CommitIndex;
94    fn previous_digest(&self) -> CommitDigest;
95    fn timestamp_ms(&self) -> BlockTimestampMs;
96    fn leader(&self) -> BlockRef;
97    fn blocks(&self) -> &[BlockRef];
98}
99
100/// Specifies one consensus commit.
101/// It is stored on disk, so it does not contain blocks which are stored individually.
102#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
103pub struct CommitV1 {
104    /// Index of the commit.
105    /// First commit after genesis has an index of 1, then every next commit has an index incremented by 1.
106    index: CommitIndex,
107    /// Digest of the previous commit.
108    /// Set to CommitDigest::MIN for the first commit after genesis.
109    previous_digest: CommitDigest,
110    /// Timestamp of the commit, max of the timestamp of the leader block and previous Commit timestamp.
111    timestamp_ms: BlockTimestampMs,
112    /// A reference to the commit leader.
113    leader: BlockRef,
114    /// Refs to committed blocks, in the commit order.
115    blocks: Vec<BlockRef>,
116}
117
118impl CommitAPI for CommitV1 {
119    fn round(&self) -> Round {
120        self.leader.round
121    }
122
123    fn index(&self) -> CommitIndex {
124        self.index
125    }
126
127    fn previous_digest(&self) -> CommitDigest {
128        self.previous_digest
129    }
130
131    fn timestamp_ms(&self) -> BlockTimestampMs {
132        self.timestamp_ms
133    }
134
135    fn leader(&self) -> BlockRef {
136        self.leader
137    }
138
139    fn blocks(&self) -> &[BlockRef] {
140        &self.blocks
141    }
142}
143
144/// A commit is trusted when it is produced locally or certified by a quorum of authorities.
145/// Blocks referenced by TrustedCommit are assumed to be valid.
146/// Only trusted Commit can be sent to execution.
147///
148/// Note: clone() is relatively cheap with the underlying data refcounted.
149#[derive(Clone, Debug, PartialEq)]
150pub struct TrustedCommit {
151    inner: Arc<Commit>,
152
153    // Cached digest and serialized value, to avoid re-computing these values.
154    digest: CommitDigest,
155    serialized: Bytes,
156}
157
158impl TrustedCommit {
159    pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
160        let digest = Self::compute_digest(&serialized);
161        Self {
162            inner: Arc::new(commit),
163            digest,
164            serialized,
165        }
166    }
167
168    pub(crate) fn new_for_test(
169        index: CommitIndex,
170        previous_digest: CommitDigest,
171        timestamp_ms: BlockTimestampMs,
172        leader: BlockRef,
173        blocks: Vec<BlockRef>,
174    ) -> Self {
175        let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
176        let serialized = commit.serialize().unwrap();
177        Self::new_trusted(commit, serialized)
178    }
179
180    pub(crate) fn reference(&self) -> CommitRef {
181        CommitRef {
182            index: self.index(),
183            digest: self.digest(),
184        }
185    }
186
187    pub(crate) fn digest(&self) -> CommitDigest {
188        self.digest
189    }
190
191    pub(crate) fn serialized(&self) -> &Bytes {
192        &self.serialized
193    }
194
195    pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
196        let mut hasher = DefaultHashFunction::new();
197        hasher.update(serialized);
198        CommitDigest(hasher.finalize().into())
199    }
200}
201
202/// Allow easy access on the underlying Commit.
203impl Deref for TrustedCommit {
204    type Target = Commit;
205
206    fn deref(&self) -> &Self::Target {
207        &self.inner
208    }
209}
210
211/// `CertifiedCommits` keeps the synchronized certified commits along with the corresponding votes received from the peer that provided these commits.
212/// The `votes` contain the blocks as those provided by the peer, and certify the tip of the synced commits.
213#[derive(Clone, Debug)]
214pub(crate) struct CertifiedCommits {
215    commits: Vec<CertifiedCommit>,
216    votes: Vec<VerifiedBlock>,
217}
218
219impl CertifiedCommits {
220    pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
221        Self { commits, votes }
222    }
223
224    pub(crate) fn commits(&self) -> &[CertifiedCommit] {
225        &self.commits
226    }
227
228    pub(crate) fn votes(&self) -> &[VerifiedBlock] {
229        &self.votes
230    }
231}
232
233/// A commit that has been synced and certified by a quorum of authorities.
234#[derive(Clone, Debug)]
235pub(crate) struct CertifiedCommit {
236    commit: Arc<TrustedCommit>,
237    blocks: Vec<VerifiedBlock>,
238}
239
240impl CertifiedCommit {
241    pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
242        Self {
243            commit: Arc::new(commit),
244            blocks,
245        }
246    }
247
248    pub fn blocks(&self) -> &[VerifiedBlock] {
249        &self.blocks
250    }
251}
252
253impl Deref for CertifiedCommit {
254    type Target = TrustedCommit;
255
256    fn deref(&self) -> &Self::Target {
257        &self.commit
258    }
259}
260
261/// Digest of a consensus commit.
262#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
263pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
264
265impl CommitDigest {
266    /// Lexicographic min & max digest.
267    pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
268    pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
269
270    pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
271        self.0
272    }
273}
274
275impl Hash for CommitDigest {
276    fn hash<H: Hasher>(&self, state: &mut H) {
277        state.write(&self.0[..8]);
278    }
279}
280
281impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
282    fn from(hd: CommitDigest) -> Self {
283        Digest::new(hd.0)
284    }
285}
286
287impl fmt::Display for CommitDigest {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
289        write!(
290            f,
291            "{}",
292            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
293                .get(0..4)
294                .ok_or(fmt::Error)?
295        )
296    }
297}
298
299impl fmt::Debug for CommitDigest {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
301        write!(
302            f,
303            "{}",
304            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
305        )
306    }
307}
308
309/// Uniquely identifies a commit with its index and digest.
310#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
311pub struct CommitRef {
312    pub index: CommitIndex,
313    pub digest: CommitDigest,
314}
315
316impl CommitRef {
317    pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
318        Self { index, digest }
319    }
320}
321
322impl fmt::Display for CommitRef {
323    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
324        write!(f, "C{}({})", self.index, self.digest)
325    }
326}
327
328impl fmt::Debug for CommitRef {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
330        write!(f, "C{}({:?})", self.index, self.digest)
331    }
332}
333
334// Represents a vote on a Commit.
335pub type CommitVote = CommitRef;
336
337/// The output of consensus to execution is an ordered list of [`CommittedSubDag`].
338/// Each CommittedSubDag contains the information needed to execution transactions in
339/// the consensus commit.
340///
341/// The application processing CommittedSubDag can arbitrarily sort the blocks within
342/// each sub-dag (but using a deterministic algorithm).
343#[derive(Clone, PartialEq)]
344pub struct CommittedSubDag {
345    /// Set by Linearizer.
346    ///
347    /// A reference to the leader of the sub-dag
348    pub leader: BlockRef,
349    /// All the committed blocks that are part of this sub-dag
350    pub blocks: Vec<VerifiedBlock>,
351    /// The timestamp of the commit, obtained from the timestamp of the leader block.
352    pub timestamp_ms: BlockTimestampMs,
353    /// The reference of the commit.
354    /// First commit after genesis has a index of 1, then every next commit has a
355    /// index incremented by 1.
356    pub commit_ref: CommitRef,
357
358    /// Set by CommitObserver.
359    ///
360    /// Indicates whether the commit was decided locally based on the local DAG.
361    ///
362    /// If true, `CommitFinalizer` can then assume a quorum of certificates are available
363    /// for each transaction in the commit if there is no reject vote, and proceed with
364    /// optimistic finalization of transactions.
365    ///
366    /// If the commit was decided by `UniversalCommitter`, this must be true.
367    /// If the commit was received from a peer via `CommitSyncer`, this must be false.
368    /// There may not be enough blocks in local DAG to decide on the commit.
369    ///
370    /// For safety, a previously locally decided commit may be recovered after restarting as
371    /// non-local, if its finalization state was not persisted.
372    pub decided_with_local_blocks: bool,
373    /// Whether rejected transactions in this commit have been recovered from storage.
374    pub recovered_rejected_transactions: bool,
375    /// Optional scores that are provided as part of the consensus output to Sui
376    /// that can then be used by Sui for future submission to consensus.
377    pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
378
379    /// Set by CommitFinalizer.
380    ///
381    /// Indices of rejected transactions in each block.
382    pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
383    /// Used by consensus to communicate whether to always accept system transactions in this commit.
384    pub always_accept_system_transactions: bool,
385}
386
387impl CommittedSubDag {
388    /// Creates a new committed sub dag.
389    pub fn new(
390        leader: BlockRef,
391        blocks: Vec<VerifiedBlock>,
392        timestamp_ms: BlockTimestampMs,
393        commit_ref: CommitRef,
394        always_accept_system_transactions: bool,
395    ) -> Self {
396        Self {
397            leader,
398            blocks,
399            timestamp_ms,
400            commit_ref,
401            decided_with_local_blocks: true,
402            recovered_rejected_transactions: false,
403            reputation_scores_desc: vec![],
404            rejected_transactions_by_block: BTreeMap::new(),
405            always_accept_system_transactions,
406        }
407    }
408}
409
410// Sort the blocks of the sub-dag blocks by round number then authority index. Any
411// deterministic & stable algorithm works.
412pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
413    blocks.sort_by(|a, b| {
414        a.round()
415            .cmp(&b.round())
416            .then_with(|| a.author().cmp(&b.author()))
417    })
418}
419
420impl Display for CommittedSubDag {
421    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422        write!(
423            f,
424            "{}@{} [{}])",
425            self.commit_ref,
426            self.leader,
427            self.blocks
428                .iter()
429                .map(|b| b.reference().to_string())
430                .join(", ")
431        )
432    }
433}
434
435impl fmt::Debug for CommittedSubDag {
436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437        write!(
438            f,
439            "{}@{} [{}])",
440            self.commit_ref,
441            self.leader,
442            self.blocks
443                .iter()
444                .map(|b| b.reference().to_string())
445                .join(", ")
446        )?;
447        write!(
448            f,
449            ";{}ms;rs{:?};{};{};[{}]",
450            self.timestamp_ms,
451            self.reputation_scores_desc,
452            self.decided_with_local_blocks,
453            self.recovered_rejected_transactions,
454            self.rejected_transactions_by_block
455                .iter()
456                .map(|(block_ref, transactions)| {
457                    format!("{}: {}, ", block_ref, transactions.len())
458                })
459                .collect::<Vec<_>>()
460                .join(", "),
461        )
462    }
463}
464
465// Recovers the full CommittedSubDag from block store, based on Commit.
466pub(crate) fn load_committed_subdag_from_store(
467    context: &Arc<Context>,
468    store: &dyn Store,
469    commit: TrustedCommit,
470    reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
471) -> CommittedSubDag {
472    let mut leader_block_idx = None;
473    let commit_blocks = store
474        .read_blocks(commit.blocks())
475        .expect("We should have the block referenced in the commit data");
476    let blocks = commit_blocks
477        .into_iter()
478        .enumerate()
479        .map(|(idx, commit_block_opt)| {
480            let commit_block =
481                commit_block_opt.expect("We should have the block referenced in the commit data");
482            if commit_block.reference() == commit.leader() {
483                leader_block_idx = Some(idx);
484            }
485            commit_block
486        })
487        .collect::<Vec<_>>();
488    let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
489    let leader_block_ref = blocks[leader_block_idx].reference();
490
491    let mut subdag = CommittedSubDag::new(
492        leader_block_ref,
493        blocks,
494        commit.timestamp_ms(),
495        commit.reference(),
496        context
497            .protocol_config
498            .consensus_always_accept_system_transactions(),
499    );
500
501    subdag.reputation_scores_desc = reputation_scores_desc;
502
503    let reject_votes = store
504        .read_rejected_transactions(commit.reference())
505        .unwrap();
506    if let Some(reject_votes) = reject_votes {
507        subdag.decided_with_local_blocks = true;
508        subdag.recovered_rejected_transactions = true;
509        subdag.rejected_transactions_by_block = reject_votes;
510    } else {
511        subdag.decided_with_local_blocks = false;
512        subdag.recovered_rejected_transactions = false;
513    }
514
515    subdag
516}
517
518#[derive(Debug, Clone, Copy, Eq, PartialEq)]
519pub(crate) enum Decision {
520    Direct,
521    Indirect,
522    Certified, // This is a commit certified leader so no commit decision was made locally.
523}
524
525/// The status of a leader slot from the direct and indirect commit rules.
526#[derive(Debug, Clone, PartialEq)]
527pub(crate) enum LeaderStatus {
528    Commit(VerifiedBlock),
529    Skip(Slot),
530    Undecided(Slot),
531}
532
533impl LeaderStatus {
534    pub(crate) fn round(&self) -> Round {
535        match self {
536            Self::Commit(block) => block.round(),
537            Self::Skip(leader) => leader.round,
538            Self::Undecided(leader) => leader.round,
539        }
540    }
541
542    pub(crate) fn is_decided(&self) -> bool {
543        match self {
544            Self::Commit(_) => true,
545            Self::Skip(_) => true,
546            Self::Undecided(_) => false,
547        }
548    }
549
550    pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
551        match self {
552            Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
553            Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
554            Self::Undecided(..) => None,
555        }
556    }
557}
558
559impl Display for LeaderStatus {
560    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561        match self {
562            Self::Commit(block) => write!(f, "Commit({})", block.reference()),
563            Self::Skip(slot) => write!(f, "Skip({slot})"),
564            Self::Undecided(slot) => write!(f, "Undecided({slot})"),
565        }
566    }
567}
568
569/// Decision of each leader slot.
570#[derive(Debug, Clone, PartialEq)]
571pub(crate) enum DecidedLeader {
572    /// The committed leader block and whether it is a direct commit.
573    /// It is incorrect to trigger the direct commit optimization when the commit is not.
574    /// So when it is unknown if the commit is direct, the boolean flag should be false.
575    Commit(VerifiedBlock, bool),
576    /// The skipped leader slot where no block is committed.
577    Skip(Slot),
578}
579
580impl DecidedLeader {
581    // Slot where the leader is decided.
582    pub(crate) fn slot(&self) -> Slot {
583        match self {
584            Self::Commit(block, _direct) => block.reference().into(),
585            Self::Skip(slot) => *slot,
586        }
587    }
588
589    // Converts to committed block if the decision is to commit. Returns None otherwise.
590    pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
591        match self {
592            Self::Commit(block, _direct) => Some(block),
593            Self::Skip(_) => None,
594        }
595    }
596
597    #[cfg(test)]
598    pub(crate) fn round(&self) -> Round {
599        match self {
600            Self::Commit(block, _direct) => block.round(),
601            Self::Skip(leader) => leader.round,
602        }
603    }
604
605    #[cfg(test)]
606    pub(crate) fn authority(&self) -> AuthorityIndex {
607        match self {
608            Self::Commit(block, _direct) => block.author(),
609            Self::Skip(leader) => leader.authority,
610        }
611    }
612}
613
614impl Display for DecidedLeader {
615    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616        match self {
617            Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
618            Self::Skip(slot) => write!(f, "Skip({slot})"),
619        }
620    }
621}
622
623/// Per-commit properties that can be regenerated from past values, and do not need to be part of
624/// the Commit struct.
625/// Only the latest version is needed for recovery, but more versions are stored for debugging,
626/// and potentially restoring from an earlier state.
627// TODO: version this struct.
628#[derive(Clone, Debug, Serialize, Deserialize)]
629pub struct CommitInfo {
630    pub(crate) committed_rounds: Vec<Round>,
631    pub(crate) reputation_scores: ReputationScores,
632}
633
634/// `CommitRange` stores a range of `CommitIndex`. The range contains the start (inclusive)
635/// and end (inclusive) commit indices and can be ordered for use as the key of a table.
636///
637/// NOTE: using `Range<CommitIndex>` for internal representation for backward compatibility.
638/// The external semantics of `CommitRange` is closer to `RangeInclusive<CommitIndex>`.
639#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
640pub struct CommitRange(Range<CommitIndex>);
641
642impl CommitRange {
643    pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
644        // When end is CommitIndex::MAX, the range can be considered as unbounded
645        // so it is ok to saturate at the end.
646        Self(*range.start()..(*range.end()).saturating_add(1))
647    }
648
649    // Inclusive
650    pub fn start(&self) -> CommitIndex {
651        self.0.start
652    }
653
654    // Inclusive
655    pub fn end(&self) -> CommitIndex {
656        self.0.end.saturating_sub(1)
657    }
658
659    pub fn extend_to(&mut self, other: CommitIndex) {
660        let new_end = other.saturating_add(1);
661        assert!(self.0.end <= new_end);
662        self.0 = self.0.start..new_end;
663    }
664
665    pub fn size(&self) -> usize {
666        self.0
667            .end
668            .checked_sub(self.0.start)
669            .expect("Range should never have end < start") as usize
670    }
671
672    /// Check whether the two ranges have the same size.
673    pub fn is_equal_size(&self, other: &Self) -> bool {
674        self.size() == other.size()
675    }
676
677    /// Check if the provided range is sequentially after this range.
678    pub fn is_next_range(&self, other: &Self) -> bool {
679        self.0.end == other.0.start
680    }
681}
682
683impl Ord for CommitRange {
684    fn cmp(&self, other: &Self) -> Ordering {
685        self.start()
686            .cmp(&other.start())
687            .then_with(|| self.end().cmp(&other.end()))
688    }
689}
690
691impl PartialOrd for CommitRange {
692    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
693        Some(self.cmp(other))
694    }
695}
696
697impl From<RangeInclusive<CommitIndex>> for CommitRange {
698    fn from(range: RangeInclusive<CommitIndex>) -> Self {
699        Self::new(range)
700    }
701}
702
703/// Display CommitRange as an inclusive range.
704impl Debug for CommitRange {
705    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
706        write!(f, "CommitRange({}..={})", self.start(), self.end())
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use std::sync::Arc;
713
714    use super::*;
715    use crate::{
716        block::TestBlock,
717        context::Context,
718        storage::{WriteBatch, mem_store::MemStore},
719    };
720
721    #[tokio::test]
722    async fn test_new_subdag_from_commit() {
723        let store = Arc::new(MemStore::new());
724        let context = Arc::new(Context::new_for_test(4).0);
725        let wave_length = DEFAULT_WAVE_LENGTH;
726
727        // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3.
728        let first_wave_rounds: u32 = wave_length;
729        let num_authorities: u32 = 4;
730
731        let mut blocks = Vec::new();
732        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
733            .committee
734            .authorities()
735            .map(|index| {
736                let author_idx = index.0.value() as u32;
737                let block = TestBlock::new(0, author_idx).build();
738                VerifiedBlock::new_for_test(block)
739            })
740            .map(|block| (block.reference(), block))
741            .unzip();
742        // TODO: avoid writing genesis blocks?
743        store.write(WriteBatch::default().blocks(genesis)).unwrap();
744        blocks.append(&mut genesis_references.clone());
745
746        let mut ancestors = genesis_references;
747        let mut leader = None;
748        for round in 1..=first_wave_rounds {
749            let mut new_ancestors = vec![];
750            for author in 0..num_authorities {
751                let base_ts = round as BlockTimestampMs * 1000;
752                let block = VerifiedBlock::new_for_test(
753                    TestBlock::new(round, author)
754                        .set_timestamp_ms(base_ts + (author + round) as u64)
755                        .set_ancestors(ancestors.clone())
756                        .build(),
757                );
758                store
759                    .write(WriteBatch::default().blocks(vec![block.clone()]))
760                    .unwrap();
761                new_ancestors.push(block.reference());
762                blocks.push(block.reference());
763
764                // only write one block for the final round, which is the leader
765                // of the committed subdag.
766                if round == first_wave_rounds {
767                    leader = Some(block.clone());
768                    break;
769                }
770            }
771            ancestors = new_ancestors;
772        }
773
774        let leader_block = leader.unwrap();
775        let leader_ref = leader_block.reference();
776        let commit_index = 1;
777        let commit = TrustedCommit::new_for_test(
778            commit_index,
779            CommitDigest::MIN,
780            leader_block.timestamp_ms(),
781            leader_ref,
782            blocks.clone(),
783        );
784        let subdag =
785            load_committed_subdag_from_store(&context, store.as_ref(), commit.clone(), vec![]);
786        assert_eq!(subdag.leader, leader_ref);
787        assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
788        assert_eq!(
789            subdag.blocks.len(),
790            (num_authorities * wave_length) as usize + 1
791        );
792        assert_eq!(subdag.commit_ref, commit.reference());
793        assert_eq!(subdag.reputation_scores_desc, vec![]);
794    }
795
796    #[tokio::test]
797    async fn test_commit_range() {
798        telemetry_subscribers::init_for_testing();
799        let mut range1 = CommitRange::new(1..=5);
800        let range2 = CommitRange::new(2..=6);
801        let range3 = CommitRange::new(5..=10);
802        let range4 = CommitRange::new(6..=10);
803        let range5 = CommitRange::new(6..=9);
804        let range6 = CommitRange::new(1..=1);
805
806        assert_eq!(range1.start(), 1);
807        assert_eq!(range1.end(), 5);
808
809        // Test range size
810        assert_eq!(range1.size(), 5);
811        assert_eq!(range3.size(), 6);
812        assert_eq!(range6.size(), 1);
813
814        // Test next range check
815        assert!(!range1.is_next_range(&range2));
816        assert!(!range1.is_next_range(&range3));
817        assert!(range1.is_next_range(&range4));
818        assert!(range1.is_next_range(&range5));
819
820        // Test equal size range check
821        assert!(range1.is_equal_size(&range2));
822        assert!(!range1.is_equal_size(&range3));
823        assert!(range1.is_equal_size(&range4));
824        assert!(!range1.is_equal_size(&range5));
825
826        // Test range ordering
827        assert!(range1 < range2);
828        assert!(range2 < range3);
829        assert!(range3 < range4);
830        assert!(range5 < range4);
831
832        // Test extending range
833        range1.extend_to(10);
834        assert_eq!(range1.start(), 1);
835        assert_eq!(range1.end(), 10);
836        assert_eq!(range1.size(), 10);
837
838        range1.extend_to(20);
839        assert_eq!(range1.start(), 1);
840        assert_eq!(range1.end(), 20);
841        assert_eq!(range1.size(), 20);
842    }
843}