1use std::{
5 cmp::Ordering,
6 collections::BTreeMap,
7 fmt::{self, Debug, Display, Formatter},
8 hash::{Hash, Hasher},
9 ops::{Deref, Range, RangeInclusive},
10 sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{AuthorityIndex, DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22 block::{BlockAPI, Slot, VerifiedBlock},
23 leader_scoring::ReputationScores,
24 storage::Store,
25};
26
27pub type CommitIndex = u32;
29
30pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;
31
32pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;
38
39pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;
41
42pub(crate) type WaveNumber = u32;
45
46#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
59#[enum_dispatch(CommitAPI)]
60pub enum Commit {
61 V1(CommitV1),
62}
63
64impl Commit {
65 pub(crate) fn new(
67 index: CommitIndex,
68 previous_digest: CommitDigest,
69 timestamp_ms: BlockTimestampMs,
70 leader: BlockRef,
71 blocks: Vec<BlockRef>,
72 ) -> Self {
73 Commit::V1(CommitV1 {
74 index,
75 previous_digest,
76 timestamp_ms,
77 leader,
78 blocks,
79 })
80 }
81
82 pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
83 let bytes = bcs::to_bytes(self)?;
84 Ok(bytes.into())
85 }
86}
87
88#[enum_dispatch]
90pub trait CommitAPI {
91 fn round(&self) -> Round;
92 fn index(&self) -> CommitIndex;
93 fn previous_digest(&self) -> CommitDigest;
94 fn timestamp_ms(&self) -> BlockTimestampMs;
95 fn leader(&self) -> BlockRef;
96 fn blocks(&self) -> &[BlockRef];
97}
98
99#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
102pub struct CommitV1 {
103 index: CommitIndex,
106 previous_digest: CommitDigest,
109 timestamp_ms: BlockTimestampMs,
111 leader: BlockRef,
113 blocks: Vec<BlockRef>,
115}
116
117impl CommitAPI for CommitV1 {
118 fn round(&self) -> Round {
119 self.leader.round
120 }
121
122 fn index(&self) -> CommitIndex {
123 self.index
124 }
125
126 fn previous_digest(&self) -> CommitDigest {
127 self.previous_digest
128 }
129
130 fn timestamp_ms(&self) -> BlockTimestampMs {
131 self.timestamp_ms
132 }
133
134 fn leader(&self) -> BlockRef {
135 self.leader
136 }
137
138 fn blocks(&self) -> &[BlockRef] {
139 &self.blocks
140 }
141}
142
143#[derive(Clone, Debug, PartialEq)]
149pub struct TrustedCommit {
150 inner: Arc<Commit>,
151
152 digest: CommitDigest,
154 serialized: Bytes,
155}
156
157impl TrustedCommit {
158 pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
159 let digest = Self::compute_digest(&serialized);
160 Self {
161 inner: Arc::new(commit),
162 digest,
163 serialized,
164 }
165 }
166
167 pub(crate) fn new_for_test(
168 index: CommitIndex,
169 previous_digest: CommitDigest,
170 timestamp_ms: BlockTimestampMs,
171 leader: BlockRef,
172 blocks: Vec<BlockRef>,
173 ) -> Self {
174 let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
175 let serialized = commit.serialize().unwrap();
176 Self::new_trusted(commit, serialized)
177 }
178
179 pub(crate) fn reference(&self) -> CommitRef {
180 CommitRef {
181 index: self.index(),
182 digest: self.digest(),
183 }
184 }
185
186 pub(crate) fn digest(&self) -> CommitDigest {
187 self.digest
188 }
189
190 pub(crate) fn serialized(&self) -> &Bytes {
191 &self.serialized
192 }
193
194 pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
195 let mut hasher = DefaultHashFunction::new();
196 hasher.update(serialized);
197 CommitDigest(hasher.finalize().into())
198 }
199}
200
201impl Deref for TrustedCommit {
203 type Target = Commit;
204
205 fn deref(&self) -> &Self::Target {
206 &self.inner
207 }
208}
209
210#[derive(Clone, Debug)]
213pub(crate) struct CertifiedCommits {
214 commits: Vec<CertifiedCommit>,
215 votes: Vec<VerifiedBlock>,
216}
217
218impl CertifiedCommits {
219 pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
220 Self { commits, votes }
221 }
222
223 pub(crate) fn commits(&self) -> &[CertifiedCommit] {
224 &self.commits
225 }
226
227 pub(crate) fn votes(&self) -> &[VerifiedBlock] {
228 &self.votes
229 }
230}
231
232#[derive(Clone, Debug)]
234pub(crate) struct CertifiedCommit {
235 commit: Arc<TrustedCommit>,
236 blocks: Vec<VerifiedBlock>,
237}
238
239impl CertifiedCommit {
240 pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
241 Self {
242 commit: Arc::new(commit),
243 blocks,
244 }
245 }
246
247 pub fn blocks(&self) -> &[VerifiedBlock] {
248 &self.blocks
249 }
250}
251
252impl Deref for CertifiedCommit {
253 type Target = TrustedCommit;
254
255 fn deref(&self) -> &Self::Target {
256 &self.commit
257 }
258}
259
260#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
262pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
263
264impl CommitDigest {
265 pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
267 pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
268
269 pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
270 self.0
271 }
272}
273
274impl Hash for CommitDigest {
275 fn hash<H: Hasher>(&self, state: &mut H) {
276 state.write(&self.0[..8]);
277 }
278}
279
280impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
281 fn from(hd: CommitDigest) -> Self {
282 Digest::new(hd.0)
283 }
284}
285
286impl fmt::Display for CommitDigest {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
288 write!(
289 f,
290 "{}",
291 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
292 .get(0..4)
293 .ok_or(fmt::Error)?
294 )
295 }
296}
297
298impl fmt::Debug for CommitDigest {
299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
300 write!(
301 f,
302 "{}",
303 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
304 )
305 }
306}
307
308#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
310pub struct CommitRef {
311 pub index: CommitIndex,
312 pub digest: CommitDigest,
313}
314
315impl CommitRef {
316 pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
317 Self { index, digest }
318 }
319}
320
321impl fmt::Display for CommitRef {
322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
323 write!(f, "C{}({})", self.index, self.digest)
324 }
325}
326
327impl fmt::Debug for CommitRef {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
329 write!(f, "C{}({:?})", self.index, self.digest)
330 }
331}
332
333pub type CommitVote = CommitRef;
335
336#[derive(Clone, PartialEq)]
343pub struct CommittedSubDag {
344 pub leader: BlockRef,
348 pub blocks: Vec<VerifiedBlock>,
350 pub timestamp_ms: BlockTimestampMs,
352 pub commit_ref: CommitRef,
356
357 pub decided_with_local_blocks: bool,
372 pub recovered_rejected_transactions: bool,
374 pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
377
378 pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
382}
383
384impl CommittedSubDag {
385 pub fn new(
387 leader: BlockRef,
388 blocks: Vec<VerifiedBlock>,
389 timestamp_ms: BlockTimestampMs,
390 commit_ref: CommitRef,
391 ) -> Self {
392 Self {
393 leader,
394 blocks,
395 timestamp_ms,
396 commit_ref,
397 decided_with_local_blocks: true,
398 recovered_rejected_transactions: false,
399 reputation_scores_desc: vec![],
400 rejected_transactions_by_block: BTreeMap::new(),
401 }
402 }
403}
404
405pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
408 blocks.sort_by(|a, b| {
409 a.round()
410 .cmp(&b.round())
411 .then_with(|| a.author().cmp(&b.author()))
412 })
413}
414
415impl Display for CommittedSubDag {
416 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
417 write!(
418 f,
419 "{}@{} [{}])",
420 self.commit_ref,
421 self.leader,
422 self.blocks
423 .iter()
424 .map(|b| b.reference().to_string())
425 .join(", ")
426 )
427 }
428}
429
430impl fmt::Debug for CommittedSubDag {
431 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432 write!(
433 f,
434 "{}@{} [{}])",
435 self.commit_ref,
436 self.leader,
437 self.blocks
438 .iter()
439 .map(|b| b.reference().to_string())
440 .join(", ")
441 )?;
442 write!(
443 f,
444 ";{}ms;rs{:?};{};{};[{}]",
445 self.timestamp_ms,
446 self.reputation_scores_desc,
447 self.decided_with_local_blocks,
448 self.recovered_rejected_transactions,
449 self.rejected_transactions_by_block
450 .iter()
451 .map(|(block_ref, transactions)| {
452 format!("{}: {}, ", block_ref, transactions.len())
453 })
454 .collect::<Vec<_>>()
455 .join(", "),
456 )
457 }
458}
459
460pub(crate) fn load_committed_subdag_from_store(
462 store: &dyn Store,
463 commit: TrustedCommit,
464 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
465) -> CommittedSubDag {
466 let mut leader_block_idx = None;
467 let commit_blocks = store
468 .read_blocks(commit.blocks())
469 .expect("We should have the block referenced in the commit data");
470 let blocks = commit_blocks
471 .into_iter()
472 .enumerate()
473 .map(|(idx, commit_block_opt)| {
474 let commit_block =
475 commit_block_opt.expect("We should have the block referenced in the commit data");
476 if commit_block.reference() == commit.leader() {
477 leader_block_idx = Some(idx);
478 }
479 commit_block
480 })
481 .collect::<Vec<_>>();
482 let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
483 let leader_block_ref = blocks[leader_block_idx].reference();
484
485 let mut subdag = CommittedSubDag::new(
486 leader_block_ref,
487 blocks,
488 commit.timestamp_ms(),
489 commit.reference(),
490 );
491
492 subdag.reputation_scores_desc = reputation_scores_desc;
493
494 let reject_votes = store
495 .read_rejected_transactions(commit.reference())
496 .unwrap();
497 if let Some(reject_votes) = reject_votes {
498 subdag.decided_with_local_blocks = true;
499 subdag.recovered_rejected_transactions = true;
500 subdag.rejected_transactions_by_block = reject_votes;
501 } else {
502 subdag.decided_with_local_blocks = false;
503 subdag.recovered_rejected_transactions = false;
504 }
505
506 subdag
507}
508
509#[derive(Debug, Clone, Copy, Eq, PartialEq)]
510pub(crate) enum Decision {
511 Direct,
512 Indirect,
513 Certified, }
515
516#[derive(Debug, Clone, PartialEq)]
518pub(crate) enum LeaderStatus {
519 Commit(VerifiedBlock),
520 Skip(Slot),
521 Undecided(Slot),
522}
523
524impl LeaderStatus {
525 pub(crate) fn round(&self) -> Round {
526 match self {
527 Self::Commit(block) => block.round(),
528 Self::Skip(leader) => leader.round,
529 Self::Undecided(leader) => leader.round,
530 }
531 }
532
533 pub(crate) fn is_decided(&self) -> bool {
534 match self {
535 Self::Commit(_) => true,
536 Self::Skip(_) => true,
537 Self::Undecided(_) => false,
538 }
539 }
540
541 pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
542 match self {
543 Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
544 Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
545 Self::Undecided(..) => None,
546 }
547 }
548}
549
550impl Display for LeaderStatus {
551 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552 match self {
553 Self::Commit(block) => write!(f, "Commit({})", block.reference()),
554 Self::Skip(slot) => write!(f, "Skip({slot})"),
555 Self::Undecided(slot) => write!(f, "Undecided({slot})"),
556 }
557 }
558}
559
560#[derive(Debug, Clone, PartialEq)]
562pub(crate) enum DecidedLeader {
563 Commit(VerifiedBlock, bool),
567 Skip(Slot),
569}
570
571impl DecidedLeader {
572 pub(crate) fn slot(&self) -> Slot {
574 match self {
575 Self::Commit(block, _direct) => block.reference().into(),
576 Self::Skip(slot) => *slot,
577 }
578 }
579
580 pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
582 match self {
583 Self::Commit(block, _direct) => Some(block),
584 Self::Skip(_) => None,
585 }
586 }
587
588 #[cfg(test)]
589 pub(crate) fn round(&self) -> Round {
590 match self {
591 Self::Commit(block, _direct) => block.round(),
592 Self::Skip(leader) => leader.round,
593 }
594 }
595
596 #[cfg(test)]
597 pub(crate) fn authority(&self) -> AuthorityIndex {
598 match self {
599 Self::Commit(block, _direct) => block.author(),
600 Self::Skip(leader) => leader.authority,
601 }
602 }
603}
604
605impl Display for DecidedLeader {
606 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607 match self {
608 Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
609 Self::Skip(slot) => write!(f, "Skip({slot})"),
610 }
611 }
612}
613
614#[derive(Clone, Debug, Serialize, Deserialize)]
620pub struct CommitInfo {
621 pub(crate) committed_rounds: Vec<Round>,
622 pub(crate) reputation_scores: ReputationScores,
623}
624
625#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
631pub struct CommitRange(Range<CommitIndex>);
632
633impl CommitRange {
634 pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
635 Self(*range.start()..(*range.end()).saturating_add(1))
638 }
639
640 pub fn start(&self) -> CommitIndex {
642 self.0.start
643 }
644
645 pub fn end(&self) -> CommitIndex {
647 self.0.end.saturating_sub(1)
648 }
649
650 pub fn extend_to(&mut self, other: CommitIndex) {
651 let new_end = other.saturating_add(1);
652 assert!(self.0.end <= new_end);
653 self.0 = self.0.start..new_end;
654 }
655
656 pub fn size(&self) -> usize {
657 self.0
658 .end
659 .checked_sub(self.0.start)
660 .expect("Range should never have end < start") as usize
661 }
662
663 pub fn is_equal_size(&self, other: &Self) -> bool {
665 self.size() == other.size()
666 }
667
668 pub fn is_next_range(&self, other: &Self) -> bool {
670 self.0.end == other.0.start
671 }
672}
673
674impl Ord for CommitRange {
675 fn cmp(&self, other: &Self) -> Ordering {
676 self.start()
677 .cmp(&other.start())
678 .then_with(|| self.end().cmp(&other.end()))
679 }
680}
681
682impl PartialOrd for CommitRange {
683 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
684 Some(self.cmp(other))
685 }
686}
687
688impl From<RangeInclusive<CommitIndex>> for CommitRange {
689 fn from(range: RangeInclusive<CommitIndex>) -> Self {
690 Self::new(range)
691 }
692}
693
694impl Debug for CommitRange {
696 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
697 write!(f, "CommitRange({}..={})", self.start(), self.end())
698 }
699}
700
701#[cfg(test)]
702mod tests {
703 use std::sync::Arc;
704
705 use super::*;
706 use crate::{
707 block::TestBlock,
708 context::Context,
709 storage::{WriteBatch, mem_store::MemStore},
710 };
711
712 #[tokio::test]
713 async fn test_new_subdag_from_commit() {
714 let store = Arc::new(MemStore::new());
715 let context = Arc::new(Context::new_for_test(4).0);
716 let wave_length = DEFAULT_WAVE_LENGTH;
717
718 let first_wave_rounds: u32 = wave_length;
720 let num_authorities: u32 = 4;
721
722 let mut blocks = Vec::new();
723 let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
724 .committee
725 .authorities()
726 .map(|index| {
727 let author_idx = index.0.value() as u32;
728 let block = TestBlock::new(0, author_idx).build();
729 VerifiedBlock::new_for_test(block)
730 })
731 .map(|block| (block.reference(), block))
732 .unzip();
733 store.write(WriteBatch::default().blocks(genesis)).unwrap();
735 blocks.append(&mut genesis_references.clone());
736
737 let mut ancestors = genesis_references;
738 let mut leader = None;
739 for round in 1..=first_wave_rounds {
740 let mut new_ancestors = vec![];
741 for author in 0..num_authorities {
742 let base_ts = round as BlockTimestampMs * 1000;
743 let block = VerifiedBlock::new_for_test(
744 TestBlock::new(round, author)
745 .set_timestamp_ms(base_ts + (author + round) as u64)
746 .set_ancestors(ancestors.clone())
747 .build(),
748 );
749 store
750 .write(WriteBatch::default().blocks(vec![block.clone()]))
751 .unwrap();
752 new_ancestors.push(block.reference());
753 blocks.push(block.reference());
754
755 if round == first_wave_rounds {
758 leader = Some(block.clone());
759 break;
760 }
761 }
762 ancestors = new_ancestors;
763 }
764
765 let leader_block = leader.unwrap();
766 let leader_ref = leader_block.reference();
767 let commit_index = 1;
768 let commit = TrustedCommit::new_for_test(
769 commit_index,
770 CommitDigest::MIN,
771 leader_block.timestamp_ms(),
772 leader_ref,
773 blocks.clone(),
774 );
775 let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
776 assert_eq!(subdag.leader, leader_ref);
777 assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
778 assert_eq!(
779 subdag.blocks.len(),
780 (num_authorities * wave_length) as usize + 1
781 );
782 assert_eq!(subdag.commit_ref, commit.reference());
783 assert_eq!(subdag.reputation_scores_desc, vec![]);
784 }
785
786 #[tokio::test]
787 async fn test_commit_range() {
788 telemetry_subscribers::init_for_testing();
789 let mut range1 = CommitRange::new(1..=5);
790 let range2 = CommitRange::new(2..=6);
791 let range3 = CommitRange::new(5..=10);
792 let range4 = CommitRange::new(6..=10);
793 let range5 = CommitRange::new(6..=9);
794 let range6 = CommitRange::new(1..=1);
795
796 assert_eq!(range1.start(), 1);
797 assert_eq!(range1.end(), 5);
798
799 assert_eq!(range1.size(), 5);
801 assert_eq!(range3.size(), 6);
802 assert_eq!(range6.size(), 1);
803
804 assert!(!range1.is_next_range(&range2));
806 assert!(!range1.is_next_range(&range3));
807 assert!(range1.is_next_range(&range4));
808 assert!(range1.is_next_range(&range5));
809
810 assert!(range1.is_equal_size(&range2));
812 assert!(!range1.is_equal_size(&range3));
813 assert!(range1.is_equal_size(&range4));
814 assert!(!range1.is_equal_size(&range5));
815
816 assert!(range1 < range2);
818 assert!(range2 < range3);
819 assert!(range3 < range4);
820 assert!(range5 < range4);
821
822 range1.extend_to(10);
824 assert_eq!(range1.start(), 1);
825 assert_eq!(range1.end(), 10);
826 assert_eq!(range1.size(), 10);
827
828 range1.extend_to(20);
829 assert_eq!(range1.start(), 1);
830 assert_eq!(range1.end(), 20);
831 assert_eq!(range1.size(), 20);
832 }
833}