use std::{
cmp::Ordering,
fmt::{self, Debug, Display, Formatter},
hash::{Hash, Hasher},
ops::{Deref, Range, RangeInclusive},
sync::Arc,
};
use bytes::Bytes;
use consensus_config::{AuthorityIndex, DefaultHashFunction, DIGEST_LENGTH};
use enum_dispatch::enum_dispatch;
use fastcrypto::hash::{Digest, HashFunction as _};
use serde::{Deserialize, Serialize};
use crate::{
block::{BlockAPI, BlockRef, BlockTimestampMs, Round, Slot, VerifiedBlock},
leader_scoring::ReputationScores,
storage::Store,
TransactionIndex,
};
pub type CommitIndex = u32;
pub(crate) const GENESIS_COMMIT_INDEX: CommitIndex = 0;
pub(crate) const DEFAULT_WAVE_LENGTH: Round = MINIMUM_WAVE_LENGTH;
pub(crate) const MINIMUM_WAVE_LENGTH: Round = 3;
pub(crate) type WaveNumber = u32;
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[enum_dispatch(CommitAPI)]
pub(crate) enum Commit {
V1(CommitV1),
}
impl Commit {
pub(crate) fn new(
index: CommitIndex,
previous_digest: CommitDigest,
timestamp_ms: BlockTimestampMs,
leader: BlockRef,
blocks: Vec<BlockRef>,
) -> Self {
Commit::V1(CommitV1 {
index,
previous_digest,
timestamp_ms,
leader,
blocks,
})
}
pub(crate) fn serialize(&self) -> Result<Bytes, bcs::Error> {
let bytes = bcs::to_bytes(self)?;
Ok(bytes.into())
}
}
#[enum_dispatch]
pub(crate) trait CommitAPI {
fn round(&self) -> Round;
fn index(&self) -> CommitIndex;
fn previous_digest(&self) -> CommitDigest;
fn timestamp_ms(&self) -> BlockTimestampMs;
fn leader(&self) -> BlockRef;
fn blocks(&self) -> &[BlockRef];
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub(crate) struct CommitV1 {
index: CommitIndex,
previous_digest: CommitDigest,
timestamp_ms: BlockTimestampMs,
leader: BlockRef,
blocks: Vec<BlockRef>,
}
impl CommitAPI for CommitV1 {
fn round(&self) -> Round {
self.leader.round
}
fn index(&self) -> CommitIndex {
self.index
}
fn previous_digest(&self) -> CommitDigest {
self.previous_digest
}
fn timestamp_ms(&self) -> BlockTimestampMs {
self.timestamp_ms
}
fn leader(&self) -> BlockRef {
self.leader
}
fn blocks(&self) -> &[BlockRef] {
&self.blocks
}
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct TrustedCommit {
inner: Arc<Commit>,
digest: CommitDigest,
serialized: Bytes,
}
impl TrustedCommit {
pub(crate) fn new_trusted(commit: Commit, serialized: Bytes) -> Self {
let digest = Self::compute_digest(&serialized);
Self {
inner: Arc::new(commit),
digest,
serialized,
}
}
#[cfg(test)]
pub(crate) fn new_for_test(
index: CommitIndex,
previous_digest: CommitDigest,
timestamp_ms: BlockTimestampMs,
leader: BlockRef,
blocks: Vec<BlockRef>,
) -> Self {
let commit = Commit::new(index, previous_digest, timestamp_ms, leader, blocks);
let serialized = commit.serialize().unwrap();
Self::new_trusted(commit, serialized)
}
pub(crate) fn reference(&self) -> CommitRef {
CommitRef {
index: self.index(),
digest: self.digest(),
}
}
pub(crate) fn digest(&self) -> CommitDigest {
self.digest
}
pub(crate) fn serialized(&self) -> &Bytes {
&self.serialized
}
pub(crate) fn compute_digest(serialized: &[u8]) -> CommitDigest {
let mut hasher = DefaultHashFunction::new();
hasher.update(serialized);
CommitDigest(hasher.finalize().into())
}
}
impl Deref for TrustedCommit {
type Target = Commit;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommits {
commits: Vec<CertifiedCommit>,
votes: Vec<VerifiedBlock>,
}
impl CertifiedCommits {
pub(crate) fn new(commits: Vec<CertifiedCommit>, votes: Vec<VerifiedBlock>) -> Self {
Self { commits, votes }
}
pub(crate) fn commits(&self) -> &[CertifiedCommit] {
&self.commits
}
pub(crate) fn votes(&self) -> &[VerifiedBlock] {
&self.votes
}
}
#[derive(Clone, Debug)]
pub(crate) struct CertifiedCommit {
commit: Arc<TrustedCommit>,
blocks: Vec<VerifiedBlock>,
}
impl CertifiedCommit {
pub(crate) fn new_certified(commit: TrustedCommit, blocks: Vec<VerifiedBlock>) -> Self {
Self {
commit: Arc::new(commit),
blocks,
}
}
pub fn blocks(&self) -> &[VerifiedBlock] {
&self.blocks
}
}
impl Deref for CertifiedCommit {
type Target = TrustedCommit;
fn deref(&self) -> &Self::Target {
&self.commit
}
}
#[derive(Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitDigest([u8; consensus_config::DIGEST_LENGTH]);
impl CommitDigest {
pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);
pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
self.0
}
}
impl Hash for CommitDigest {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(&self.0[..8]);
}
}
impl From<CommitDigest> for Digest<{ DIGEST_LENGTH }> {
fn from(hd: CommitDigest) -> Self {
Digest::new(hd.0)
}
}
impl fmt::Display for CommitDigest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(
f,
"{}",
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
.get(0..4)
.ok_or(fmt::Error)?
)
}
}
impl fmt::Debug for CommitDigest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(
f,
"{}",
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, self.0)
)
}
}
#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitRef {
pub index: CommitIndex,
pub digest: CommitDigest,
}
impl CommitRef {
pub fn new(index: CommitIndex, digest: CommitDigest) -> Self {
Self { index, digest }
}
}
impl fmt::Display for CommitRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "C{}({})", self.index, self.digest)
}
}
impl fmt::Debug for CommitRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "C{}({:?})", self.index, self.digest)
}
}
pub type CommitVote = CommitRef;
#[derive(Clone, PartialEq)]
pub struct CommittedSubDag {
pub leader: BlockRef,
pub blocks: Vec<VerifiedBlock>,
pub rejected_transactions_by_block: Vec<Vec<TransactionIndex>>,
pub timestamp_ms: BlockTimestampMs,
pub commit_ref: CommitRef,
pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
}
impl CommittedSubDag {
pub fn new(
leader: BlockRef,
blocks: Vec<VerifiedBlock>,
rejected_transactions_by_block: Vec<Vec<TransactionIndex>>,
timestamp_ms: BlockTimestampMs,
commit_ref: CommitRef,
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
) -> Self {
assert_eq!(blocks.len(), rejected_transactions_by_block.len());
Self {
leader,
blocks,
rejected_transactions_by_block,
timestamp_ms,
commit_ref,
reputation_scores_desc,
}
}
}
pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
blocks.sort_by(|a, b| {
a.round()
.cmp(&b.round())
.then_with(|| a.author().cmp(&b.author()))
})
}
impl Display for CommittedSubDag {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"CommittedSubDag(leader={}, ref={}, blocks=[",
self.leader, self.commit_ref
)?;
for (idx, block) in self.blocks.iter().enumerate() {
if idx > 0 {
write!(f, ", ")?;
}
write!(f, "{}", block.digest())?;
}
write!(f, "])")
}
}
impl fmt::Debug for CommittedSubDag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}@{} ([", self.leader, self.commit_ref)?;
for block in &self.blocks {
write!(f, "{}, ", block.reference())?;
}
write!(
f,
"];{}ms;rs{:?})",
self.timestamp_ms, self.reputation_scores_desc
)
}
}
pub fn load_committed_subdag_from_store(
store: &dyn Store,
commit: TrustedCommit,
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
) -> CommittedSubDag {
let mut leader_block_idx = None;
let commit_blocks = store
.read_blocks(commit.blocks())
.expect("We should have the block referenced in the commit data");
let blocks = commit_blocks
.into_iter()
.enumerate()
.map(|(idx, commit_block_opt)| {
let commit_block =
commit_block_opt.expect("We should have the block referenced in the commit data");
if commit_block.reference() == commit.leader() {
leader_block_idx = Some(idx);
}
commit_block
})
.collect::<Vec<_>>();
let rejected_transactions = vec![vec![]; blocks.len()];
let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
let leader_block_ref = blocks[leader_block_idx].reference();
CommittedSubDag::new(
leader_block_ref,
blocks,
rejected_transactions,
commit.timestamp_ms(),
commit.reference(),
reputation_scores_desc,
)
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum Decision {
Direct,
Indirect,
Certified, }
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum LeaderStatus {
Commit(VerifiedBlock),
Skip(Slot),
Undecided(Slot),
}
impl LeaderStatus {
pub(crate) fn round(&self) -> Round {
match self {
Self::Commit(block) => block.round(),
Self::Skip(leader) => leader.round,
Self::Undecided(leader) => leader.round,
}
}
pub(crate) fn is_decided(&self) -> bool {
match self {
Self::Commit(_) => true,
Self::Skip(_) => true,
Self::Undecided(_) => false,
}
}
pub(crate) fn into_decided_leader(self) -> Option<DecidedLeader> {
match self {
Self::Commit(block) => Some(DecidedLeader::Commit(block)),
Self::Skip(slot) => Some(DecidedLeader::Skip(slot)),
Self::Undecided(..) => None,
}
}
}
impl Display for LeaderStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Commit(block) => write!(f, "Commit({})", block.reference()),
Self::Skip(slot) => write!(f, "Skip({slot})"),
Self::Undecided(slot) => write!(f, "Undecided({slot})"),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum DecidedLeader {
Commit(VerifiedBlock),
Skip(Slot),
}
impl DecidedLeader {
pub(crate) fn slot(&self) -> Slot {
match self {
Self::Commit(block) => block.reference().into(),
Self::Skip(slot) => *slot,
}
}
pub(crate) fn into_committed_block(self) -> Option<VerifiedBlock> {
match self {
Self::Commit(block) => Some(block),
Self::Skip(_) => None,
}
}
#[cfg(test)]
pub(crate) fn round(&self) -> Round {
match self {
Self::Commit(block) => block.round(),
Self::Skip(leader) => leader.round,
}
}
#[cfg(test)]
pub(crate) fn authority(&self) -> AuthorityIndex {
match self {
Self::Commit(block) => block.author(),
Self::Skip(leader) => leader.authority,
}
}
}
impl Display for DecidedLeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Commit(block) => write!(f, "Commit({})", block.reference()),
Self::Skip(slot) => write!(f, "Skip({slot})"),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct CommitInfo {
pub(crate) committed_rounds: Vec<Round>,
pub(crate) reputation_scores: ReputationScores,
}
#[derive(Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CommitRange(Range<CommitIndex>);
impl CommitRange {
pub(crate) fn new(range: RangeInclusive<CommitIndex>) -> Self {
Self(*range.start()..(*range.end()).saturating_add(1))
}
pub(crate) fn start(&self) -> CommitIndex {
self.0.start
}
pub(crate) fn end(&self) -> CommitIndex {
self.0.end.saturating_sub(1)
}
pub(crate) fn extend_to(&mut self, other: CommitIndex) {
let new_end = other.saturating_add(1);
assert!(self.0.end <= new_end);
self.0 = self.0.start..new_end;
}
pub(crate) fn size(&self) -> usize {
self.0
.end
.checked_sub(self.0.start)
.expect("Range should never have end < start") as usize
}
pub(crate) fn is_equal_size(&self, other: &Self) -> bool {
self.size() == other.size()
}
pub(crate) fn is_next_range(&self, other: &Self) -> bool {
self.0.end == other.0.start
}
}
impl Ord for CommitRange {
fn cmp(&self, other: &Self) -> Ordering {
self.start()
.cmp(&other.start())
.then_with(|| self.end().cmp(&other.end()))
}
}
impl PartialOrd for CommitRange {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl From<RangeInclusive<CommitIndex>> for CommitRange {
fn from(range: RangeInclusive<CommitIndex>) -> Self {
Self::new(range)
}
}
impl Debug for CommitRange {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "CommitRange({}..={})", self.start(), self.end())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::{
block::TestBlock,
context::Context,
storage::{mem_store::MemStore, WriteBatch},
};
#[tokio::test]
async fn test_new_subdag_from_commit() {
let store = Arc::new(MemStore::new());
let context = Arc::new(Context::new_for_test(4).0);
let wave_length = DEFAULT_WAVE_LENGTH;
let first_wave_rounds: u32 = wave_length;
let num_authorities: u32 = 4;
let mut blocks = Vec::new();
let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
.committee
.authorities()
.map(|index| {
let author_idx = index.0.value() as u32;
let block = TestBlock::new(0, author_idx).build();
VerifiedBlock::new_for_test(block)
})
.map(|block| (block.reference(), block))
.unzip();
store.write(WriteBatch::default().blocks(genesis)).unwrap();
blocks.append(&mut genesis_references.clone());
let mut ancestors = genesis_references;
let mut leader = None;
for round in 1..=first_wave_rounds {
let mut new_ancestors = vec![];
for author in 0..num_authorities {
let base_ts = round as BlockTimestampMs * 1000;
let block = VerifiedBlock::new_for_test(
TestBlock::new(round, author)
.set_timestamp_ms(base_ts + (author + round) as u64)
.set_ancestors(ancestors.clone())
.build(),
);
store
.write(WriteBatch::default().blocks(vec![block.clone()]))
.unwrap();
new_ancestors.push(block.reference());
blocks.push(block.reference());
if round == first_wave_rounds {
leader = Some(block.clone());
break;
}
}
ancestors = new_ancestors;
}
let leader_block = leader.unwrap();
let leader_ref = leader_block.reference();
let commit_index = 1;
let commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
leader_block.timestamp_ms(),
leader_ref,
blocks.clone(),
);
let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
assert_eq!(subdag.leader, leader_ref);
assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
assert_eq!(
subdag.blocks.len(),
(num_authorities * wave_length) as usize + 1
);
assert_eq!(subdag.commit_ref, commit.reference());
assert_eq!(subdag.reputation_scores_desc, vec![]);
}
#[tokio::test]
async fn test_commit_range() {
telemetry_subscribers::init_for_testing();
let mut range1 = CommitRange::new(1..=5);
let range2 = CommitRange::new(2..=6);
let range3 = CommitRange::new(5..=10);
let range4 = CommitRange::new(6..=10);
let range5 = CommitRange::new(6..=9);
let range6 = CommitRange::new(1..=1);
assert_eq!(range1.start(), 1);
assert_eq!(range1.end(), 5);
assert_eq!(range1.size(), 5);
assert_eq!(range3.size(), 6);
assert_eq!(range6.size(), 1);
assert!(!range1.is_next_range(&range2));
assert!(!range1.is_next_range(&range3));
assert!(range1.is_next_range(&range4));
assert!(range1.is_next_range(&range5));
assert!(range1.is_equal_size(&range2));
assert!(!range1.is_equal_size(&range3));
assert!(range1.is_equal_size(&range4));
assert!(!range1.is_equal_size(&range5));
assert!(range1 < range2);
assert!(range2 < range3);
assert!(range3 < range4);
assert!(range5 < range4);
range1.extend_to(10);
assert_eq!(range1.start(), 1);
assert_eq!(range1.end(), 10);
assert_eq!(range1.size(), 10);
range1.extend_to(20);
assert_eq!(range1.start(), 1);
assert_eq!(range1.end(), 20);
assert_eq!(range1.size(), 20);
}
}