consensus_core/
commit.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    cmp::Ordering,
6    collections::BTreeMap,
7    fmt::{self, Debug, Display, Formatter},
8    hash::{Hash, Hasher},
9    ops::{Deref, Range, RangeInclusive},
10    sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{AuthorityIndex, DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22    block::{BlockAPI, Slot, VerifiedBlock},
23    leader_scoring::ReputationScores,
24    storage::Store,
25};
26
27/// Index of a commit among all consensus commits.
28pub type CommitIndex = u32;
29
30pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;
31
32/// Default wave length for all committers. A longer wave length increases the
33/// chance of committing the leader under asynchrony at the cost of latency in
34/// the common case.
35// TODO: merge DEFAULT_WAVE_LENGTH and MINIMUM_WAVE_LENGTH into a single constant,
36// because we are unlikely to change them via config in the forseeable future.
37pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;
38
39/// We need at least one leader round, one voting round, and one decision round.
40pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;
41
42/// The consensus protocol operates in 'waves'. Each wave is composed of a leader
43/// round, at least one voting round, and one decision round.
44pub(crate) type WaveNumber = u32;
45
46/// [`Commit`] summarizes [`CommittedSubDag`] for storage and network communications.
47///
48/// Validators should be able to reconstruct a sequence of CommittedSubDag from the
49/// corresponding Commit and blocks referenced in the Commit.
50/// A field must meet these requirements to be added to Commit:
51/// - helps with recovery locally and for peers catching up.
52/// - cannot be derived from a sequence of Commits and other persisted values.
53///
54/// For example, transactions in blocks should not be included in Commit, because they can be
55/// retrieved from blocks specified in Commit. Last committed round per authority also should not
56/// be included, because it can be derived from the latest value in storage and the additional
57/// sequence of Commits.
58#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
59#[enum_dispatch(CommitAPI)]
60pub enum Commit {
61    V1(CommitV1),
62}
63
64impl Commit {
65    /// Create a new commit.
66    pub(crate) fn new(
67        index: CommitIndex,
68        previous_digest: CommitDigest,
69        timestamp_ms: BlockTimestampMs,
70        leader: BlockRef,
71        blocks: Vec<BlockRef>,
72    ) -> Self {
73        Commit::V1(CommitV1 {
74            index,
75            previous_digest,
76            timestamp_ms,
77            leader,
78            blocks,
79        })
80    }
81
82    pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
83        let bytes = bcs::to_bytes(self)?;
84        Ok(bytes.into())
85    }
86}
87
88/// Accessors to Commit info.
89#[enum_dispatch]
90pub trait CommitAPI {
91    fn round(&self) -> Round;
92    fn index(&self) -> CommitIndex;
93    fn previous_digest(&self) -> CommitDigest;
94    fn timestamp_ms(&self) -> BlockTimestampMs;
95    fn leader(&self) -> BlockRef;
96    fn blocks(&self) -> &[BlockRef];
97}
98
99/// Specifies one consensus commit.
100/// It is stored on disk, so it does not contain blocks which are stored individually.
101#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
102pub struct CommitV1 {
103    /// Index of the commit.
104    /// First commit after genesis has an index of 1, then every next commit has an index incremented by 1.
105    index: CommitIndex,
106    /// Digest of the previous commit.
107    /// Set to CommitDigest::MIN for the first commit after genesis.
108    previous_digest: CommitDigest,
109    /// Timestamp of the commit, max of the timestamp of the leader block and previous Commit timestamp.
110    timestamp_ms: BlockTimestampMs,
111    /// A reference to the commit leader.
112    leader: BlockRef,
113    /// Refs to committed blocks, in the commit order.
114    blocks: Vec<BlockRef>,
115}
116
117impl CommitAPI for CommitV1 {
118    fn round(&self) -> Round {
119        self.leader.round
120    }
121
122    fn index(&self) -> CommitIndex {
123        self.index
124    }
125
126    fn previous_digest(&self) -> CommitDigest {
127        self.previous_digest
128    }
129
130    fn timestamp_ms(&self) -> BlockTimestampMs {
131        self.timestamp_ms
132    }
133
134    fn leader(&self) -> BlockRef {
135        self.leader
136    }
137
138    fn blocks(&self) -> &[BlockRef] {
139        &self.blocks
140    }
141}
142
143/// A commit is trusted when it is produced locally or certified by a quorum of authorities.
144/// Blocks referenced by TrustedCommit are assumed to be valid.
145/// Only trusted Commit can be sent to execution.
146///
147/// Note: clone() is relatively cheap with the underlying data refcounted.
148#[derive(Clone, Debug, PartialEq)]
149pub struct TrustedCommit {
150    inner: Arc<Commit>,
151
152    // Cached digest and serialized value, to avoid re-computing these values.
153    digest: CommitDigest,
154    serialized: Bytes,
155}
156
157impl TrustedCommit {
158    pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
159        let digest = Self::compute_digest(&serialized);
160        Self {
161            inner: Arc::new(commit),
162            digest,
163            serialized,
164        }
165    }
166
167    pub(crate) fn new_for_test(
168        index: CommitIndex,
169        previous_digest: CommitDigest,
170        timestamp_ms: BlockTimestampMs,
171        leader: BlockRef,
172        blocks: Vec<BlockRef>,
173    ) -> Self {
174        let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
175        let serialized = commit.serialize().unwrap();
176        Self::new_trusted(commit, serialized)
177    }
178
179    pub(crate) fn reference(&self) -> CommitRef {
180        CommitRef {
181            index: self.index(),
182            digest: self.digest(),
183        }
184    }
185
186    pub(crate) fn digest(&self) -> CommitDigest {
187        self.digest
188    }
189
190    pub(crate) fn serialized(&self) -> &Bytes {
191        &self.serialized
192    }
193
194    pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
195        let mut hasher = DefaultHashFunction::new();
196        hasher.update(serialized);
197        CommitDigest(hasher.finalize().into())
198    }
199}
200
201/// Allow easy access on the underlying Commit.
202impl Deref for TrustedCommit {
203    type Target = Commit;
204
205    fn deref(&self) -> &Self::Target {
206        &self.inner
207    }
208}
209
210/// `CertifiedCommits` keeps the synchronized certified commits along with the corresponding votes received from the peer that provided these commits.
211/// The `votes` contain the blocks as those provided by the peer, and certify the tip of the synced commits.
212#[derive(Clone, Debug)]
213pub(crate) struct CertifiedCommits {
214    commits: Vec<CertifiedCommit>,
215    votes: Vec<VerifiedBlock>,
216}
217
218impl CertifiedCommits {
219    pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
220        Self { commits, votes }
221    }
222
223    pub(crate) fn commits(&self) -> &[CertifiedCommit] {
224        &self.commits
225    }
226
227    pub(crate) fn votes(&self) -> &[VerifiedBlock] {
228        &self.votes
229    }
230}
231
232/// A commit that has been synced and certified by a quorum of authorities.
233#[derive(Clone, Debug)]
234pub(crate) struct CertifiedCommit {
235    commit: Arc<TrustedCommit>,
236    blocks: Vec<VerifiedBlock>,
237}
238
239impl CertifiedCommit {
240    pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
241        Self {
242            commit: Arc::new(commit),
243            blocks,
244        }
245    }
246
247    pub fn blocks(&self) -> &[VerifiedBlock] {
248        &self.blocks
249    }
250}
251
252impl Deref for CertifiedCommit {
253    type Target = TrustedCommit;
254
255    fn deref(&self) -> &Self::Target {
256        &self.commit
257    }
258}
259
260/// Digest of a consensus commit.
261#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
262pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
263
264impl CommitDigest {
265    /// Lexicographic min & max digest.
266    pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
267    pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
268
269    pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
270        self.0
271    }
272}
273
274impl Hash for CommitDigest {
275    fn hash<H: Hasher>(&self, state: &mut H) {
276        state.write(&self.0[..8]);
277    }
278}
279
280impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
281    fn from(hd: CommitDigest) -> Self {
282        Digest::new(hd.0)
283    }
284}
285
286impl fmt::Display for CommitDigest {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
288        write!(
289            f,
290            "{}",
291            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
292                .get(0..4)
293                .ok_or(fmt::Error)?
294        )
295    }
296}
297
298impl fmt::Debug for CommitDigest {
299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
300        write!(
301            f,
302            "{}",
303            base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
304        )
305    }
306}
307
308/// Uniquely identifies a commit with its index and digest.
309#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
310pub struct CommitRef {
311    pub index: CommitIndex,
312    pub digest: CommitDigest,
313}
314
315impl CommitRef {
316    pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
317        Self { index, digest }
318    }
319}
320
321impl fmt::Display for CommitRef {
322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
323        write!(f, "C{}({})", self.index, self.digest)
324    }
325}
326
327impl fmt::Debug for CommitRef {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
329        write!(f, "C{}({:?})", self.index, self.digest)
330    }
331}
332
333// Represents a vote on a Commit.
334pub type CommitVote = CommitRef;
335
336/// The output of consensus to execution is an ordered list of [`CommittedSubDag`].
337/// Each CommittedSubDag contains the information needed to execution transactions in
338/// the consensus commit.
339///
340/// The application processing CommittedSubDag can arbitrarily sort the blocks within
341/// each sub-dag (but using a deterministic algorithm).
342#[derive(Clone, PartialEq)]
343pub struct CommittedSubDag {
344    /// Set by Linearizer.
345    ///
346    /// A reference to the leader of the sub-dag
347    pub leader: BlockRef,
348    /// All the committed blocks that are part of this sub-dag
349    pub blocks: Vec<VerifiedBlock>,
350    /// The timestamp of the commit, obtained from the timestamp of the leader block.
351    pub timestamp_ms: BlockTimestampMs,
352    /// The reference of the commit.
353    /// First commit after genesis has a index of 1, then every next commit has a
354    /// index incremented by 1.
355    pub commit_ref: CommitRef,
356
357    /// Set by CommitObserver.
358    ///
359    /// Indicates whether the commit was decided locally based on the local DAG.
360    ///
361    /// If true, `CommitFinalizer` can then assume a quorum of certificates are available
362    /// for each transaction in the commit if there is no reject vote, and proceed with
363    /// optimistic finalization of transactions.
364    ///
365    /// If the commit was decided by `UniversalCommitter`, this must be true.
366    /// If the commit was received from a peer via `CommitSyncer`, this must be false.
367    /// There may not be enough blocks in local DAG to decide on the commit.
368    ///
369    /// For safety, a previously locally decided commit may be recovered after restarting as
370    /// non-local, if its finalization state was not persisted.
371    pub decided_with_local_blocks: bool,
372    /// Whether rejected transactions in this commit have been recovered from storage.
373    pub recovered_rejected_transactions: bool,
374    /// Optional scores that are provided as part of the consensus output to Sui
375    /// that can then be used by Sui for future submission to consensus.
376    pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
377
378    /// Set by CommitFinalizer.
379    ///
380    /// Indices of rejected transactions in each block.
381    pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
382}
383
384impl CommittedSubDag {
385    /// Creates a new committed sub dag.
386    pub fn new(
387        leader: BlockRef,
388        blocks: Vec<VerifiedBlock>,
389        timestamp_ms: BlockTimestampMs,
390        commit_ref: CommitRef,
391    ) -> Self {
392        Self {
393            leader,
394            blocks,
395            timestamp_ms,
396            commit_ref,
397            decided_with_local_blocks: true,
398            recovered_rejected_transactions: false,
399            reputation_scores_desc: vec![],
400            rejected_transactions_by_block: BTreeMap::new(),
401        }
402    }
403}
404
405// Sort the blocks of the sub-dag blocks by round number then authority index. Any
406// deterministic & stable algorithm works.
407pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
408    blocks.sort_by(|a, b| {
409        a.round()
410            .cmp(&b.round())
411            .then_with(|| a.author().cmp(&b.author()))
412    })
413}
414
415impl Display for CommittedSubDag {
416    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
417        write!(
418            f,
419            "{}@{} [{}])",
420            self.commit_ref,
421            self.leader,
422            self.blocks
423                .iter()
424                .map(|b| b.reference().to_string())
425                .join(", ")
426        )
427    }
428}
429
430impl fmt::Debug for CommittedSubDag {
431    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432        write!(
433            f,
434            "{}@{} [{}])",
435            self.commit_ref,
436            self.leader,
437            self.blocks
438                .iter()
439                .map(|b| b.reference().to_string())
440                .join(", ")
441        )?;
442        write!(
443            f,
444            ";{}ms;rs{:?};{};{};[{}]",
445            self.timestamp_ms,
446            self.reputation_scores_desc,
447            self.decided_with_local_blocks,
448            self.recovered_rejected_transactions,
449            self.rejected_transactions_by_block
450                .iter()
451                .map(|(block_ref, transactions)| {
452                    format!("{}: {}, ", block_ref, transactions.len())
453                })
454                .collect::<Vec<_>>()
455                .join(", "),
456        )
457    }
458}
459
460// Recovers the full CommittedSubDag from block store, based on Commit.
461pub(crate) fn load_committed_subdag_from_store(
462    store: &dyn Store,
463    commit: TrustedCommit,
464    reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
465) -> CommittedSubDag {
466    let mut leader_block_idx = None;
467    let commit_blocks = store
468        .read_blocks(commit.blocks())
469        .expect("We should have the block referenced in the commit data");
470    let blocks = commit_blocks
471        .into_iter()
472        .enumerate()
473        .map(|(idx, commit_block_opt)| {
474            let commit_block =
475                commit_block_opt.expect("We should have the block referenced in the commit data");
476            if commit_block.reference() == commit.leader() {
477                leader_block_idx = Some(idx);
478            }
479            commit_block
480        })
481        .collect::<Vec<_>>();
482    let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
483    let leader_block_ref = blocks[leader_block_idx].reference();
484
485    let mut subdag = CommittedSubDag::new(
486        leader_block_ref,
487        blocks,
488        commit.timestamp_ms(),
489        commit.reference(),
490    );
491
492    subdag.reputation_scores_desc = reputation_scores_desc;
493
494    let reject_votes = store
495        .read_rejected_transactions(commit.reference())
496        .unwrap();
497    if let Some(reject_votes) = reject_votes {
498        subdag.decided_with_local_blocks = true;
499        subdag.recovered_rejected_transactions = true;
500        subdag.rejected_transactions_by_block = reject_votes;
501    } else {
502        subdag.decided_with_local_blocks = false;
503        subdag.recovered_rejected_transactions = false;
504    }
505
506    subdag
507}
508
509#[derive(Debug, Clone, Copy, Eq, PartialEq)]
510pub(crate) enum Decision {
511    Direct,
512    Indirect,
513    Certified, // This is a commit certified leader so no commit decision was made locally.
514}
515
516/// The status of a leader slot from the direct and indirect commit rules.
517#[derive(Debug, Clone, PartialEq)]
518pub(crate) enum LeaderStatus {
519    Commit(VerifiedBlock),
520    Skip(Slot),
521    Undecided(Slot),
522}
523
524impl LeaderStatus {
525    pub(crate) fn round(&self) -> Round {
526        match self {
527            Self::Commit(block) => block.round(),
528            Self::Skip(leader) => leader.round,
529            Self::Undecided(leader) => leader.round,
530        }
531    }
532
533    pub(crate) fn is_decided(&self) -> bool {
534        match self {
535            Self::Commit(_) => true,
536            Self::Skip(_) => true,
537            Self::Undecided(_) => false,
538        }
539    }
540
541    pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
542        match self {
543            Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
544            Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
545            Self::Undecided(..) => None,
546        }
547    }
548}
549
550impl Display for LeaderStatus {
551    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552        match self {
553            Self::Commit(block) => write!(f, "Commit({})", block.reference()),
554            Self::Skip(slot) => write!(f, "Skip({slot})"),
555            Self::Undecided(slot) => write!(f, "Undecided({slot})"),
556        }
557    }
558}
559
560/// Decision of each leader slot.
561#[derive(Debug, Clone, PartialEq)]
562pub(crate) enum DecidedLeader {
563    /// The committed leader block and whether it is a direct commit.
564    /// It is incorrect to trigger the direct commit optimization when the commit is not.
565    /// So when it is unknown if the commit is direct, the boolean flag should be false.
566    Commit(VerifiedBlock, bool),
567    /// The skipped leader slot where no block is committed.
568    Skip(Slot),
569}
570
571impl DecidedLeader {
572    // Slot where the leader is decided.
573    pub(crate) fn slot(&self) -> Slot {
574        match self {
575            Self::Commit(block, _direct) => block.reference().into(),
576            Self::Skip(slot) => *slot,
577        }
578    }
579
580    // Converts to committed block if the decision is to commit. Returns None otherwise.
581    pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
582        match self {
583            Self::Commit(block, _direct) => Some(block),
584            Self::Skip(_) => None,
585        }
586    }
587
588    #[cfg(test)]
589    pub(crate) fn round(&self) -> Round {
590        match self {
591            Self::Commit(block, _direct) => block.round(),
592            Self::Skip(leader) => leader.round,
593        }
594    }
595
596    #[cfg(test)]
597    pub(crate) fn authority(&self) -> AuthorityIndex {
598        match self {
599            Self::Commit(block, _direct) => block.author(),
600            Self::Skip(leader) => leader.authority,
601        }
602    }
603}
604
605impl Display for DecidedLeader {
606    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607        match self {
608            Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
609            Self::Skip(slot) => write!(f, "Skip({slot})"),
610        }
611    }
612}
613
614/// Per-commit properties that can be regenerated from past values, and do not need to be part of
615/// the Commit struct.
616/// Only the latest version is needed for recovery, but more versions are stored for debugging,
617/// and potentially restoring from an earlier state.
618// TODO: version this struct.
619#[derive(Clone, Debug, Serialize, Deserialize)]
620pub struct CommitInfo {
621    pub(crate) committed_rounds: Vec<Round>,
622    pub(crate) reputation_scores: ReputationScores,
623}
624
625/// `CommitRange` stores a range of `CommitIndex`. The range contains the start (inclusive)
626/// and end (inclusive) commit indices and can be ordered for use as the key of a table.
627///
628/// NOTE: using `Range<CommitIndex>` for internal representation for backward compatibility.
629/// The external semantics of `CommitRange` is closer to `RangeInclusive<CommitIndex>`.
630#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
631pub struct CommitRange(Range<CommitIndex>);
632
633impl CommitRange {
634    pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
635        // When end is CommitIndex::MAX, the range can be considered as unbounded
636        // so it is ok to saturate at the end.
637        Self(*range.start()..(*range.end()).saturating_add(1))
638    }
639
640    // Inclusive
641    pub fn start(&self) -> CommitIndex {
642        self.0.start
643    }
644
645    // Inclusive
646    pub fn end(&self) -> CommitIndex {
647        self.0.end.saturating_sub(1)
648    }
649
650    pub fn extend_to(&mut self, other: CommitIndex) {
651        let new_end = other.saturating_add(1);
652        assert!(self.0.end <= new_end);
653        self.0 = self.0.start..new_end;
654    }
655
656    pub fn size(&self) -> usize {
657        self.0
658            .end
659            .checked_sub(self.0.start)
660            .expect("Range should never have end < start") as usize
661    }
662
663    /// Check whether the two ranges have the same size.
664    pub fn is_equal_size(&self, other: &Self) -> bool {
665        self.size() == other.size()
666    }
667
668    /// Check if the provided range is sequentially after this range.
669    pub fn is_next_range(&self, other: &Self) -> bool {
670        self.0.end == other.0.start
671    }
672}
673
674impl Ord for CommitRange {
675    fn cmp(&self, other: &Self) -> Ordering {
676        self.start()
677            .cmp(&other.start())
678            .then_with(|| self.end().cmp(&other.end()))
679    }
680}
681
682impl PartialOrd for CommitRange {
683    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
684        Some(self.cmp(other))
685    }
686}
687
688impl From<RangeInclusive<CommitIndex>> for CommitRange {
689    fn from(range: RangeInclusive<CommitIndex>) -> Self {
690        Self::new(range)
691    }
692}
693
694/// Display CommitRange as an inclusive range.
695impl Debug for CommitRange {
696    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
697        write!(f, "CommitRange({}..={})", self.start(), self.end())
698    }
699}
700
701#[cfg(test)]
702mod tests {
703    use std::sync::Arc;
704
705    use super::*;
706    use crate::{
707        block::TestBlock,
708        context::Context,
709        storage::{WriteBatch, mem_store::MemStore},
710    };
711
712    #[tokio::test]
713    async fn test_new_subdag_from_commit() {
714        let store = Arc::new(MemStore::new());
715        let context = Arc::new(Context::new_for_test(4).0);
716        let wave_length = DEFAULT_WAVE_LENGTH;
717
718        // Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3.
719        let first_wave_rounds: u32 = wave_length;
720        let num_authorities: u32 = 4;
721
722        let mut blocks = Vec::new();
723        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
724            .committee
725            .authorities()
726            .map(|index| {
727                let author_idx = index.0.value() as u32;
728                let block = TestBlock::new(0, author_idx).build();
729                VerifiedBlock::new_for_test(block)
730            })
731            .map(|block| (block.reference(), block))
732            .unzip();
733        // TODO: avoid writing genesis blocks?
734        store.write(WriteBatch::default().blocks(genesis)).unwrap();
735        blocks.append(&mut genesis_references.clone());
736
737        let mut ancestors = genesis_references;
738        let mut leader = None;
739        for round in 1..=first_wave_rounds {
740            let mut new_ancestors = vec![];
741            for author in 0..num_authorities {
742                let base_ts = round as BlockTimestampMs * 1000;
743                let block = VerifiedBlock::new_for_test(
744                    TestBlock::new(round, author)
745                        .set_timestamp_ms(base_ts + (author + round) as u64)
746                        .set_ancestors(ancestors.clone())
747                        .build(),
748                );
749                store
750                    .write(WriteBatch::default().blocks(vec![block.clone()]))
751                    .unwrap();
752                new_ancestors.push(block.reference());
753                blocks.push(block.reference());
754
755                // only write one block for the final round, which is the leader
756                // of the committed subdag.
757                if round == first_wave_rounds {
758                    leader = Some(block.clone());
759                    break;
760                }
761            }
762            ancestors = new_ancestors;
763        }
764
765        let leader_block = leader.unwrap();
766        let leader_ref = leader_block.reference();
767        let commit_index = 1;
768        let commit = TrustedCommit::new_for_test(
769            commit_index,
770            CommitDigest::MIN,
771            leader_block.timestamp_ms(),
772            leader_ref,
773            blocks.clone(),
774        );
775        let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
776        assert_eq!(subdag.leader, leader_ref);
777        assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
778        assert_eq!(
779            subdag.blocks.len(),
780            (num_authorities * wave_length) as usize + 1
781        );
782        assert_eq!(subdag.commit_ref, commit.reference());
783        assert_eq!(subdag.reputation_scores_desc, vec![]);
784    }
785
786    #[tokio::test]
787    async fn test_commit_range() {
788        telemetry_subscribers::init_for_testing();
789        let mut range1 = CommitRange::new(1..=5);
790        let range2 = CommitRange::new(2..=6);
791        let range3 = CommitRange::new(5..=10);
792        let range4 = CommitRange::new(6..=10);
793        let range5 = CommitRange::new(6..=9);
794        let range6 = CommitRange::new(1..=1);
795
796        assert_eq!(range1.start(), 1);
797        assert_eq!(range1.end(), 5);
798
799        // Test range size
800        assert_eq!(range1.size(), 5);
801        assert_eq!(range3.size(), 6);
802        assert_eq!(range6.size(), 1);
803
804        // Test next range check
805        assert!(!range1.is_next_range(&range2));
806        assert!(!range1.is_next_range(&range3));
807        assert!(range1.is_next_range(&range4));
808        assert!(range1.is_next_range(&range5));
809
810        // Test equal size range check
811        assert!(range1.is_equal_size(&range2));
812        assert!(!range1.is_equal_size(&range3));
813        assert!(range1.is_equal_size(&range4));
814        assert!(!range1.is_equal_size(&range5));
815
816        // Test range ordering
817        assert!(range1 < range2);
818        assert!(range2 < range3);
819        assert!(range3 < range4);
820        assert!(range5 < range4);
821
822        // Test extending range
823        range1.extend_to(10);
824        assert_eq!(range1.start(), 1);
825        assert_eq!(range1.end(), 10);
826        assert_eq!(range1.size(), 10);
827
828        range1.extend_to(20);
829        assert_eq!(range1.start(), 1);
830        assert_eq!(range1.end(), 20);
831        assert_eq!(range1.size(), 20);
832    }
833}