1use std::{
5 cmp::Ordering,
6 collections::BTreeMap,
7 fmt::{self, Debug, Display, Formatter},
8 hash::{Hash, Hasher},
9 ops::{Deref, Range, RangeInclusive},
10 sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{AuthorityIndex, DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22 block::{BlockAPI, Slot, VerifiedBlock},
23 context::Context,
24 leader_scoring::ReputationScores,
25 storage::Store,
26};
27
28pub type CommitIndex = u32;
30
31pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;
32
33pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;
39
40pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;
42
43pub(crate) type WaveNumber = u32;
46
47#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
60#[enum_dispatch(CommitAPI)]
61pub enum Commit {
62 V1(CommitV1),
63}
64
65impl Commit {
66 pub(crate) fn new(
68 index: CommitIndex,
69 previous_digest: CommitDigest,
70 timestamp_ms: BlockTimestampMs,
71 leader: BlockRef,
72 blocks: Vec<BlockRef>,
73 ) -> Self {
74 Commit::V1(CommitV1 {
75 index,
76 previous_digest,
77 timestamp_ms,
78 leader,
79 blocks,
80 })
81 }
82
83 pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
84 let bytes = bcs::to_bytes(self)?;
85 Ok(bytes.into())
86 }
87}
88
89#[enum_dispatch]
91pub trait CommitAPI {
92 fn round(&self) -> Round;
93 fn index(&self) -> CommitIndex;
94 fn previous_digest(&self) -> CommitDigest;
95 fn timestamp_ms(&self) -> BlockTimestampMs;
96 fn leader(&self) -> BlockRef;
97 fn blocks(&self) -> &[BlockRef];
98}
99
100#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
103pub struct CommitV1 {
104 index: CommitIndex,
107 previous_digest: CommitDigest,
110 timestamp_ms: BlockTimestampMs,
112 leader: BlockRef,
114 blocks: Vec<BlockRef>,
116}
117
118impl CommitAPI for CommitV1 {
119 fn round(&self) -> Round {
120 self.leader.round
121 }
122
123 fn index(&self) -> CommitIndex {
124 self.index
125 }
126
127 fn previous_digest(&self) -> CommitDigest {
128 self.previous_digest
129 }
130
131 fn timestamp_ms(&self) -> BlockTimestampMs {
132 self.timestamp_ms
133 }
134
135 fn leader(&self) -> BlockRef {
136 self.leader
137 }
138
139 fn blocks(&self) -> &[BlockRef] {
140 &self.blocks
141 }
142}
143
144#[derive(Clone, Debug, PartialEq)]
150pub struct TrustedCommit {
151 inner: Arc<Commit>,
152
153 digest: CommitDigest,
155 serialized: Bytes,
156}
157
158impl TrustedCommit {
159 pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
160 let digest = Self::compute_digest(&serialized);
161 Self {
162 inner: Arc::new(commit),
163 digest,
164 serialized,
165 }
166 }
167
168 pub(crate) fn new_for_test(
169 index: CommitIndex,
170 previous_digest: CommitDigest,
171 timestamp_ms: BlockTimestampMs,
172 leader: BlockRef,
173 blocks: Vec<BlockRef>,
174 ) -> Self {
175 let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
176 let serialized = commit.serialize().unwrap();
177 Self::new_trusted(commit, serialized)
178 }
179
180 pub(crate) fn reference(&self) -> CommitRef {
181 CommitRef {
182 index: self.index(),
183 digest: self.digest(),
184 }
185 }
186
187 pub(crate) fn digest(&self) -> CommitDigest {
188 self.digest
189 }
190
191 pub(crate) fn serialized(&self) -> &Bytes {
192 &self.serialized
193 }
194
195 pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
196 let mut hasher = DefaultHashFunction::new();
197 hasher.update(serialized);
198 CommitDigest(hasher.finalize().into())
199 }
200}
201
202impl Deref for TrustedCommit {
204 type Target = Commit;
205
206 fn deref(&self) -> &Self::Target {
207 &self.inner
208 }
209}
210
211#[derive(Clone, Debug)]
214pub(crate) struct CertifiedCommits {
215 commits: Vec<CertifiedCommit>,
216 votes: Vec<VerifiedBlock>,
217}
218
219impl CertifiedCommits {
220 pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
221 Self { commits, votes }
222 }
223
224 pub(crate) fn commits(&self) -> &[CertifiedCommit] {
225 &self.commits
226 }
227
228 pub(crate) fn votes(&self) -> &[VerifiedBlock] {
229 &self.votes
230 }
231}
232
233#[derive(Clone, Debug)]
235pub(crate) struct CertifiedCommit {
236 commit: Arc<TrustedCommit>,
237 blocks: Vec<VerifiedBlock>,
238}
239
240impl CertifiedCommit {
241 pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
242 Self {
243 commit: Arc::new(commit),
244 blocks,
245 }
246 }
247
248 pub fn blocks(&self) -> &[VerifiedBlock] {
249 &self.blocks
250 }
251}
252
253impl Deref for CertifiedCommit {
254 type Target = TrustedCommit;
255
256 fn deref(&self) -> &Self::Target {
257 &self.commit
258 }
259}
260
261#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
263pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
264
265impl CommitDigest {
266 pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
268 pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
269
270 pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
271 self.0
272 }
273}
274
275impl Hash for CommitDigest {
276 fn hash<H: Hasher>(&self, state: &mut H) {
277 state.write(&self.0[..8]);
278 }
279}
280
281impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
282 fn from(hd: CommitDigest) -> Self {
283 Digest::new(hd.0)
284 }
285}
286
287impl fmt::Display for CommitDigest {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
289 write!(
290 f,
291 "{}",
292 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
293 .get(0..4)
294 .ok_or(fmt::Error)?
295 )
296 }
297}
298
299impl fmt::Debug for CommitDigest {
300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
301 write!(
302 f,
303 "{}",
304 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
305 )
306 }
307}
308
309#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
311pub struct CommitRef {
312 pub index: CommitIndex,
313 pub digest: CommitDigest,
314}
315
316impl CommitRef {
317 pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
318 Self { index, digest }
319 }
320}
321
322impl fmt::Display for CommitRef {
323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
324 write!(f, "C{}({})", self.index, self.digest)
325 }
326}
327
328impl fmt::Debug for CommitRef {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
330 write!(f, "C{}({:?})", self.index, self.digest)
331 }
332}
333
334pub type CommitVote = CommitRef;
336
337#[derive(Clone, PartialEq)]
344pub struct CommittedSubDag {
345 pub leader: BlockRef,
349 pub blocks: Vec<VerifiedBlock>,
351 pub timestamp_ms: BlockTimestampMs,
353 pub commit_ref: CommitRef,
357
358 pub decided_with_local_blocks: bool,
373 pub recovered_rejected_transactions: bool,
375 pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
378
379 pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
383 pub always_accept_system_transactions: bool,
385}
386
387impl CommittedSubDag {
388 pub fn new(
390 leader: BlockRef,
391 blocks: Vec<VerifiedBlock>,
392 timestamp_ms: BlockTimestampMs,
393 commit_ref: CommitRef,
394 always_accept_system_transactions: bool,
395 ) -> Self {
396 Self {
397 leader,
398 blocks,
399 timestamp_ms,
400 commit_ref,
401 decided_with_local_blocks: true,
402 recovered_rejected_transactions: false,
403 reputation_scores_desc: vec![],
404 rejected_transactions_by_block: BTreeMap::new(),
405 always_accept_system_transactions,
406 }
407 }
408}
409
410pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
413 blocks.sort_by(|a, b| {
414 a.round()
415 .cmp(&b.round())
416 .then_with(|| a.author().cmp(&b.author()))
417 })
418}
419
420impl Display for CommittedSubDag {
421 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422 write!(
423 f,
424 "{}@{} [{}])",
425 self.commit_ref,
426 self.leader,
427 self.blocks
428 .iter()
429 .map(|b| b.reference().to_string())
430 .join(", ")
431 )
432 }
433}
434
435impl fmt::Debug for CommittedSubDag {
436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437 write!(
438 f,
439 "{}@{} [{}])",
440 self.commit_ref,
441 self.leader,
442 self.blocks
443 .iter()
444 .map(|b| b.reference().to_string())
445 .join(", ")
446 )?;
447 write!(
448 f,
449 ";{}ms;rs{:?};{};{};[{}]",
450 self.timestamp_ms,
451 self.reputation_scores_desc,
452 self.decided_with_local_blocks,
453 self.recovered_rejected_transactions,
454 self.rejected_transactions_by_block
455 .iter()
456 .map(|(block_ref, transactions)| {
457 format!("{}: {}, ", block_ref, transactions.len())
458 })
459 .collect::<Vec<_>>()
460 .join(", "),
461 )
462 }
463}
464
465pub(crate) fn load_committed_subdag_from_store(
467 context: &Arc<Context>,
468 store: &dyn Store,
469 commit: TrustedCommit,
470 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
471) -> CommittedSubDag {
472 let mut leader_block_idx = None;
473 let commit_blocks = store
474 .read_blocks(commit.blocks())
475 .expect("We should have the block referenced in the commit data");
476 let blocks = commit_blocks
477 .into_iter()
478 .enumerate()
479 .map(|(idx, commit_block_opt)| {
480 let commit_block =
481 commit_block_opt.expect("We should have the block referenced in the commit data");
482 if commit_block.reference() == commit.leader() {
483 leader_block_idx = Some(idx);
484 }
485 commit_block
486 })
487 .collect::<Vec<_>>();
488 let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
489 let leader_block_ref = blocks[leader_block_idx].reference();
490
491 let mut subdag = CommittedSubDag::new(
492 leader_block_ref,
493 blocks,
494 commit.timestamp_ms(),
495 commit.reference(),
496 context.protocol_config.always_accept_system_transactions(),
497 );
498
499 subdag.reputation_scores_desc = reputation_scores_desc;
500
501 let reject_votes = store
502 .read_rejected_transactions(commit.reference())
503 .unwrap();
504 if let Some(reject_votes) = reject_votes {
505 subdag.decided_with_local_blocks = true;
506 subdag.recovered_rejected_transactions = true;
507 subdag.rejected_transactions_by_block = reject_votes;
508 } else {
509 subdag.decided_with_local_blocks = false;
510 subdag.recovered_rejected_transactions = false;
511 }
512
513 subdag
514}
515
/// The way in which a decision was reached.
// NOTE(review): variant meanings are inferred from their names only — confirm with the committer code.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum Decision {
    Direct,
    Indirect,
    Certified,
}
522
523#[derive(Debug, Clone, PartialEq)]
525pub(crate) enum LeaderStatus {
526 Commit(VerifiedBlock),
527 Skip(Slot),
528 Undecided(Slot),
529}
530
531impl LeaderStatus {
532 pub(crate) fn round(&self) -> Round {
533 match self {
534 Self::Commit(block) => block.round(),
535 Self::Skip(leader) => leader.round,
536 Self::Undecided(leader) => leader.round,
537 }
538 }
539
540 pub(crate) fn is_decided(&self) -> bool {
541 match self {
542 Self::Commit(_) => true,
543 Self::Skip(_) => true,
544 Self::Undecided(_) => false,
545 }
546 }
547
548 pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
549 match self {
550 Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
551 Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
552 Self::Undecided(..) => None,
553 }
554 }
555}
556
557impl Display for LeaderStatus {
558 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
559 match self {
560 Self::Commit(block) => write!(f, "Commit({})", block.reference()),
561 Self::Skip(slot) => write!(f, "Skip({slot})"),
562 Self::Undecided(slot) => write!(f, "Undecided({slot})"),
563 }
564 }
565}
566
567#[derive(Debug, Clone, PartialEq)]
569pub(crate) enum DecidedLeader {
570 Commit(VerifiedBlock, bool),
574 Skip(Slot),
576}
577
578impl DecidedLeader {
579 pub(crate) fn slot(&self) -> Slot {
581 match self {
582 Self::Commit(block, _direct) => block.reference().into(),
583 Self::Skip(slot) => *slot,
584 }
585 }
586
587 pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
589 match self {
590 Self::Commit(block, _direct) => Some(block),
591 Self::Skip(_) => None,
592 }
593 }
594
595 #[cfg(test)]
596 pub(crate) fn round(&self) -> Round {
597 match self {
598 Self::Commit(block, _direct) => block.round(),
599 Self::Skip(leader) => leader.round,
600 }
601 }
602
603 #[cfg(test)]
604 pub(crate) fn authority(&self) -> AuthorityIndex {
605 match self {
606 Self::Commit(block, _direct) => block.author(),
607 Self::Skip(leader) => leader.authority,
608 }
609 }
610}
611
612impl Display for DecidedLeader {
613 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
614 match self {
615 Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
616 Self::Skip(slot) => write!(f, "Skip({slot})"),
617 }
618 }
619}
620
621#[derive(Clone, Debug, Serialize, Deserialize)]
627pub struct CommitInfo {
628 pub(crate) committed_rounds: Vec<Round>,
629 pub(crate) reputation_scores: ReputationScores,
630}
631
632#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
638pub struct CommitRange(Range<CommitIndex>);
639
640impl CommitRange {
641 pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
642 Self(*range.start()..(*range.end()).saturating_add(1))
645 }
646
647 pub fn start(&self) -> CommitIndex {
649 self.0.start
650 }
651
652 pub fn end(&self) -> CommitIndex {
654 self.0.end.saturating_sub(1)
655 }
656
657 pub fn extend_to(&mut self, other: CommitIndex) {
658 let new_end = other.saturating_add(1);
659 assert!(self.0.end <= new_end);
660 self.0 = self.0.start..new_end;
661 }
662
663 pub fn size(&self) -> usize {
664 self.0
665 .end
666 .checked_sub(self.0.start)
667 .expect("Range should never have end < start") as usize
668 }
669
670 pub fn is_equal_size(&self, other: &Self) -> bool {
672 self.size() == other.size()
673 }
674
675 pub fn is_next_range(&self, other: &Self) -> bool {
677 self.0.end == other.0.start
678 }
679}
680
681impl Ord for CommitRange {
682 fn cmp(&self, other: &Self) -> Ordering {
683 self.start()
684 .cmp(&other.start())
685 .then_with(|| self.end().cmp(&other.end()))
686 }
687}
688
689impl PartialOrd for CommitRange {
690 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
691 Some(self.cmp(other))
692 }
693}
694
695impl From<RangeInclusive<CommitIndex>> for CommitRange {
696 fn from(range: RangeInclusive<CommitIndex>) -> Self {
697 Self::new(range)
698 }
699}
700
701impl Debug for CommitRange {
703 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
704 write!(f, "CommitRange({}..={})", self.start(), self.end())
705 }
706}
707
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        block::TestBlock,
        context::Context,
        storage::{WriteBatch, mem_store::MemStore},
    };

    /// Writes genesis plus one wave of blocks to an in-memory store, commits
    /// them under a single leader and checks the sub-dag loaded from the store.
    #[tokio::test]
    async fn test_new_subdag_from_commit() {
        let store = Arc::new(MemStore::new());
        let context = Arc::new(Context::new_for_test(4).0);
        let wave_length = DEFAULT_WAVE_LENGTH;

        let first_wave_rounds: u32 = wave_length;
        let num_authorities: u32 = 4;

        // One genesis block per authority.
        let genesis: Vec<VerifiedBlock> = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                VerifiedBlock::new_for_test(TestBlock::new(0, author_idx).build())
            })
            .collect();
        let genesis_references: Vec<_> = genesis.iter().map(|block| block.reference()).collect();
        store.write(WriteBatch::default().blocks(genesis)).unwrap();

        // References of every block that ends up in the commit.
        let mut blocks = genesis_references.clone();

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=first_wave_rounds {
            let mut round_refs = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                let test_block = TestBlock::new(round, author)
                    .set_timestamp_ms(base_ts + (author + round) as u64)
                    .set_ancestors(ancestors.clone())
                    .build();
                let block = VerifiedBlock::new_for_test(test_block);
                store
                    .write(WriteBatch::default().blocks(vec![block.clone()]))
                    .unwrap();
                round_refs.push(block.reference());
                blocks.push(block.reference());

                // The last round only contributes its first block, which becomes the leader.
                if round == first_wave_rounds {
                    leader = Some(block);
                    break;
                }
            }
            ancestors = round_refs;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;
        let commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            leader_block.timestamp_ms(),
            leader_ref,
            blocks.clone(),
        );

        let subdag =
            load_committed_subdag_from_store(&context, store.as_ref(), commit.clone(), vec![]);

        assert_eq!(subdag.leader, leader_ref);
        assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
        assert_eq!(
            subdag.blocks.len(),
            (num_authorities * wave_length) as usize + 1
        );
        assert_eq!(subdag.commit_ref, commit.reference());
        assert_eq!(subdag.reputation_scores_desc, vec![]);
    }

    /// Exercises the accessors, comparisons and `extend_to` of `CommitRange`.
    #[tokio::test]
    async fn test_commit_range() {
        telemetry_subscribers::init_for_testing();
        let mut range1 = CommitRange::new(1..=5);
        let range2 = CommitRange::new(2..=6);
        let range3 = CommitRange::new(5..=10);
        let range4 = CommitRange::new(6..=10);
        let range5 = CommitRange::new(6..=9);
        let range6 = CommitRange::new(1..=1);

        // Inclusive bounds.
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 5);

        // Sizes.
        assert_eq!(range1.size(), 5);
        assert_eq!(range3.size(), 6);
        assert_eq!(range6.size(), 1);

        // A range follows another only when it starts right after its end.
        assert!(!range1.is_next_range(&range2));
        assert!(!range1.is_next_range(&range3));
        assert!(range1.is_next_range(&range4));
        assert!(range1.is_next_range(&range5));

        // Size equality.
        assert!(range1.is_equal_size(&range2));
        assert!(!range1.is_equal_size(&range3));
        assert!(range1.is_equal_size(&range4));
        assert!(!range1.is_equal_size(&range5));

        // Ordering by start, then by end.
        assert!(range1 < range2);
        assert!(range2 < range3);
        assert!(range3 < range4);
        assert!(range5 < range4);

        // Extending keeps the start and moves the end.
        range1.extend_to(10);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 10);
        assert_eq!(range1.size(), 10);

        range1.extend_to(20);
        assert_eq!(range1.start(), 1);
        assert_eq!(range1.end(), 20);
        assert_eq!(range1.size(), 20);
    }
}