consensus_core/
commit.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Ordering,
6    collections::BTreeMap,
7    fmt::{self, Debug, Display, Formatter},
8    hash::{Hash, Hasher},
9    ops::{Deref, Range, RangeInclusive},
10    sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22    block::{BlockAPI, Slot, VerifiedBlock},
23    leader_scoring::ReputationScores,
24    storage::Store,
25};
26
/// Index of a commit among all consensus commits.
pub type CommitIndex = u32;

/// Commit index reserved for genesis. The first commit after genesis has index 1.
pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;

/// Default wave length for all committers. A longer wave length increases the
/// chance of committing the leader under asynchrony at the cost of latency in
/// the common case.
// TODO: merge DEFAULT_WAVE_LENGTH and MINIMUM_WAVE_LENGTH into a single constant,
// because we are unlikely to change them via config in the foreseeable future.
pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;

/// We need at least one leader round, one voting round, and one decision round.
pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;

/// The consensus protocol operates in 'waves'. Each wave is composed of a leader
/// round, at least one voting round, and one decision round.
pub(crate) type WaveNumber = u32;
45
/// [`Commit`] summarizes [`CommittedSubDag`] for storage and network communications.
///
/// Validators should be able to reconstruct a sequence of CommittedSubDag from the
/// corresponding Commit and blocks referenced in the Commit.
/// A field must meet these requirements to be added to Commit:
/// - helps with recovery locally and for peers catching up.
/// - cannot be derived from a sequence of Commits and other persisted values.
///
/// For example, transactions in blocks should not be included in Commit, because they can be
/// retrieved from blocks specified in Commit. Last committed round per authority also should not
/// be included, because it can be derived from the latest value in storage and the additional
/// sequence of Commits.
///
/// Field access goes through [`CommitAPI`], which `enum_dispatch` forwards to the
/// wrapped variant.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[enum_dispatch(CommitAPI)]
pub enum Commit {
    /// Wraps a [`CommitV1`].
    V1(CommitV1),
}
63
64impl Commit {
65    /// Create a new commit.
66    pub(crate) fn new(
67        index: CommitIndex,
68        previous_digest: CommitDigest,
69        timestamp_ms: BlockTimestampMs,
70        leader: BlockRef,
71        blocks: Vec<BlockRef>,
72    ) -> Self {
73        Commit::V1(CommitV1 {
74            index,
75            previous_digest,
76            timestamp_ms,
77            leader,
78            blocks,
79        })
80    }
81
82    pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
83        let bytes = bcs::to_bytes(self)?;
84        Ok(bytes.into())
85    }
86}
87
/// Accessors to Commit info.
#[enum_dispatch]
pub trait CommitAPI {
    /// Round of the commit's leader block.
    fn round(&self) -> Round;
    /// Index of the commit in the sequence of commits.
    fn index(&self) -> CommitIndex;
    /// Digest of the previous commit.
    fn previous_digest(&self) -> CommitDigest;
    /// Timestamp of the commit, in milliseconds.
    fn timestamp_ms(&self) -> BlockTimestampMs;
    /// Reference to the leader block of the commit.
    fn leader(&self) -> BlockRef;
    /// References to the committed blocks, in the commit order.
    fn blocks(&self) -> &[BlockRef];
}
98
/// Specifies one consensus commit.
/// It is stored on disk, so it does not contain blocks which are stored individually.
///
/// The commit round is not stored separately; it is the round of `leader`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CommitV1 {
    /// Index of the commit.
    /// First commit after genesis has an index of 1, then every next commit has an index incremented by 1.
    index: CommitIndex,
    /// Digest of the previous commit.
    /// Set to CommitDigest::MIN for the first commit after genesis.
    previous_digest: CommitDigest,
    /// Timestamp of the commit, max of the timestamp of the leader block and previous Commit timestamp.
    timestamp_ms: BlockTimestampMs,
    /// A reference to the commit leader.
    leader: BlockRef,
    /// Refs to committed blocks, in the commit order.
    blocks: Vec<BlockRef>,
}
116
117impl CommitAPI for CommitV1 {
118    fn round(&self) -> Round {
119        self.leader.round
120    }
121
122    fn index(&self) -> CommitIndex {
123        self.index
124    }
125
126    fn previous_digest(&self) -> CommitDigest {
127        self.previous_digest
128    }
129
130    fn timestamp_ms(&self) -> BlockTimestampMs {
131        self.timestamp_ms
132    }
133
134    fn leader(&self) -> BlockRef {
135        self.leader
136    }
137
138    fn blocks(&self) -> &[BlockRef] {
139        &self.blocks
140    }
141}
142
/// A commit is trusted when it is produced locally or certified by a quorum of authorities.
/// Blocks referenced by TrustedCommit are assumed to be valid.
/// Only trusted Commit can be sent to execution.
///
/// Note: clone() is relatively cheap with the underlying data refcounted.
#[derive(Clone, Debug, PartialEq)]
pub struct TrustedCommit {
    // The commit itself, shared behind a reference count.
    inner: Arc<Commit>,

    // Cached digest and serialized value, to avoid re-computing these values.
    // The digest is the hash of `serialized` (see `TrustedCommit::compute_digest`).
    digest: CommitDigest,
    serialized: Bytes,
}
156
157impl TrustedCommit {
158    pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
159        let digest = Self::compute_digest(&serialized);
160        Self {
161            inner: Arc::new(commit),
162            digest,
163            serialized,
164        }
165    }
166
167    pub(crate) fn new_for_test(
168        index: CommitIndex,
169        previous_digest: CommitDigest,
170        timestamp_ms: BlockTimestampMs,
171        leader: BlockRef,
172        blocks: Vec<BlockRef>,
173    ) -> Self {
174        let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
175        let serialized = commit.serialize().unwrap();
176        Self::new_trusted(commit, serialized)
177    }
178
179    pub(crate) fn reference(&self) -> CommitRef {
180        CommitRef {
181            index: self.index(),
182            digest: self.digest(),
183        }
184    }
185
186    pub(crate) fn digest(&self) -> CommitDigest {
187        self.digest
188    }
189
190    pub(crate) fn serialized(&self) -> &Bytes {
191        &self.serialized
192    }
193
194    pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
195        let mut hasher = DefaultHashFunction::new();
196        hasher.update(serialized);
197        CommitDigest(hasher.finalize().into())
198    }
199}
200
201/// Allow easy access on the underlying Commit.
202impl Deref for TrustedCommit {
203    type Target = Commit;
204
205    fn deref(&self) -> &Self::Target {
206        &self.inner
207    }
208}
209
/// `CertifiedCommits` keeps the synchronized certified commits along with the corresponding votes
/// received from the peer that provided these commits.
/// The `votes` contain the blocks as those provided by the peer, and certify the tip of the
/// synced commits.
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommits {
    // The synced commits.
    commits: Vec<CertifiedCommit>,
    // Blocks carrying the votes that certify the last of `commits`.
    votes: Vec<VerifiedBlock>,
}
217
218impl CertifiedCommits {
219    pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
220        Self { commits, votes }
221    }
222
223    pub(crate) fn commits(&self) -> &[CertifiedCommit] {
224        &self.commits
225    }
226
227    pub(crate) fn votes(&self) -> &[VerifiedBlock] {
228        &self.votes
229    }
230}
231
/// A commit that has been synced and certified by a quorum of authorities.
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
    // The certified commit, shared behind a reference count.
    commit: Arc<TrustedCommit>,
    // Verified blocks kept alongside the commit.
    // NOTE(review): presumably the full blocks referenced by the commit — confirm against callers.
    blocks: Vec<VerifiedBlock>,
}
238
239impl CertifiedCommit {
240    pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
241        Self {
242            commit: Arc::new(commit),
243            blocks,
244        }
245    }
246
247    pub fn blocks(&self) -> &[VerifiedBlock] {
248        &self.blocks
249    }
250}
251
252impl Deref for CertifiedCommit {
253    type Target = TrustedCommit;
254
255    fn deref(&self) -> &Self::Target {
256        &self.commit
257    }
258}
259
/// Digest of a consensus commit.
///
/// A fixed-size byte array of `DIGEST_LENGTH` bytes; ordering is the lexicographic
/// ordering of the bytes.
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
263
264impl CommitDigest {
265    /// Lexicographic min & max digest.
266    pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
267    pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
268
269    pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
270        self.0
271    }
272}
273
274impl Hash for CommitDigest {
275    fn hash<H: Hasher>(&self, state: &mut H) {
276        state.write(&self.0[..8]);
277    }
278}
279
280impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
281    fn from(hd: CommitDigest) -> Self {
282        Digest::new(hd.0)
283    }
284}
285
286impl fmt::Display for CommitDigest {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
288        write!(
289            f,
290            "{}",
291            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
292                .get(0..4)
293                .ok_or(fmt::Error)?
294        )
295    }
296}
297
298impl fmt::Debug for CommitDigest {
299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
300        write!(
301            f,
302            "{}",
303            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
304        )
305    }
306}
307
/// Uniquely identifies a commit with its index and digest.
///
/// The derived ordering compares `index` first, then `digest`.
#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitRef {
    /// Index of the referenced commit.
    pub index: CommitIndex,
    /// Digest of the referenced commit.
    pub digest: CommitDigest,
}
314
315impl CommitRef {
316    pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
317        Self { index, digest }
318    }
319}
320
321impl fmt::Display for CommitRef {
322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
323        write!(f, "C{}({})", self.index, self.digest)
324    }
325}
326
327impl fmt::Debug for CommitRef {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
329        write!(f, "C{}({:?})", self.index, self.digest)
330    }
331}
332
/// Represents a vote on a Commit.
pub type CommitVote = CommitRef;
335
/// The output of consensus to execution is an ordered list of [`CommittedSubDag`].
/// Each CommittedSubDag contains the information needed to execute transactions in
/// the consensus commit.
///
/// The application processing CommittedSubDag can arbitrarily sort the blocks within
/// each sub-dag (but using a deterministic algorithm).
#[derive(Clone, PartialEq)]
pub struct CommittedSubDag {
    /// Set by Linearizer.
    ///
    /// A reference to the leader of the sub-dag
    pub leader: BlockRef,
    /// All the committed blocks that are part of this sub-dag
    pub blocks: Vec<VerifiedBlock>,
    /// The timestamp of the commit, obtained from the timestamp of the leader block.
    pub timestamp_ms: BlockTimestampMs,
    /// The reference of the commit.
    /// First commit after genesis has an index of 1, then every next commit has an
    /// index incremented by 1.
    pub commit_ref: CommitRef,

    /// Set by CommitObserver.
    ///
    /// Indicates whether the commit was decided locally based on the local DAG.
    ///
    /// If true, `CommitFinalizer` can then assume a quorum of certificates are available
    /// for each transaction in the commit if there is no reject vote, and proceed with
    /// optimistic finalization of transactions.
    ///
    /// If the commit was decided by `UniversalCommitter`, this must be true.
    /// If the commit was received from a peer via `CommitSyncer`, this must be false.
    /// There may not be enough blocks in local DAG to decide on the commit.
    ///
    /// For safety, a previously locally decided commit may be recovered after restarting as
    /// non-local, if its finalization state was not persisted.
    pub decided_with_local_blocks: bool,
    /// Whether rejected transactions in this commit have been recovered from storage.
    pub recovered_rejected_transactions: bool,

    /// Set by CommitFinalizer.
    ///
    /// Indices of rejected transactions in each block.
    pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
}
380
381impl CommittedSubDag {
382    /// Creates a new committed sub dag.
383    pub fn new(
384        leader: BlockRef,
385        blocks: Vec<VerifiedBlock>,
386        timestamp_ms: BlockTimestampMs,
387        commit_ref: CommitRef,
388    ) -> Self {
389        Self {
390            leader,
391            blocks,
392            timestamp_ms,
393            commit_ref,
394            decided_with_local_blocks: true,
395            recovered_rejected_transactions: false,
396            rejected_transactions_by_block: BTreeMap::new(),
397        }
398    }
399}
400
401// Sort the blocks of the sub-dag blocks by round number then authority index. Any
402// deterministic & stable algorithm works.
403pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
404    blocks.sort_by(|a, b| {
405        a.round()
406            .cmp(&b.round())
407            .then_with(|| a.author().cmp(&b.author()))
408    })
409}
410
411impl Display for CommittedSubDag {
412    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
413        write!(
414            f,
415            "{}@{} [{}])",
416            self.commit_ref,
417            self.leader,
418            self.blocks
419                .iter()
420                .map(|b| b.reference().to_string())
421                .join(", ")
422        )
423    }
424}
425
426impl fmt::Debug for CommittedSubDag {
427    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
428        write!(
429            f,
430            "{}@{} [{}])",
431            self.commit_ref,
432            self.leader,
433            self.blocks
434                .iter()
435                .map(|b| b.reference().to_string())
436                .join(", ")
437        )?;
438        write!(
439            f,
440            ";{}ms;{};{};[{}]",
441            self.timestamp_ms,
442            self.decided_with_local_blocks,
443            self.recovered_rejected_transactions,
444            self.rejected_transactions_by_block
445                .iter()
446                .map(|(block_ref, transactions)| {
447                    format!("{}: {}, ", block_ref, transactions.len())
448                })
449                .collect::<Vec<_>>()
450                .join(", "),
451        )
452    }
453}
454
455// Recovers the full CommittedSubDag from block store, based on Commit.
456pub(crate) fn load_committed_subdag_from_store(
457    store: &dyn Store,
458    commit: TrustedCommit,
459) -> CommittedSubDag {
460    let mut leader_block_idx = None;
461    let commit_blocks = store
462        .read_blocks(commit.blocks())
463        .expect("We should have the block referenced in the commit data");
464    let blocks = commit_blocks
465        .into_iter()
466        .enumerate()
467        .map(|(idx, commit_block_opt)| {
468            let commit_block =
469                commit_block_opt.expect("We should have the block referenced in the commit data");
470            if commit_block.reference() == commit.leader() {
471                leader_block_idx = Some(idx);
472            }
473            commit_block
474        })
475        .collect::<Vec<_>>();
476    let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
477    let leader_block_ref = blocks[leader_block_idx].reference();
478
479    let mut subdag = CommittedSubDag::new(
480        leader_block_ref,
481        blocks,
482        commit.timestamp_ms(),
483        commit.reference(),
484    );
485
486    let reject_votes = store
487        .read_rejected_transactions(commit.reference())
488        .unwrap();
489    if let Some(reject_votes) = reject_votes {
490        subdag.decided_with_local_blocks = true;
491        subdag.recovered_rejected_transactions = true;
492        subdag.rejected_transactions_by_block = reject_votes;
493    } else {
494        subdag.decided_with_local_blocks = false;
495        subdag.recovered_rejected_transactions = false;
496    }
497
498    subdag
499}
500
/// How the decision on a leader was reached.
// NOTE(review): `Direct` / `Indirect` presumably correspond to the direct and indirect
// commit rules mentioned on `LeaderStatus` — confirm against the committer.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum Decision {
    Direct,
    Indirect,
    Certified, // This is a commit certified leader so no commit decision was made locally.
}
507
/// The status of a leader slot from the direct and indirect commit rules.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum LeaderStatus {
    /// The leader block to commit.
    Commit(VerifiedBlock),
    /// The leader slot is skipped; no block is committed for it.
    Skip(Slot),
    /// No decision has been reached for the leader slot yet.
    Undecided(Slot),
}
515
516impl LeaderStatus {
517    pub(crate) fn round(&self) -> Round {
518        match self {
519            Self::Commit(block) => block.round(),
520            Self::Skip(leader) => leader.round,
521            Self::Undecided(leader) => leader.round,
522        }
523    }
524
525    pub(crate) fn is_decided(&self) -> bool {
526        match self {
527            Self::Commit(_) => true,
528            Self::Skip(_) => true,
529            Self::Undecided(_) => false,
530        }
531    }
532
533    pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
534        match self {
535            Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
536            Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
537            Self::Undecided(..) => None,
538        }
539    }
540}
541
542impl Display for LeaderStatus {
543    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544        match self {
545            Self::Commit(block) => write!(f, "Commit({})", block.reference()),
546            Self::Skip(slot) => write!(f, "Skip({slot})"),
547            Self::Undecided(slot) => write!(f, "Undecided({slot})"),
548        }
549    }
550}
551
/// Decision of each leader slot.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum DecidedLeader {
    /// The committed leader block and whether it is a direct commit.
    /// It is incorrect to trigger the direct commit optimization when the commit is not direct.
    /// So when it is unknown if the commit is direct, the boolean flag should be false.
    Commit(VerifiedBlock, bool),
    /// The skipped leader slot where no block is committed.
    Skip(Slot),
}
562
563impl DecidedLeader {
564    // Slot where the leader is decided.
565    pub(crate) fn slot(&self) -> Slot {
566        match self {
567            Self::Commit(block, _direct) => block.reference().into(),
568            Self::Skip(slot) => *slot,
569        }
570    }
571
572    // Converts to committed block if the decision is to commit. Returns None otherwise.
573    pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
574        match self {
575            Self::Commit(block, _direct) => Some(block),
576            Self::Skip(_) => None,
577        }
578    }
579
580    #[cfg(test)]
581    pub(crate) fn round(&self) -> Round {
582        match self {
583            Self::Commit(block, _direct) => block.round(),
584            Self::Skip(leader) => leader.round,
585        }
586    }
587
588    #[cfg(test)]
589    pub(crate) fn authority(&self) -> consensus_config::AuthorityIndex {
590        match self {
591            Self::Commit(block, _direct) => block.author(),
592            Self::Skip(leader) => leader.authority,
593        }
594    }
595}
596
597impl Display for DecidedLeader {
598    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
599        match self {
600            Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
601            Self::Skip(slot) => write!(f, "Skip({slot})"),
602        }
603    }
604}
605
/// Per-commit properties that can be regenerated from past values, and do not need to be part of
/// the Commit struct.
/// Only the latest version is needed for recovery, but more versions are stored for debugging,
/// and potentially restoring from an earlier state.
// TODO: version this struct.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommitInfo {
    // NOTE(review): presumably the last committed round of each authority, indexed by
    // authority — confirm against the code that populates this.
    pub(crate) committed_rounds: Vec<Round>,
    // Reputation scores stored alongside the commit.
    pub(crate) reputation_scores: ReputationScores,
}
616
/// `CommitRange` stores a range of `CommitIndex`. The range contains the start (inclusive)
/// and end (inclusive) commit indices and can be ordered for use as the key of a table.
///
/// NOTE: using `Range<CommitIndex>` for internal representation for backward compatibility.
/// The external semantics of `CommitRange` is closer to `RangeInclusive<CommitIndex>`.
///
/// The derived `Default` is the empty internal range `0..0`, for which both `start()`
/// and `end()` return 0.
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommitRange(Range<CommitIndex>);
624
625impl CommitRange {
626    pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
627        // When end is CommitIndex::MAX, the range can be considered as unbounded
628        // so it is ok to saturate at the end.
629        Self(*range.start()..(*range.end()).saturating_add(1))
630    }
631
632    // Inclusive
633    pub fn start(&self) -> CommitIndex {
634        self.0.start
635    }
636
637    // Inclusive
638    pub fn end(&self) -> CommitIndex {
639        self.0.end.saturating_sub(1)
640    }
641
642    pub fn extend_to(&mut self, other: CommitIndex) {
643        let new_end = other.saturating_add(1);
644        assert!(self.0.end <= new_end);
645        self.0 = self.0.start..new_end;
646    }
647
648    pub fn size(&self) -> usize {
649        self.0
650            .end
651            .checked_sub(self.0.start)
652            .expect("Range should never have end < start") as usize
653    }
654
655    /// Check whether the two ranges have the same size.
656    pub fn is_equal_size(&self, other: &Self) -> bool {
657        self.size() == other.size()
658    }
659
660    /// Check if the provided range is sequentially after this range.
661    pub fn is_next_range(&self, other: &Self) -> bool {
662        self.0.end == other.0.start
663    }
664}
665
666impl Ord for CommitRange {
667    fn cmp(&self, other: &Self) -> Ordering {
668        self.start()
669            .cmp(&other.start())
670            .then_with(|| self.end().cmp(&other.end()))
671    }
672}
673
674impl PartialOrd for CommitRange {
675    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
676        Some(self.cmp(other))
677    }
678}
679
680impl From<RangeInclusive<CommitIndex>> for CommitRange {
681    fn from(range: RangeInclusive<CommitIndex>) -> Self {
682        Self::new(range)
683    }
684}
685
686/// Display CommitRange as an inclusive range.
687impl Debug for CommitRange {
688    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
689        write!(f, "CommitRange({}..={})", self.start(), self.end())
690    }
691}
692
693#[cfg(test)]
694mod tests {
695    use std::sync::Arc;
696
697    use super::*;
698    use crate::{
699        block::TestBlock,
700        context::Context,
701        storage::{WriteBatch, mem_store::MemStore},
702    };
703
    // Writes genesis blocks plus one wave of blocks to an in-memory store, builds a
    // commit referencing all of them, and checks that the sub-dag loaded from the
    // store matches the commit.
    #[tokio::test]
    async fn test_new_subdag_from_commit() {
        let store = Arc::new(MemStore::new());
        let context = Arc::new(Context::new_for_test(4).0);
        let wave_length = DEFAULT_WAVE_LENGTH;

        // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3.
        let first_wave_rounds: u32 = wave_length;
        let num_authorities: u32 = 4;

        // References of all blocks that go into the commit, in creation order.
        let mut blocks = Vec::new();
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                let block = TestBlock::new(0, author_idx).build();
                VerifiedBlock::new_for_test(block)
            })
            .map(|block| (block.reference(), block))
            .unzip();
        // TODO: avoid writing genesis blocks?
        store.write(WriteBatch::default().blocks(genesis)).unwrap();
        blocks.append(&mut genesis_references.clone());

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=first_wave_rounds {
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                store
                    .write(WriteBatch::default().blocks(vec![block.clone()]))
                    .unwrap();
                new_ancestors.push(block.reference());
                blocks.push(block.reference());

                // only write one block for the final round, which is the leader
                // of the committed subdag.
                if round == first_wave_rounds {
                    leader = Some(block.clone());
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;
        let commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            leader_block.timestamp_ms(),
            leader_ref,
            blocks.clone(),
        );
        let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone());
        assert_eq!(subdag.leader, leader_ref);
        assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
        // Genesis contributes `num_authorities` blocks, each round before the last one
        // contributes `num_authorities` more, and the last round contributes only the
        // leader: num_authorities * wave_length + 1 in total.
        assert_eq!(
            subdag.blocks.len(),
            (num_authorities * wave_length) as usize + 1
        );
        assert_eq!(subdag.commit_ref, commit.reference());
    }
776
    // Exercises CommitRange accessors, size, adjacency, ordering and extension.
    #[tokio::test]
    async fn test_commit_range() {
        telemetry_subscribers::init_for_testing();
        let mut range1 = CommitRange::new(1..=5);
        let range2 = CommitRange::new(2..=6);
        let range3 = CommitRange::new(5..=10);
        let range4 = CommitRange::new(6..=10);
        let range5 = CommitRange::new(6..=9);
        let range6 = CommitRange::new(1..=1);

        // Both bounds are reported as inclusive.
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 5);

        // Test range size
        assert_eq!(range1.size(), 5);
        assert_eq!(range3.size(), 6);
        assert_eq!(range6.size(), 1);

        // Test next range check
        // Only ranges starting right after range1's end (5) qualify.
        assert!(!range1.is_next_range(&range2));
        assert!(!range1.is_next_range(&range3));
        assert!(range1.is_next_range(&range4));
        assert!(range1.is_next_range(&range5));

        // Test equal size range check
        assert!(range1.is_equal_size(&range2));
        assert!(!range1.is_equal_size(&range3));
        assert!(range1.is_equal_size(&range4));
        assert!(!range1.is_equal_size(&range5));

        // Test range ordering
        // Ordered by start first, then by end.
        assert!(range1 < range2);
        assert!(range2 < range3);
        assert!(range3 < range4);
        assert!(range5 < range4);

        // Test extending range
        range1.extend_to(10);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 10);
        assert_eq!(range1.size(), 10);

        range1.extend_to(20);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 20);
        assert_eq!(range1.size(), 20);
    }
824}