1use std::{
5 cmp::Ordering,
6 collections::BTreeMap,
7 fmt::{self, Debug, Display, Formatter},
8 hash::{Hash, Hasher},
9 ops::{Deref, Range, RangeInclusive},
10 sync::Arc,
11};
12
13use bytes::Bytes;
14use consensus_config::{AuthorityIndex, DIGEST_LENGTH, DefaultHashFunction};
15use consensus_types::block::{BlockRef, BlockTimestampMs, Round, TransactionIndex};
16use enum_dispatch::enum_dispatch;
17use fastcrypto::hash::{Digest, HashFunction as _};
18use itertools::Itertools as _;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22 block::{BlockAPI, Slot, VerifiedBlock},
23 context::Context,
24 leader_scoring::ReputationScores,
25 storage::Store,
26};
27
28pub type CommitIndex = u32;
30
31pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;
32
33pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;
39
40pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;
42
43pub(crate) type WaveNumber = u32;
46
47#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
60#[enum_dispatch(CommitAPI)]
61pub enum Commit {
62 V1(CommitV1),
63}
64
65impl Commit {
66 pub(crate) fn new(
68 index: CommitIndex,
69 previous_digest: CommitDigest,
70 timestamp_ms: BlockTimestampMs,
71 leader: BlockRef,
72 blocks: Vec<BlockRef>,
73 ) -> Self {
74 Commit::V1(CommitV1 {
75 index,
76 previous_digest,
77 timestamp_ms,
78 leader,
79 blocks,
80 })
81 }
82
83 pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
84 let bytes = bcs::to_bytes(self)?;
85 Ok(bytes.into())
86 }
87}
88
89#[enum_dispatch]
91pub trait CommitAPI {
92 fn round(&self) -> Round;
93 fn index(&self) -> CommitIndex;
94 fn previous_digest(&self) -> CommitDigest;
95 fn timestamp_ms(&self) -> BlockTimestampMs;
96 fn leader(&self) -> BlockRef;
97 fn blocks(&self) -> &[BlockRef];
98}
99
100#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
103pub struct CommitV1 {
104 index: CommitIndex,
107 previous_digest: CommitDigest,
110 timestamp_ms: BlockTimestampMs,
112 leader: BlockRef,
114 blocks: Vec<BlockRef>,
116}
117
118impl CommitAPI for CommitV1 {
119 fn round(&self) -> Round {
120 self.leader.round
121 }
122
123 fn index(&self) -> CommitIndex {
124 self.index
125 }
126
127 fn previous_digest(&self) -> CommitDigest {
128 self.previous_digest
129 }
130
131 fn timestamp_ms(&self) -> BlockTimestampMs {
132 self.timestamp_ms
133 }
134
135 fn leader(&self) -> BlockRef {
136 self.leader
137 }
138
139 fn blocks(&self) -> &[BlockRef] {
140 &self.blocks
141 }
142}
143
144#[derive(Clone, Debug, PartialEq)]
150pub struct TrustedCommit {
151 inner: Arc<Commit>,
152
153 digest: CommitDigest,
155 serialized: Bytes,
156}
157
158impl TrustedCommit {
159 pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
160 let digest = Self::compute_digest(&serialized);
161 Self {
162 inner: Arc::new(commit),
163 digest,
164 serialized,
165 }
166 }
167
168 pub(crate) fn new_for_test(
169 index: CommitIndex,
170 previous_digest: CommitDigest,
171 timestamp_ms: BlockTimestampMs,
172 leader: BlockRef,
173 blocks: Vec<BlockRef>,
174 ) -> Self {
175 let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
176 let serialized = commit.serialize().unwrap();
177 Self::new_trusted(commit, serialized)
178 }
179
180 pub(crate) fn reference(&self) -> CommitRef {
181 CommitRef {
182 index: self.index(),
183 digest: self.digest(),
184 }
185 }
186
187 pub(crate) fn digest(&self) -> CommitDigest {
188 self.digest
189 }
190
191 pub(crate) fn serialized(&self) -> &Bytes {
192 &self.serialized
193 }
194
195 pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
196 let mut hasher = DefaultHashFunction::new();
197 hasher.update(serialized);
198 CommitDigest(hasher.finalize().into())
199 }
200}
201
202impl Deref for TrustedCommit {
204 type Target = Commit;
205
206 fn deref(&self) -> &Self::Target {
207 &self.inner
208 }
209}
210
211#[derive(Clone, Debug)]
214pub(crate) struct CertifiedCommits {
215 commits: Vec<CertifiedCommit>,
216 votes: Vec<VerifiedBlock>,
217}
218
219impl CertifiedCommits {
220 pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
221 Self { commits, votes }
222 }
223
224 pub(crate) fn commits(&self) -> &[CertifiedCommit] {
225 &self.commits
226 }
227
228 pub(crate) fn votes(&self) -> &[VerifiedBlock] {
229 &self.votes
230 }
231}
232
233#[derive(Clone, Debug)]
235pub(crate) struct CertifiedCommit {
236 commit: Arc<TrustedCommit>,
237 blocks: Vec<VerifiedBlock>,
238}
239
240impl CertifiedCommit {
241 pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
242 Self {
243 commit: Arc::new(commit),
244 blocks,
245 }
246 }
247
248 pub fn blocks(&self) -> &[VerifiedBlock] {
249 &self.blocks
250 }
251}
252
253impl Deref for CertifiedCommit {
254 type Target = TrustedCommit;
255
256 fn deref(&self) -> &Self::Target {
257 &self.commit
258 }
259}
260
261#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
263pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
264
265impl CommitDigest {
266 pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
268 pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
269
270 pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
271 self.0
272 }
273}
274
275impl Hash for CommitDigest {
276 fn hash<H: Hasher>(&self, state: &mut H) {
277 state.write(&self.0[..8]);
278 }
279}
280
281impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
282 fn from(hd: CommitDigest) -> Self {
283 Digest::new(hd.0)
284 }
285}
286
287impl fmt::Display for CommitDigest {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
289 write!(
290 f,
291 "{}",
292 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
293 .get(0..4)
294 .ok_or(fmt::Error)?
295 )
296 }
297}
298
299impl fmt::Debug for CommitDigest {
300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
301 write!(
302 f,
303 "{}",
304 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
305 )
306 }
307}
308
309#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
311pub struct CommitRef {
312 pub index: CommitIndex,
313 pub digest: CommitDigest,
314}
315
316impl CommitRef {
317 pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
318 Self { index, digest }
319 }
320}
321
322impl fmt::Display for CommitRef {
323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
324 write!(f, "C{}({})", self.index, self.digest)
325 }
326}
327
328impl fmt::Debug for CommitRef {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
330 write!(f, "C{}({:?})", self.index, self.digest)
331 }
332}
333
334pub type CommitVote = CommitRef;
336
337#[derive(Clone, PartialEq)]
344pub struct CommittedSubDag {
345 pub leader: BlockRef,
349 pub blocks: Vec<VerifiedBlock>,
351 pub timestamp_ms: BlockTimestampMs,
353 pub commit_ref: CommitRef,
357
358 pub decided_with_local_blocks: bool,
373 pub recovered_rejected_transactions: bool,
375 pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
378
379 pub rejected_transactions_by_block: BTreeMap<BlockRef, Vec<TransactionIndex>>,
383 pub always_accept_system_transactions: bool,
385}
386
387impl CommittedSubDag {
388 pub fn new(
390 leader: BlockRef,
391 blocks: Vec<VerifiedBlock>,
392 timestamp_ms: BlockTimestampMs,
393 commit_ref: CommitRef,
394 always_accept_system_transactions: bool,
395 ) -> Self {
396 Self {
397 leader,
398 blocks,
399 timestamp_ms,
400 commit_ref,
401 decided_with_local_blocks: true,
402 recovered_rejected_transactions: false,
403 reputation_scores_desc: vec![],
404 rejected_transactions_by_block: BTreeMap::new(),
405 always_accept_system_transactions,
406 }
407 }
408}
409
410pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
413 blocks.sort_by(|a, b| {
414 a.round()
415 .cmp(&b.round())
416 .then_with(|| a.author().cmp(&b.author()))
417 })
418}
419
420impl Display for CommittedSubDag {
421 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422 write!(
423 f,
424 "{}@{} [{}])",
425 self.commit_ref,
426 self.leader,
427 self.blocks
428 .iter()
429 .map(|b| b.reference().to_string())
430 .join(", ")
431 )
432 }
433}
434
435impl fmt::Debug for CommittedSubDag {
436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437 write!(
438 f,
439 "{}@{} [{}])",
440 self.commit_ref,
441 self.leader,
442 self.blocks
443 .iter()
444 .map(|b| b.reference().to_string())
445 .join(", ")
446 )?;
447 write!(
448 f,
449 ";{}ms;rs{:?};{};{};[{}]",
450 self.timestamp_ms,
451 self.reputation_scores_desc,
452 self.decided_with_local_blocks,
453 self.recovered_rejected_transactions,
454 self.rejected_transactions_by_block
455 .iter()
456 .map(|(block_ref, transactions)| {
457 format!("{}: {}, ", block_ref, transactions.len())
458 })
459 .collect::<Vec<_>>()
460 .join(", "),
461 )
462 }
463}
464
465pub(crate) fn load_committed_subdag_from_store(
467 context: &Arc<Context>,
468 store: &dyn Store,
469 commit: TrustedCommit,
470 reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
471) -> CommittedSubDag {
472 let mut leader_block_idx = None;
473 let commit_blocks = store
474 .read_blocks(commit.blocks())
475 .expect("We should have the block referenced in the commit data");
476 let blocks = commit_blocks
477 .into_iter()
478 .enumerate()
479 .map(|(idx, commit_block_opt)| {
480 let commit_block =
481 commit_block_opt.expect("We should have the block referenced in the commit data");
482 if commit_block.reference() == commit.leader() {
483 leader_block_idx = Some(idx);
484 }
485 commit_block
486 })
487 .collect::<Vec<_>>();
488 let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
489 let leader_block_ref = blocks[leader_block_idx].reference();
490
491 let mut subdag = CommittedSubDag::new(
492 leader_block_ref,
493 blocks,
494 commit.timestamp_ms(),
495 commit.reference(),
496 context
497 .protocol_config
498 .consensus_always_accept_system_transactions(),
499 );
500
501 subdag.reputation_scores_desc = reputation_scores_desc;
502
503 let reject_votes = store
504 .read_rejected_transactions(commit.reference())
505 .unwrap();
506 if let Some(reject_votes) = reject_votes {
507 subdag.decided_with_local_blocks = true;
508 subdag.recovered_rejected_transactions = true;
509 subdag.rejected_transactions_by_block = reject_votes;
510 } else {
511 subdag.decided_with_local_blocks = false;
512 subdag.recovered_rejected_transactions = false;
513 }
514
515 subdag
516}
517
518#[derive(Debug, Clone, Copy, Eq, PartialEq)]
519pub(crate) enum Decision {
520 Direct,
521 Indirect,
522 Certified, }
524
525#[derive(Debug, Clone, PartialEq)]
527pub(crate) enum LeaderStatus {
528 Commit(VerifiedBlock),
529 Skip(Slot),
530 Undecided(Slot),
531}
532
533impl LeaderStatus {
534 pub(crate) fn round(&self) -> Round {
535 match self {
536 Self::Commit(block) => block.round(),
537 Self::Skip(leader) => leader.round,
538 Self::Undecided(leader) => leader.round,
539 }
540 }
541
542 pub(crate) fn is_decided(&self) -> bool {
543 match self {
544 Self::Commit(_) => true,
545 Self::Skip(_) => true,
546 Self::Undecided(_) => false,
547 }
548 }
549
550 pub(crate) fn into_decided_leader(self, direct: bool) -> Option<DecidedLeader> {
551 match self {
552 Self::Commit(block) => Some(DecidedLeader::Commit(block, direct)),
553 Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
554 Self::Undecided(..) => None,
555 }
556 }
557}
558
559impl Display for LeaderStatus {
560 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561 match self {
562 Self::Commit(block) => write!(f, "Commit({})", block.reference()),
563 Self::Skip(slot) => write!(f, "Skip({slot})"),
564 Self::Undecided(slot) => write!(f, "Undecided({slot})"),
565 }
566 }
567}
568
569#[derive(Debug, Clone, PartialEq)]
571pub(crate) enum DecidedLeader {
572 Commit(VerifiedBlock, bool),
576 Skip(Slot),
578}
579
580impl DecidedLeader {
581 pub(crate) fn slot(&self) -> Slot {
583 match self {
584 Self::Commit(block, _direct) => block.reference().into(),
585 Self::Skip(slot) => *slot,
586 }
587 }
588
589 pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
591 match self {
592 Self::Commit(block, _direct) => Some(block),
593 Self::Skip(_) => None,
594 }
595 }
596
597 #[cfg(test)]
598 pub(crate) fn round(&self) -> Round {
599 match self {
600 Self::Commit(block, _direct) => block.round(),
601 Self::Skip(leader) => leader.round,
602 }
603 }
604
605 #[cfg(test)]
606 pub(crate) fn authority(&self) -> AuthorityIndex {
607 match self {
608 Self::Commit(block, _direct) => block.author(),
609 Self::Skip(leader) => leader.authority,
610 }
611 }
612}
613
614impl Display for DecidedLeader {
615 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616 match self {
617 Self::Commit(block, _direct) => write!(f, "Commit({})", block.reference()),
618 Self::Skip(slot) => write!(f, "Skip({slot})"),
619 }
620 }
621}
622
623#[derive(Clone, Debug, Serialize, Deserialize)]
629pub struct CommitInfo {
630 pub(crate) committed_rounds: Vec<Round>,
631 pub(crate) reputation_scores: ReputationScores,
632}
633
634#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
640pub struct CommitRange(Range<CommitIndex>);
641
642impl CommitRange {
643 pub fn new(range: RangeInclusive<CommitIndex>) -> Self {
644 Self(*range.start()..(*range.end()).saturating_add(1))
647 }
648
649 pub fn start(&self) -> CommitIndex {
651 self.0.start
652 }
653
654 pub fn end(&self) -> CommitIndex {
656 self.0.end.saturating_sub(1)
657 }
658
659 pub fn extend_to(&mut self, other: CommitIndex) {
660 let new_end = other.saturating_add(1);
661 assert!(self.0.end <= new_end);
662 self.0 = self.0.start..new_end;
663 }
664
665 pub fn size(&self) -> usize {
666 self.0
667 .end
668 .checked_sub(self.0.start)
669 .expect("Range should never have end < start") as usize
670 }
671
672 pub fn is_equal_size(&self, other: &Self) -> bool {
674 self.size() == other.size()
675 }
676
677 pub fn is_next_range(&self, other: &Self) -> bool {
679 self.0.end == other.0.start
680 }
681}
682
683impl Ord for CommitRange {
684 fn cmp(&self, other: &Self) -> Ordering {
685 self.start()
686 .cmp(&other.start())
687 .then_with(|| self.end().cmp(&other.end()))
688 }
689}
690
691impl PartialOrd for CommitRange {
692 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
693 Some(self.cmp(other))
694 }
695}
696
697impl From<RangeInclusive<CommitIndex>> for CommitRange {
698 fn from(range: RangeInclusive<CommitIndex>) -> Self {
699 Self::new(range)
700 }
701}
702
703impl Debug for CommitRange {
705 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
706 write!(f, "CommitRange({}..={})", self.start(), self.end())
707 }
708}
709
710#[cfg(test)]
711mod tests {
712 use std::sync::Arc;
713
714 use super::*;
715 use crate::{
716 block::TestBlock,
717 context::Context,
718 storage::{WriteBatch, mem_store::MemStore},
719 };
720
721 #[tokio::test]
722 async fn test_new_subdag_from_commit() {
723 let store = Arc::new(MemStore::new());
724 let context = Arc::new(Context::new_for_test(4).0);
725 let wave_length = DEFAULT_WAVE_LENGTH;
726
727 let first_wave_rounds: u32 = wave_length;
729 let num_authorities: u32 = 4;
730
731 let mut blocks = Vec::new();
732 let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
733 .committee
734 .authorities()
735 .map(|index| {
736 let author_idx = index.0.value() as u32;
737 let block = TestBlock::new(0, author_idx).build();
738 VerifiedBlock::new_for_test(block)
739 })
740 .map(|block| (block.reference(), block))
741 .unzip();
742 store.write(WriteBatch::default().blocks(genesis)).unwrap();
744 blocks.append(&mut genesis_references.clone());
745
746 let mut ancestors = genesis_references;
747 let mut leader = None;
748 for round in 1..=first_wave_rounds {
749 let mut new_ancestors = vec![];
750 for author in 0..num_authorities {
751 let base_ts = round as BlockTimestampMs * 1000;
752 let block = VerifiedBlock::new_for_test(
753 TestBlock::new(round, author)
754 .set_timestamp_ms(base_ts + (author + round) as u64)
755 .set_ancestors(ancestors.clone())
756 .build(),
757 );
758 store
759 .write(WriteBatch::default().blocks(vec![block.clone()]))
760 .unwrap();
761 new_ancestors.push(block.reference());
762 blocks.push(block.reference());
763
764 if round == first_wave_rounds {
767 leader = Some(block.clone());
768 break;
769 }
770 }
771 ancestors = new_ancestors;
772 }
773
774 let leader_block = leader.unwrap();
775 let leader_ref = leader_block.reference();
776 let commit_index = 1;
777 let commit = TrustedCommit::new_for_test(
778 commit_index,
779 CommitDigest::MIN,
780 leader_block.timestamp_ms(),
781 leader_ref,
782 blocks.clone(),
783 );
784 let subdag =
785 load_committed_subdag_from_store(&context, store.as_ref(), commit.clone(), vec![]);
786 assert_eq!(subdag.leader, leader_ref);
787 assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
788 assert_eq!(
789 subdag.blocks.len(),
790 (num_authorities * wave_length) as usize + 1
791 );
792 assert_eq!(subdag.commit_ref, commit.reference());
793 assert_eq!(subdag.reputation_scores_desc, vec![]);
794 }
795
796 #[tokio::test]
797 async fn test_commit_range() {
798 telemetry_subscribers::init_for_testing();
799 let mut range1 = CommitRange::new(1..=5);
800 let range2 = CommitRange::new(2..=6);
801 let range3 = CommitRange::new(5..=10);
802 let range4 = CommitRange::new(6..=10);
803 let range5 = CommitRange::new(6..=9);
804 let range6 = CommitRange::new(1..=1);
805
806 assert_eq!(range1.start(), 1);
807 assert_eq!(range1.end(), 5);
808
809 assert_eq!(range1.size(), 5);
811 assert_eq!(range3.size(), 6);
812 assert_eq!(range6.size(), 1);
813
814 assert!(!range1.is_next_range(&range2));
816 assert!(!range1.is_next_range(&range3));
817 assert!(range1.is_next_range(&range4));
818 assert!(range1.is_next_range(&range5));
819
820 assert!(range1.is_equal_size(&range2));
822 assert!(!range1.is_equal_size(&range3));
823 assert!(range1.is_equal_size(&range4));
824 assert!(!range1.is_equal_size(&range5));
825
826 assert!(range1 < range2);
828 assert!(range2 < range3);
829 assert!(range3 < range4);
830 assert!(range5 < range4);
831
832 range1.extend_to(10);
834 assert_eq!(range1.start(), 1);
835 assert_eq!(range1.end(), 10);
836 assert_eq!(range1.size(), 10);
837
838 range1.extend_to(20);
839 assert_eq!(range1.start(), 1);
840 assert_eq!(range1.end(), 20);
841 assert_eq!(range1.size(), 20);
842 }
843}