1use std::{
5 cmp::Ordering,
6 collections::BTreeMap,
7 fmt::{self, Debug, Display, Formatter},
8 hash::{Hash, Hasher},
9 ops::{Deref, Range, RangeInclusive},
10 sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22 block::{BlockAPI, Slot, VerifiedBlock},
23 leader_scoring::ReputationScores,
24 storage::Store,
25};
26
/// Index of a commit, stored as an unsigned 32-bit integer.
pub type CommitIndex = u32;

/// Commit index value designating genesis (zero).
pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;

/// Wave length used by default; currently identical to [`MINIMUM_WAVE_LENGTH`].
pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;

/// Smallest wave length, expressed in rounds.
pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;

/// Sequence number of a wave, stored as an unsigned 32-bit integer.
pub(crate) type WaveNumber = u32;
45
/// Versioned commit record.
///
/// `V1` is the only variant at present. The [`CommitAPI`] accessors are
/// forwarded to the active variant through `enum_dispatch`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[enum_dispatch(CommitAPI)]
pub enum Commit {
    /// First version of the commit format.
    V1(CommitV1),
}
63
64impl Commit {
65 pub(crate) fn new(
67 index: CommitIndex,
68 previous_digest: CommitDigest,
69 timestamp_ms: BlockTimestampMs,
70 leader: BlockRef,
71 blocks: Vec<BlockRef>,
72 ) -> Self {
73 Commit::V1(CommitV1 {
74 index,
75 previous_digest,
76 timestamp_ms,
77 leader,
78 blocks,
79 })
80 }
81
82 pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
83 let bytes = bcs::to_bytes(self)?;
84 Ok(bytes.into())
85 }
86}
87
/// Read-only accessors shared by every commit version.
///
/// `enum_dispatch` uses this trait to forward calls made on [`Commit`] to the
/// concrete variant.
#[enum_dispatch]
pub trait CommitAPI {
    /// Round of the commit; `CommitV1` reports the round of its leader block.
    fn round(&self) -> Round;
    /// Index of the commit.
    fn index(&self) -> CommitIndex;
    /// Digest stored as the commit's `previous_digest`.
    fn previous_digest(&self) -> CommitDigest;
    /// Timestamp of the commit, in milliseconds.
    fn timestamp_ms(&self) -> BlockTimestampMs;
    /// Reference to the leader block of the commit.
    fn leader(&self) -> BlockRef;
    /// References to the blocks listed in the commit.
    fn blocks(&self) -> &[BlockRef];
}
98
/// Payload of a version-1 commit.
///
/// The struct is serialized as part of [`Commit`], so the order of its fields
/// is part of the encoded format.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct CommitV1 {
    /// Index of this commit.
    index: CommitIndex,
    /// Digest of the previous commit (presumably the one directly preceding
    /// this index; verify against the code that builds commits).
    previous_digest: CommitDigest,
    /// Timestamp of the commit, in milliseconds.
    timestamp_ms: BlockTimestampMs,
    /// Reference to the leader block of the commit.
    leader: BlockRef,
    /// References to the blocks that belong to the commit.
    blocks: Vec<BlockRef>,
}
116
117impl CommitAPI for CommitV1 {
118 fn round(&self) -> Round {
119 self.leader.round
120 }
121
122 fn index(&self) -> CommitIndex {
123 self.index
124 }
125
126 fn previous_digest(&self) -> CommitDigest {
127 self.previous_digest
128 }
129
130 fn timestamp_ms(&self) -> BlockTimestampMs {
131 self.timestamp_ms
132 }
133
134 fn leader(&self) -> BlockRef {
135 self.leader
136 }
137
138 fn blocks(&self) -> &[BlockRef] {
139 &self.blocks
140 }
141}
142
/// A [`Commit`] bundled with its serialized bytes and the digest of those bytes.
///
/// Cloning is cheap for the commit itself because it sits behind an `Arc`.
#[derive(Clone, Debug, PartialEq)]
pub struct TrustedCommit {
    /// The wrapped commit, shared through reference counting.
    inner: Arc<Commit>,

    /// Digest computed from `serialized` when the value is constructed.
    digest: CommitDigest,
    /// Serialized form of the commit as supplied to the constructor.
    serialized: Bytes,
}
156
157impl TrustedCommit {
158 pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
159 let digest = Self::compute_digest(&serialized);
160 Self {
161 inner: Arc::new(commit),
162 digest,
163 serialized,
164 }
165 }
166
167 pub(crate) fn new_for_test(
168 index: CommitIndex,
169 previous_digest: CommitDigest,
170 timestamp_ms: BlockTimestampMs,
171 leader: BlockRef,
172 blocks: Vec<BlockRef>,
173 ) -> Self {
174 let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
175 let serialized = commit.serialize().unwrap();
176 Self::new_trusted(commit, serialized)
177 }
178
179 pub(crate) fn reference(&self) -> CommitRef {
180 CommitRef {
181 index: self.index(),
182 digest: self.digest(),
183 }
184 }
185
186 pub(crate) fn digest(&self) -> CommitDigest {
187 self.digest
188 }
189
190 pub(crate) fn serialized(&self) -> &Bytes {
191 &self.serialized
192 }
193
194 pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
195 let mut hasher = DefaultHashFunction::new();
196 hasher.update(serialized);
197 CommitDigest(hasher.finalize().into())
198 }
199}
200
201impl Deref for TrustedCommit {
203 type Target = Commit;
204
205 fn deref(&self) -> &Self::Target {
206 &self.inner
207 }
208}
209
/// A batch of certified commits together with a list of vote blocks.
///
/// NOTE(review): the relation between `votes` and `commits` (which commits the
/// votes certify) is not visible in this file; confirm with the producer.
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommits {
    /// The certified commits of the batch.
    commits: Vec<CertifiedCommit>,
    /// Verified blocks supplied as votes for the batch.
    votes: Vec<VerifiedBlock>,
}
217
218impl CertifiedCommits {
219 pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
220 Self { commits, votes }
221 }
222
223 pub(crate) fn commits(&self) -> &[CertifiedCommit] {
224 &self.commits
225 }
226
227 pub(crate) fn votes(&self) -> &[VerifiedBlock] {
228 &self.votes
229 }
230}
231
/// A [`TrustedCommit`] paired with a list of verified blocks.
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
    /// The commit, shared through reference counting.
    commit: Arc<TrustedCommit>,
    /// Verified blocks carried with the commit (presumably the blocks the
    /// commit references; verify against the caller of `new_certified`).
    blocks: Vec<VerifiedBlock>,
}
238
239impl CertifiedCommit {
240 pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
241 Self {
242 commit: Arc::new(commit),
243 blocks,
244 }
245 }
246
247 pub fn blocks(&self) -> &[VerifiedBlock] {
248 &self.blocks
249 }
250}
251
252impl Deref for CertifiedCommit {
253 type Target = TrustedCommit;
254
255 fn deref(&self) -> &Self::Target {
256 &self.commit
257 }
258}
259
/// Fixed-size digest of a serialized commit (`DIGEST_LENGTH` bytes).
///
/// Ordering and equality are derived from the raw bytes.
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
263
264impl CommitDigest {
265 pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
267 pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
268
269 pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
270 self.0
271 }
272}
273
274impl Hash for CommitDigest {
275 fn hash<H: Hasher>(&self, state: &mut H) {
276 state.write(&self.0[..8]);
277 }
278}
279
280impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
281 fn from(hd: CommitDigest) -> Self {
282 Digest::new(hd.0)
283 }
284}
285
286impl fmt::Display for CommitDigest {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
288 write!(
289 f,
290 "{}",
291 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
292 .get(0..4)
293 .ok_or(fmt::Error)?
294 )
295 }
296}
297
298impl fmt::Debug for CommitDigest {
299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
300 write!(
301 f,
302 "{}",
303 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
304 )
305 }
306}
307
/// Identifies a commit by its index and digest.
///
/// The derived ordering compares `index` first and `digest` second.
#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitRef {
    /// Index of the referenced commit.
    pub index: CommitIndex,
    /// Digest of the referenced commit.
    pub digest: CommitDigest,
}
314
315impl CommitRef {
316 pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
317 Self { index, digest }
318 }
319}
320
321impl fmt::Display for CommitRef {
322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
323 write!(f, "C{}({})", self.index, self.digest)
324 }
325}
326
327impl fmt::Debug for CommitRef {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
329 write!(f, "C{}({:?})", self.index, self.digest)
330 }
331}
332
/// A vote for a commit is represented by the reference to that commit.
pub type CommitVote = CommitRef;
335
/// The blocks of one commit together with commit metadata.
///
/// Built either directly through [`CommittedSubDag::new`] or from storage via
/// `load_committed_subdag_from_store`.
#[derive(Clone, PartialEq)]
pub struct CommittedSubDag {
    /// Reference to the leader block of the commit.
    pub leader: BlockRef,
    /// The blocks of the sub-dag.
    pub blocks: Vec<VerifiedBlock>,
    /// Timestamp of the commit, in milliseconds.
    pub timestamp_ms: BlockTimestampMs,
    /// Reference (index and digest) of the commit this sub-dag belongs to.
    pub commit_ref: CommitRef,

    /// `new` sets this to `true`; loading from the store sets it to whether
    /// rejected transactions were found for the commit.
    /// NOTE(review): the precise meaning is not visible in this file.
    pub decided_with_local_blocks: bool,
    /// `true` when the rejected transactions were read back from the store,
    /// `false` otherwise (including after `new`).
    pub recovered_rejected_transactions: bool,

    /// Transaction indices keyed by block reference; presumably the
    /// transactions rejected in each block (verify against the writer).
    pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
}
380
381impl CommittedSubDag {
382 pub fn new(
384 leader: BlockRef,
385 blocks: Vec<VerifiedBlock>,
386 timestamp_ms: BlockTimestampMs,
387 commit_ref: CommitRef,
388 ) -> Self {
389 Self {
390 leader,
391 blocks,
392 timestamp_ms,
393 commit_ref,
394 decided_with_local_blocks: true,
395 recovered_rejected_transactions: false,
396 rejected_transactions_by_block: BTreeMap::new(),
397 }
398 }
399}
400
401pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
404 blocks.sort_by(|a, b| {
405 a.round()
406 .cmp(&b.round())
407 .then_with(|| a.author().cmp(&b.author()))
408 })
409}
410
411impl Display for CommittedSubDag {
412 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
413 write!(
414 f,
415 "{}@{} [{}])",
416 self.commit_ref,
417 self.leader,
418 self.blocks
419 .iter()
420 .map(|b| b.reference().to_string())
421 .join(", ")
422 )
423 }
424}
425
426impl fmt::Debug for CommittedSubDag {
427 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
428 write!(
429 f,
430 "{}@{} [{}])",
431 self.commit_ref,
432 self.leader,
433 self.blocks
434 .iter()
435 .map(|b| b.reference().to_string())
436 .join(", ")
437 )?;
438 write!(
439 f,
440 ";{}ms;{};{};[{}]",
441 self.timestamp_ms,
442 self.decided_with_local_blocks,
443 self.recovered_rejected_transactions,
444 self.rejected_transactions_by_block
445 .iter()
446 .map(|(block_ref, transactions)| {
447 format!("{}: {}, ", block_ref, transactions.len())
448 })
449 .collect::<Vec<_>>()
450 .join(", "),
451 )
452 }
453}
454
455pub(crate) fn load_committed_subdag_from_store(
457 store: &dyn Store,
458 commit: TrustedCommit,
459) -> CommittedSubDag {
460 let mut leader_block_idx = None;
461 let commit_blocks = store
462 .read_blocks(commit.blocks())
463 .expect("We should have the block referenced in the commit data");
464 let blocks = commit_blocks
465 .into_iter()
466 .enumerate()
467 .map(|(idx, commit_block_opt)| {
468 let commit_block =
469 commit_block_opt.expect("We should have the block referenced in the commit data");
470 if commit_block.reference() == commit.leader() {
471 leader_block_idx = Some(idx);
472 }
473 commit_block
474 })
475 .collect::<Vec<_>>();
476 let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
477 let leader_block_ref = blocks[leader_block_idx].reference();
478
479 let mut subdag = CommittedSubDag::new(
480 leader_block_ref,
481 blocks,
482 commit.timestamp_ms(),
483 commit.reference(),
484 );
485
486 let reject_votes = store
487 .read_rejected_transactions(commit.reference())
488 .unwrap();
489 if let Some(reject_votes) = reject_votes {
490 subdag.decided_with_local_blocks = true;
491 subdag.recovered_rejected_transactions = true;
492 subdag.rejected_transactions_by_block = reject_votes;
493 } else {
494 subdag.decided_with_local_blocks = false;
495 subdag.recovered_rejected_transactions = false;
496 }
497
498 subdag
499}
500
/// Kind of decision taken for a leader.
///
/// NOTE(review): the variants are not used in this file; the one-line
/// descriptions below are inferred from their names and should be confirmed
/// against the committer code.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum Decision {
    /// Presumably a decision reached by the direct rule.
    Direct,
    /// Presumably a decision reached by the indirect rule.
    Indirect,
    /// Presumably a decision taken from a certified commit.
    Certified,
}
507
/// Status of a leader slot: committed, skipped, or not yet decided.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum LeaderStatus {
    /// The leader block is committed.
    Commit(VerifiedBlock),
    /// The leader slot is skipped.
    Skip(Slot),
    /// No decision has been reached for the slot.
    Undecided(Slot),
}
515
516impl LeaderStatus {
517 pub(crate) fn round(&self) -> Round {
518 match self {
519 Self::Commit(block) => block.round(),
520 Self::Skip(leader) => leader.round,
521 Self::Undecided(leader) => leader.round,
522 }
523 }
524
525 pub(crate) fn is_decided(&self) -> bool {
526 match self {
527 Self::Commit(_) => true,
528 Self::Skip(_) => true,
529 Self::Undecided(_) => false,
530 }
531 }
532
533 pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
534 match self {
535 Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
536 Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
537 Self::Undecided(..) => None,
538 }
539 }
540}
541
542impl Display for LeaderStatus {
543 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544 match self {
545 Self::Commit(block) => write!(f, "Commit({})", block.reference()),
546 Self::Skip(slot) => write!(f, "Skip({slot})"),
547 Self::Undecided(slot) => write!(f, "Undecided({slot})"),
548 }
549 }
550}
551
/// A leader slot for which a decision exists: either committed or skipped.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum DecidedLeader {
    /// Committed leader block; the flag is the `direct` value passed to
    /// `LeaderStatus::into_decided_leader`.
    Commit(VerifiedBlock, bool),
    /// Skipped leader slot.
    Skip(Slot),
}
562
563impl DecidedLeader {
564 pub(crate) fn slot(&self) -> Slot {
566 match self {
567 Self::Commit(block, _direct) => block.reference().into(),
568 Self::Skip(slot) => *slot,
569 }
570 }
571
572 pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
574 match self {
575 Self::Commit(block, _direct) => Some(block),
576 Self::Skip(_) => None,
577 }
578 }
579
580 #[cfg(test)]
581 pub(crate) fn round(&self) -> Round {
582 match self {
583 Self::Commit(block, _direct) => block.round(),
584 Self::Skip(leader) => leader.round,
585 }
586 }
587
588 #[cfg(test)]
589 pub(crate) fn authority(&self) -> consensus_config::AuthorityIndex {
590 match self {
591 Self::Commit(block, _direct) => block.author(),
592 Self::Skip(leader) => leader.authority,
593 }
594 }
595}
596
597impl Display for DecidedLeader {
598 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
599 match self {
600 Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
601 Self::Skip(slot) => write!(f, "Skip({slot})"),
602 }
603 }
604}
605
/// Serializable record of committed rounds and reputation scores.
///
/// NOTE(review): how and when this record is persisted is not visible in this
/// file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommitInfo {
    /// Committed rounds (presumably one entry per authority; verify against
    /// the writer).
    pub(crate) committed_rounds: Vec<Round>,
    /// Reputation scores stored alongside the committed rounds.
    pub(crate) reputation_scores: ReputationScores,
}
616
/// Range of commit indices.
///
/// Stored as a half-open `Range` (exclusive end), while the constructor and the
/// `start()` / `end()` accessors work with inclusive bounds.
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct CommitRange(Range<CommitIndex>);
624
625impl CommitRange {
626 pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
627 Self(*range.start()..(*range.end()).saturating_add(1))
630 }
631
632 pub fn start(&self) -> CommitIndex {
634 self.0.start
635 }
636
637 pub fn end(&self) -> CommitIndex {
639 self.0.end.saturating_sub(1)
640 }
641
642 pub fn extend_to(&mut self, other: CommitIndex) {
643 let new_end = other.saturating_add(1);
644 assert!(self.0.end <= new_end);
645 self.0 = self.0.start..new_end;
646 }
647
648 pub fn size(&self) -> usize {
649 self.0
650 .end
651 .checked_sub(self.0.start)
652 .expect("Range should never have end < start") as usize
653 }
654
655 pub fn is_equal_size(&self, other: &Self) -> bool {
657 self.size() == other.size()
658 }
659
660 pub fn is_next_range(&self, other: &Self) -> bool {
662 self.0.end == other.0.start
663 }
664}
665
666impl Ord for CommitRange {
667 fn cmp(&self, other: &Self) -> Ordering {
668 self.start()
669 .cmp(&other.start())
670 .then_with(|| self.end().cmp(&other.end()))
671 }
672}
673
674impl PartialOrd for CommitRange {
675 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
676 Some(self.cmp(other))
677 }
678}
679
680impl From<RangeInclusive<CommitIndex>> for CommitRange {
681 fn from(range: RangeInclusive<CommitIndex>) -> Self {
682 Self::new(range)
683 }
684}
685
686impl Debug for CommitRange {
688 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
689 write!(f, "CommitRange({}..={})", self.start(), self.end())
690 }
691}
692
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        block::TestBlock,
        context::Context,
        storage::{WriteBatch, mem_store::MemStore},
    };

    /// Writes genesis blocks plus the blocks leading up to one leader into an
    /// in-memory store, then checks that `load_committed_subdag_from_store`
    /// rebuilds the expected sub-dag for a commit listing all of them.
    #[tokio::test]
    async fn test_new_subdag_from_commit() {
        let store = Arc::new(MemStore::new());
        let context = Arc::new(Context::new_for_test(4).0);
        let wave_length = DEFAULT_WAVE_LENGTH;

        // Blocks are produced up to and including this round; the last round
        // only gets a single block, which serves as the leader.
        let first_wave_rounds: u32 = wave_length;
        // Kept equal to the committee size passed to `Context::new_for_test`;
        // the block-count assertion at the end relies on it.
        let num_authorities: u32 = 4;

        // References of every block written to the store, in write order.
        let mut blocks = Vec::new();
        // One round-0 block per authority of the committee.
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                let block = TestBlock::new(0, author_idx).build();
                VerifiedBlock::new_for_test(block)
            })
            .map(|block| (block.reference(), block))
            .unzip();
        store.write(WriteBatch::default().blocks(genesis)).unwrap();
        blocks.append(&mut genesis_references.clone());

        // Each round references all blocks of the previous round as ancestors.
        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=first_wave_rounds {
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                store
                    .write(WriteBatch::default().blocks(vec![block.clone()]))
                    .unwrap();
                new_ancestors.push(block.reference());
                blocks.push(block.reference());

                // In the last round, stop after the first author's block and
                // use it as the leader.
                if round == first_wave_rounds {
                    leader = Some(block.clone());
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;
        let commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            leader_block.timestamp_ms(),
            leader_ref,
            blocks.clone(),
        );
        let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone());
        assert_eq!(subdag.leader, leader_ref);
        assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
        // Genesis plus the full rounds before the last one contribute
        // `num_authorities * wave_length` blocks; the leader adds one more.
        assert_eq!(
            subdag.blocks.len(),
            (num_authorities * wave_length) as usize + 1
        );
        assert_eq!(subdag.commit_ref, commit.reference());
    }

    /// Exercises the accessors, comparisons and `extend_to` of `CommitRange`.
    #[tokio::test]
    async fn test_commit_range() {
        telemetry_subscribers::init_for_testing();
        let mut range1 = CommitRange::new(1..=5);
        let range2 = CommitRange::new(2..=6);
        let range3 = CommitRange::new(5..=10);
        let range4 = CommitRange::new(6..=10);
        let range5 = CommitRange::new(6..=9);
        let range6 = CommitRange::new(1..=1);

        // Bounds are reported inclusively.
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 5);

        // Size counts both endpoints.
        assert_eq!(range1.size(), 5);
        assert_eq!(range3.size(), 6);
        assert_eq!(range6.size(), 1);

        // Only ranges starting right after index 5 follow range1.
        assert!(!range1.is_next_range(&range2));
        assert!(!range1.is_next_range(&range3));
        assert!(range1.is_next_range(&range4));
        assert!(range1.is_next_range(&range5));

        // Size equality is independent of position.
        assert!(range1.is_equal_size(&range2));
        assert!(!range1.is_equal_size(&range3));
        assert!(range1.is_equal_size(&range4));
        assert!(!range1.is_equal_size(&range5));

        // Ordering: by start first, then by end.
        assert!(range1 < range2);
        assert!(range2 < range3);
        assert!(range3 < range4);
        assert!(range5 < range4);

        // Extending moves the end and leaves the start untouched.
        range1.extend_to(10);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 10);
        assert_eq!(range1.size(), 10);

        range1.extend_to(20);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 20);
        assert_eq!(range1.size(), 20);
    }
}