use std::{
collections::{BTreeMap, BTreeSet},
iter,
sync::Arc,
time::Duration,
vec,
};
#[cfg(test)]
use consensus_config::{local_committee_and_keys, Stake};
use consensus_config::{AuthorityIndex, ProtocolKeyPair};
use consensus_types::block::{BlockRef, BlockTimestampMs, Round};
use itertools::Itertools as _;
#[cfg(test)]
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use sui_macros::fail_point;
use tokio::{
sync::{broadcast, watch},
time::Instant,
};
use tracing::{debug, info, trace, warn};
use crate::{
ancestor::{AncestorState, AncestorStateManager},
block::{
Block, BlockAPI, BlockV1, BlockV2, ExtendedBlock, SignedBlock, Slot, VerifiedBlock,
GENESIS_ROUND,
},
block_manager::BlockManager,
commit::{
CertifiedCommit, CertifiedCommits, CommitAPI, CommittedSubDag, DecidedLeader, Decision,
},
commit_observer::CommitObserver,
context::Context,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
round_tracker::PeerRoundTracker,
stake_aggregator::{QuorumThreshold, StakeAggregator},
transaction::TransactionConsumer,
transaction_certifier::TransactionCertifier,
universal_committer::{
universal_committer_builder::UniversalCommitterBuilder, UniversalCommitter,
},
};
#[cfg(test)]
use crate::{
block::CertifiedBlocksOutput, block_verifier::NoopBlockVerifier, storage::mem_store::MemStore,
CommitConsumer, TransactionClient,
};
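/// Maximum number of commit votes included in a single proposed block.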
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
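/// Core drives consensus for this authority: it accepts verified blocks through the
/// BlockManager, runs the commit rule via the UniversalCommitter and forwards committed
/// sub-dags to the CommitObserver, and proposes new signed blocks, emitting them together
/// with round updates through CoreSignals.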
pub(crate) struct Core {
context: Arc<Context>,
transaction_consumer: TransactionConsumer,
transaction_certifier: TransactionCertifier,
block_manager: BlockManager,
subscriber_exists: bool,
propagation_delay: Round,
committer: UniversalCommitter,
last_signaled_round: Round,
last_included_ancestors: Vec<Option<BlockRef>>,
last_decided_leader: Slot,
leader_schedule: Arc<LeaderSchedule>,
commit_observer: CommitObserver,
signals: CoreSignals,
block_signer: ProtocolKeyPair,
dag_state: Arc<RwLock<DagState>>,
last_known_proposed_round: Option<Round>,
ancestor_state_manager: AncestorStateManager,
round_tracker: Arc<RwLock<PeerRoundTracker>>,
}
impl Core {
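/// Creates a new Core and immediately runs recovery against the persisted DagState,
/// so the returned instance is ready to propose.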
pub(crate) fn new(
context: Arc<Context>,
leader_schedule: Arc<LeaderSchedule>,
transaction_consumer: TransactionConsumer,
transaction_certifier: TransactionCertifier,
block_manager: BlockManager,
subscriber_exists: bool,
commit_observer: CommitObserver,
signals: CoreSignals,
block_signer: ProtocolKeyPair,
dag_state: Arc<RwLock<DagState>>,
sync_last_known_own_block: bool,
round_tracker: Arc<RwLock<PeerRoundTracker>>,
) -> Self {
let last_decided_leader = dag_state.read().last_commit_leader();
let number_of_leaders = context
.protocol_config
.mysticeti_num_leaders_per_round()
.unwrap_or(1);
let committer = UniversalCommitterBuilder::new(
context.clone(),
leader_schedule.clone(),
dag_state.clone(),
)
.with_number_of_leaders(number_of_leaders)
.with_pipeline(true)
.build();
let last_proposed_block = dag_state.read().get_last_proposed_block();
let last_signaled_round = last_proposed_block.round();
let mut last_included_ancestors = vec![None; context.committee.size()];
for ancestor in last_proposed_block.ancestors() {
last_included_ancestors[ancestor.author] = Some(*ancestor);
}
let min_propose_round = if sync_last_known_own_block {
None
} else {
Some(0)
};
let propagation_scores = leader_schedule
.leader_swap_table
.read()
.reputation_scores
.clone();
let mut ancestor_state_manager =
AncestorStateManager::new(context.clone(), dag_state.clone());
ancestor_state_manager.set_propagation_scores(propagation_scores);
Self {
context,
last_signaled_round,
last_included_ancestors,
last_decided_leader,
leader_schedule,
transaction_consumer,
transaction_certifier,
block_manager,
subscriber_exists,
propagation_delay: 0,
committer,
commit_observer,
signals,
block_signer,
dag_state,
last_known_proposed_round: min_propose_round,
ancestor_state_manager,
round_tracker,
}
.recover()
}
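/// Recovers Core after a restart: replays any pending commits, re-proposes or
/// re-broadcasts the last proposed block, and signals the current threshold clock round.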
fn recover(mut self) -> Self {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["Core::recover"])
.start_timer();
self.try_commit(vec![]).unwrap();
let last_proposed_block = if let Some(last_proposed_block) = self.try_propose(true).unwrap()
{
last_proposed_block
} else {
let last_proposed_block = self.dag_state.read().get_last_proposed_block();
if self.should_propose() {
assert!(last_proposed_block.round() > GENESIS_ROUND, "At a minimum, a block with a round higher than genesis should have been produced during recovery");
}
self.signals
.new_block(ExtendedBlock {
block: last_proposed_block.clone(),
excluded_ancestors: vec![],
})
.unwrap();
last_proposed_block
};
self.try_signal_new_round();
info!(
"Core recovery completed with last proposed block {:?}",
last_proposed_block
);
self
}
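/// Processes the provided blocks through the BlockManager, then attempts to commit and
/// to propose on top of them. Returns the references of ancestors that are still missing.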
#[tracing::instrument(skip_all)]
pub(crate) fn add_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
) -> ConsensusResult<BTreeSet<BlockRef>> {
let _scope = monitored_scope("Core::add_blocks");
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["Core::add_blocks"])
.start_timer();
self.context
.metrics
.node_metrics
.core_add_blocks_batch_size
.observe(blocks.len() as f64);
let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);
if !accepted_blocks.is_empty() {
trace!(
"Accepted blocks: {}",
accepted_blocks
.iter()
.map(|b| b.reference().to_string())
.join(",")
);
self.try_commit(vec![])?;
self.try_propose(false)?;
self.try_signal_new_round();
}
if !missing_block_refs.is_empty() {
trace!(
"Missing block refs: {}",
missing_block_refs.iter().map(|b| b.to_string()).join(", ")
);
}
Ok(missing_block_refs)
}
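/// Adds commits certified and synced from peers, together with their vote blocks, then
/// attempts to commit and propose. Returns the block references still missing after
/// accepting the vote blocks.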
#[tracing::instrument(skip_all)]
pub(crate) fn add_certified_commits(
&mut self,
certified_commits: CertifiedCommits,
) -> ConsensusResult<BTreeSet<BlockRef>> {
let _scope = monitored_scope("Core::add_certified_commits");
let votes = certified_commits.votes().to_vec();
let commits = self
.filter_new_commits(certified_commits.commits().to_vec())
.expect("Certified commits validation failed");
let (_, missing_block_refs) = self.block_manager.try_accept_blocks(votes);
self.try_commit(commits)?;
self.try_propose(false)?;
self.try_signal_new_round();
Ok(missing_block_refs)
}
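/// Checks which of the given block references exist locally and returns the missing ones
/// so they can be fetched from peers.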
pub(crate) fn check_block_refs(
&mut self,
block_refs: Vec<BlockRef>,
) -> ConsensusResult<BTreeSet<BlockRef>> {
let _scope = monitored_scope("Core::check_block_refs");
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["Core::check_block_refs"])
.start_timer();
self.context
.metrics
.node_metrics
.core_check_block_refs_batch_size
.observe(block_refs.len() as f64);
let missing_block_refs = self.block_manager.try_find_blocks(block_refs);
if !missing_block_refs.is_empty() {
trace!(
"Missing block refs: {}",
missing_block_refs.iter().map(|b| b.to_string()).join(", ")
);
}
Ok(missing_block_refs)
}
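/// Signals a new round to listeners when the threshold clock has advanced past the last
/// signaled round.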
fn try_signal_new_round(&mut self) {
let new_clock_round = self.dag_state.read().threshold_clock_round();
if new_clock_round <= self.last_signaled_round {
return;
}
self.signals.new_round(new_clock_round);
self.last_signaled_round = new_clock_round;
self.context
.metrics
.node_metrics
.threshold_clock_round
.set(new_clock_round as i64);
}
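/// Attempts to propose for the given round, typically on leader timeout. With `force` set,
/// the proposal skips the leader-existence and minimum round delay checks.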
pub(crate) fn new_block(
&mut self,
round: Round,
force: bool,
) -> ConsensusResult<Option<VerifiedBlock>> {
let _scope = monitored_scope("Core::new_block");
if self.last_proposed_round() < round {
self.context
.metrics
.node_metrics
.leader_timeout_total
.with_label_values(&[&format!("{force}")])
.inc();
let result = self.try_propose(force);
self.try_signal_new_round();
return result;
}
Ok(None)
}
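/// Drops certified commits that are already committed locally and verifies that the
/// remaining ones start right after the last local commit index.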
fn filter_new_commits(
&mut self,
commits: Vec<CertifiedCommit>,
) -> ConsensusResult<Vec<CertifiedCommit>> {
let last_commit_index = self.dag_state.read().last_commit_index();
let commits = commits
.iter()
.filter(|commit| {
if commit.index() > last_commit_index {
true
} else {
tracing::debug!(
"Skip commit for index {} as it is already committed with last commit index {}",
commit.index(),
last_commit_index
);
false
}
})
.cloned()
.collect::<Vec<_>>();
if let Some(commit) = commits.first() {
if commit.index() != last_commit_index + 1 {
return Err(ConsensusError::UnexpectedCertifiedCommitIndex {
expected_commit_index: last_commit_index + 1,
commit_index: commit.index(),
});
}
}
Ok(commits)
}
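/// Attempts to create and broadcast a new block, then runs the commit rule since the new
/// block may advance commits.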
fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
if !self.should_propose() {
return Ok(None);
}
if let Some(extended_block) = self.try_new_block(force) {
self.signals.new_block(extended_block.clone())?;
fail_point!("consensus-after-propose");
self.try_commit(vec![])?;
return Ok(Some(extended_block.block));
}
Ok(None)
}
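/// Builds the next block once the threshold clock has advanced past the last proposal and,
/// unless forced, a leader block exists for the previous round and the minimum round delay
/// has elapsed. Selects ancestors, gathers transactions, commit votes and (with fastpath
/// enabled) transaction votes, signs the block, accepts it locally and returns it together
/// with the excluded ancestors to advertise.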
fn try_new_block(&mut self, force: bool) -> Option<ExtendedBlock> {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["Core::try_new_block"])
.start_timer();
let clock_round = {
let dag_state = self.dag_state.read();
let clock_round = dag_state.threshold_clock_round();
if clock_round <= dag_state.get_last_proposed_block().round() {
return None;
}
clock_round
};
let quorum_round = clock_round.saturating_sub(1);
if !force {
if !self.leaders_exist(quorum_round) {
return None;
}
if Duration::from_millis(
self.context
.clock
.timestamp_utc_ms()
.saturating_sub(self.last_proposed_timestamp_ms()),
) < self.context.parameters.min_round_delay
{
return None;
}
}
let (ancestors, excluded_and_equivocating_ancestors) =
self.smart_ancestors_to_propose(clock_round, !force);
if ancestors.is_empty() {
assert!(
!force,
"Ancestors should have been returned if force is true!"
);
return None;
}
let excluded_ancestors_limit = self.context.committee.size() * 2;
if excluded_and_equivocating_ancestors.len() > excluded_ancestors_limit {
debug!(
"Dropping {} excluded ancestor(s) during proposal due to size limit",
excluded_and_equivocating_ancestors.len() - excluded_ancestors_limit,
);
}
let excluded_ancestors = excluded_and_equivocating_ancestors
.into_iter()
.take(excluded_ancestors_limit)
.collect();
for ancestor in &ancestors {
self.last_included_ancestors[ancestor.author()] = Some(ancestor.reference());
}
let leader_authority = &self
.context
.committee
.authority(self.first_leader(quorum_round))
.hostname;
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_ms
.with_label_values(&[leader_authority])
.inc_by(
Instant::now()
.saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
.as_millis() as u64,
);
self.context
.metrics
.node_metrics
.block_proposal_leader_wait_count
.with_label_values(&[leader_authority])
.inc();
self.context
.metrics
.node_metrics
.proposed_block_ancestors
.observe(ancestors.len() as f64);
for ancestor in &ancestors {
let authority = &self.context.committee.authority(ancestor.author()).hostname;
self.context
.metrics
.node_metrics
.proposed_block_ancestors_depth
.with_label_values(&[authority])
.observe(clock_round.saturating_sub(ancestor.round()).into());
}
let now = self.context.clock.timestamp_utc_ms();
ancestors.iter().for_each(|block| {
if block.timestamp_ms() > now {
trace!("Ancestor block {:?} has timestamp {}, greater than current timestamp {now}. Proposing for round {}.", block, block.timestamp_ms(), clock_round);
let authority = &self.context.committee.authority(block.author()).hostname;
self.context
.metrics
.node_metrics
.proposed_block_ancestors_timestamp_drift_ms
.with_label_values(&[authority])
.inc_by(block.timestamp_ms().saturating_sub(now));
}
});
let (transactions, ack_transactions, _limit_reached) = self.transaction_consumer.next();
self.context
.metrics
.node_metrics
.proposed_block_transactions
.observe(transactions.len() as f64);
let commit_votes = self
.dag_state
.write()
.take_commit_votes(MAX_COMMIT_VOTES_PER_BLOCK);
let transaction_votes = if self.context.protocol_config.mysticeti_fastpath() {
let hard_linked_ancestors = {
let mut dag_state = self.dag_state.write();
ancestors
.iter()
.flat_map(|ancestor| dag_state.link_causal_history(ancestor.reference()))
.collect()
};
self.transaction_certifier
.get_own_votes(hard_linked_ancestors)
} else {
vec![]
};
let block = if self.context.protocol_config.mysticeti_fastpath() {
Block::V2(BlockV2::new(
self.context.committee.epoch(),
clock_round,
self.context.own_index,
now,
ancestors.iter().map(|b| b.reference()).collect(),
transactions,
commit_votes,
transaction_votes,
vec![],
))
} else {
Block::V1(BlockV1::new(
self.context.committee.epoch(),
clock_round,
self.context.own_index,
now,
ancestors.iter().map(|b| b.reference()).collect(),
transactions,
commit_votes,
vec![],
))
};
let signed_block =
SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
let serialized = signed_block
.serialize()
.expect("Block serialization failed.");
self.context
.metrics
.node_metrics
.proposed_block_size
.observe(serialized.len() as f64);
let verified_block = VerifiedBlock::new_verified(signed_block, serialized);
let last_proposed_block = self.last_proposed_block();
if last_proposed_block.round() > 0 {
self.context
.metrics
.node_metrics
.block_proposal_interval
.observe(
Duration::from_millis(
verified_block
.timestamp_ms()
.saturating_sub(last_proposed_block.timestamp_ms()),
)
.as_secs_f64(),
);
}
let (accepted_blocks, missing) = self
.block_manager
.try_accept_blocks(vec![verified_block.clone()]);
assert_eq!(accepted_blocks.len(), 1);
assert!(missing.is_empty());
if self.context.protocol_config.mysticeti_fastpath() {
self.transaction_certifier
.add_voted_blocks(vec![(verified_block.clone(), vec![])]);
self.dag_state
.write()
.link_causal_history(verified_block.reference());
}
self.dag_state.write().flush();
ack_transactions(verified_block.reference());
info!("Created block {verified_block:?} for round {clock_round}");
self.context
.metrics
.node_metrics
.proposed_blocks
.with_label_values(&[&force.to_string()])
.inc();
let extended_block = ExtendedBlock {
block: verified_block,
excluded_ancestors,
};
self.round_tracker
.write()
.update_from_accepted_block(&extended_block);
Some(extended_block)
}
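/// Runs the commit rule, preferring any provided certified commits over local decisions,
/// updates the leader schedule when due, forwards committed sub-dags to the CommitObserver,
/// and notifies the transaction consumer about the status of this authority's committed blocks.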
fn try_commit(
&mut self,
mut certified_commits: Vec<CertifiedCommit>,
) -> ConsensusResult<Vec<CommittedSubDag>> {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["Core::try_commit"])
.start_timer();
let mut certified_commits_map = BTreeMap::new();
for c in &certified_commits {
certified_commits_map.insert(c.index(), c.reference());
}
if !certified_commits.is_empty() {
info!(
"Processing synced commits: {:?}",
certified_commits
.iter()
.map(|c| (c.index(), c.leader()))
.collect::<Vec<_>>()
);
}
let mut committed_sub_dags = Vec::new();
loop {
let mut commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());
if commits_until_update == 0 {
let last_commit_index = self.dag_state.read().last_commit_index();
tracing::info!(
"Leader schedule change triggered at commit index {last_commit_index}"
);
self.leader_schedule
.update_leader_schedule_v2(&self.dag_state);
let propagation_scores = self
.leader_schedule
.leader_swap_table
.read()
.reputation_scores
.clone();
self.ancestor_state_manager
.set_propagation_scores(propagation_scores);
commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());
fail_point!("consensus-after-leader-schedule-change");
}
assert!(commits_until_update > 0);
let (certified_leaders, decided_certified_commits): (
Vec<DecidedLeader>,
Vec<CertifiedCommit>,
) = self
.try_select_certified_leaders(&mut certified_commits, commits_until_update)
.into_iter()
.unzip();
let blocks = decided_certified_commits
.iter()
.flat_map(|c| c.blocks())
.cloned()
.collect::<Vec<_>>();
self.block_manager.try_accept_committed_blocks(blocks);
let (decided_leaders, local) = if certified_leaders.is_empty() {
let mut decided_leaders = self.committer.try_decide(self.last_decided_leader);
if decided_leaders.len() >= commits_until_update {
let _ = decided_leaders.split_off(commits_until_update);
}
(decided_leaders, true)
} else {
(certified_leaders, false)
};
let Some(last_decided) = decided_leaders.last().cloned() else {
break;
};
self.last_decided_leader = last_decided.slot();
self.context
.metrics
.node_metrics
.last_decided_leader_round
.set(self.last_decided_leader.round as i64);
let sequenced_leaders = decided_leaders
.into_iter()
.filter_map(|leader| leader.into_committed_block())
.collect::<Vec<_>>();
if sequenced_leaders.is_empty() {
break;
}
tracing::info!(
"Committing {} leaders: {}; {} commits before next leader schedule change",
sequenced_leaders.len(),
sequenced_leaders
.iter()
.map(|b| b.reference().to_string())
.join(","),
commits_until_update,
);
let subdags = self
.commit_observer
.handle_commit(sequenced_leaders, local)?;
self.block_manager
.try_unsuspend_blocks_for_latest_gc_round();
committed_sub_dags.extend(subdags);
fail_point!("consensus-after-handle-commit");
}
for sub_dag in &committed_sub_dags {
if let Some(commit_ref) = certified_commits_map.remove(&sub_dag.commit_ref.index) {
assert_eq!(
commit_ref, sub_dag.commit_ref,
"Certified commit has different reference than the committed sub dag"
);
}
}
let committed_block_refs = committed_sub_dags
.iter()
.flat_map(|sub_dag| sub_dag.blocks.iter())
.filter_map(|block| {
(block.author() == self.context.own_index).then_some(block.reference())
})
.collect::<Vec<_>>();
self.transaction_consumer
.notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
Ok(committed_sub_dags)
}
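/// Returns the block references the BlockManager is still missing.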
pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
let _scope = monitored_scope("Core::get_missing_blocks");
self.block_manager.missing_blocks()
}
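/// Records whether at least one peer is subscribed to this authority's block stream;
/// proposals are skipped while no subscriber exists.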
pub(crate) fn set_subscriber_exists(&mut self, exists: bool) {
info!("Block subscriber exists: {exists}");
self.subscriber_exists = exists;
}
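/// Records the observed propagation delay in rounds; proposals stop once the delay exceeds
/// the configured threshold.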
pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
info!("Propagation round delay set to: {delay}");
self.propagation_delay = delay;
}
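/// Sets the highest own round learned from the network after syncing. May only be set once;
/// proposals resume only for rounds above it.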
pub(crate) fn set_last_known_proposed_round(&mut self, round: Round) {
if self.last_known_proposed_round.is_some() {
panic!("Should not attempt to set the last known proposed round if that has been already set");
}
self.last_known_proposed_round = Some(round);
info!("Last known proposed round set to {round}");
}
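/// Whether this authority should propose at the current clock round: requires a block
/// subscriber, an acceptable propagation delay, and a synced last known proposed round
/// below the clock round.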
pub(crate) fn should_propose(&self) -> bool {
let clock_round = self.dag_state.read().threshold_clock_round();
let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;
if !self.subscriber_exists {
debug!("Skip proposing for round {clock_round}, no subscriber exists.");
core_skipped_proposals
.with_label_values(&["no_subscriber"])
.inc();
return false;
}
if self.propagation_delay
> self
.context
.parameters
.propagation_delay_stop_proposal_threshold
{
debug!(
"Skip proposing for round {clock_round}, high propagation delay {} > {}.",
self.propagation_delay,
self.context
.parameters
.propagation_delay_stop_proposal_threshold
);
core_skipped_proposals
.with_label_values(&["high_propagation_delay"])
.inc();
return false;
}
let Some(last_known_proposed_round) = self.last_known_proposed_round else {
debug!("Skip proposing for round {clock_round}, last known proposed round has not been synced yet.");
core_skipped_proposals
.with_label_values(&["no_last_known_proposed_round"])
.inc();
return false;
};
if clock_round <= last_known_proposed_round {
debug!("Skip proposing for round {clock_round} as last known proposed round is {last_known_proposed_round}");
core_skipped_proposals
.with_label_values(&["higher_last_known_proposed_round"])
.inc();
return false;
}
true
}
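/// Drains up to `limit` certified commits and turns their leaders into decided leaders,
/// bypassing the local decision rule for commits that are already certified.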
#[tracing::instrument(skip_all)]
fn try_select_certified_leaders(
&mut self,
certified_commits: &mut Vec<CertifiedCommit>,
limit: usize,
) -> Vec<(DecidedLeader, CertifiedCommit)> {
assert!(limit > 0, "limit should be greater than 0");
if certified_commits.is_empty() {
return vec![];
}
let to_commit = if certified_commits.len() >= limit {
certified_commits.drain(..limit).collect::<Vec<_>>()
} else {
std::mem::take(certified_commits)
};
tracing::debug!(
"Selected {} certified leaders: {}",
to_commit.len(),
to_commit.iter().map(|c| c.leader().to_string()).join(",")
);
let sequenced_leaders = to_commit
.into_iter()
.map(|commit| {
let leader = commit.blocks().last().expect("Certified commit should have at least one block");
assert_eq!(leader.reference(), commit.leader(), "Last block of the committed sub dag should have the same digest as the leader of the commit");
let leader = DecidedLeader::Commit(leader.clone(), false);
UniversalCommitter::update_metrics(&self.context, &leader, Decision::Certified);
(leader, commit)
})
.collect::<Vec<_>>();
sequenced_leaders
}
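/// Selects ancestors for the next proposal. Ancestors in the EXCLUDE state (low propagation
/// score) are held back unless needed to complete a quorum at the parent round, and excluded
/// or equivocating ancestors are returned so they can be advertised with the proposed block.
/// With `smart_select` set, returns nothing if the included parent-round ancestors do not yet
/// form a quorum.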
fn smart_ancestors_to_propose(
&mut self,
clock_round: Round,
smart_select: bool,
) -> (Vec<VerifiedBlock>, BTreeSet<BlockRef>) {
let node_metrics = &self.context.metrics.node_metrics;
let _s = node_metrics
.scope_processing_time
.with_label_values(&["Core::smart_ancestors_to_propose"])
.start_timer();
let all_ancestors = self
.dag_state
.read()
.get_last_cached_block_per_authority(clock_round);
assert_eq!(
all_ancestors.len(),
self.context.committee.size(),
"Fatal error, number of returned ancestors don't match committee size."
);
let accepted_quorum_rounds = self.round_tracker.read().compute_accepted_quorum_rounds();
self.ancestor_state_manager
.update_all_ancestors_state(&accepted_quorum_rounds);
let ancestor_state_map = self.ancestor_state_manager.get_ancestor_states();
let quorum_round = clock_round.saturating_sub(1);
let mut score_and_pending_excluded_ancestors = Vec::new();
let mut excluded_and_equivocating_ancestors = BTreeSet::new();
let included_ancestors = iter::once(self.last_proposed_block().clone())
.chain(
all_ancestors
.into_iter()
.flat_map(|(ancestor, equivocating_ancestors)| {
if ancestor.author() == self.context.own_index {
return None;
}
if let Some(last_block_ref) =
self.last_included_ancestors[ancestor.author()]
{
if last_block_ref.round >= ancestor.round() {
return None;
}
}
excluded_and_equivocating_ancestors.extend(equivocating_ancestors);
let ancestor_state = ancestor_state_map[ancestor.author()];
match ancestor_state {
AncestorState::Include => {
trace!("Found ancestor {ancestor} with INCLUDE state for round {clock_round}");
}
AncestorState::Exclude(score) => {
trace!("Added ancestor {ancestor} with EXCLUDE state with score {score} to temporary excluded ancestors for round {clock_round}");
score_and_pending_excluded_ancestors.push((score, ancestor));
return None;
}
}
Some(ancestor)
}),
)
.collect::<Vec<_>>();
let mut parent_round_quorum = StakeAggregator::<QuorumThreshold>::new();
for ancestor in included_ancestors
.iter()
.filter(|a| a.round() == quorum_round)
{
parent_round_quorum.add(ancestor.author(), &self.context.committee);
}
if smart_select && !parent_round_quorum.reached_threshold(&self.context.committee) {
node_metrics.smart_selection_wait.inc();
debug!("Only found {} stake of good ancestors to include for round {clock_round}, will wait for more.", parent_round_quorum.stake());
return (vec![], BTreeSet::new());
}
score_and_pending_excluded_ancestors.sort_by(|a, b| b.0.cmp(&a.0));
let mut ancestors_to_propose = included_ancestors;
let mut excluded_ancestors = Vec::new();
for (score, ancestor) in score_and_pending_excluded_ancestors.into_iter() {
let block_hostname = &self.context.committee.authority(ancestor.author()).hostname;
if !parent_round_quorum.reached_threshold(&self.context.committee)
&& ancestor.round() == quorum_round
{
debug!("Including temporarily excluded parent round ancestor {ancestor} with score {score} to propose for round {clock_round}");
parent_round_quorum.add(ancestor.author(), &self.context.committee);
ancestors_to_propose.push(ancestor);
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "timeout"])
.inc();
} else {
excluded_ancestors.push((score, ancestor));
}
}
for (score, ancestor) in excluded_ancestors.iter() {
let excluded_author = ancestor.author();
let block_hostname = &self.context.committee.authority(excluded_author).hostname;
// Cap the accepted low quorum round at the parent round of the new proposal.
let accepted_low_quorum_round = accepted_quorum_rounds[excluded_author].0.min(quorum_round);
let last_included_round = self.last_included_ancestors[excluded_author]
.map(|block_ref| block_ref.round)
.unwrap_or(GENESIS_ROUND);
if ancestor.round() <= last_included_round {
continue;
}
if last_included_round >= accepted_low_quorum_round {
excluded_and_equivocating_ancestors.insert(ancestor.reference());
trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: last included round {last_included_round} >= accepted low quorum round {accepted_low_quorum_round}", ancestor.reference());
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
continue;
}
let ancestor = if ancestor.round() <= accepted_low_quorum_round {
ancestor.clone()
} else {
excluded_and_equivocating_ancestors.insert(ancestor.reference());
trace!("Excluded low score ancestor {} with score {score} to propose for round {clock_round}: ancestor round {} > accepted low quorum round {accepted_low_quorum_round} ", ancestor.reference(), ancestor.round());
node_metrics
.excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname])
.inc();
match self.dag_state.read().get_last_cached_block_in_range(
excluded_author,
last_included_round + 1,
accepted_low_quorum_round + 1,
) {
Some(earlier_ancestor) => earlier_ancestor,
None => continue,
}
};
self.last_included_ancestors[excluded_author] = Some(ancestor.reference());
ancestors_to_propose.push(ancestor.clone());
trace!("Included low scoring ancestor {} with score {score} seen at accepted low quorum round {accepted_low_quorum_round} to propose for round {clock_round}", ancestor.reference());
node_metrics
.included_excluded_proposal_ancestors_count_by_authority
.with_label_values(&[block_hostname, "quorum"])
.inc();
}
assert!(parent_round_quorum.reached_threshold(&self.context.committee), "Fatal error, quorum not reached for parent round when proposing for round {clock_round}. Possible mismatch between DagState and Core.");
debug!(
"Included {} ancestors & excluded {} low performing or equivocating ancestors for proposal in round {clock_round}",
ancestors_to_propose.len(),
excluded_and_equivocating_ancestors.len()
);
(ancestors_to_propose, excluded_and_equivocating_ancestors)
}
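/// Returns true iff all leaders of the given round are present in the locally cached DAG.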
fn leaders_exist(&self, round: Round) -> bool {
let dag_state = self.dag_state.read();
for leader in self.leaders(round) {
if !dag_state.contains_cached_block_at_slot(leader) {
return false;
}
}
true
}
fn leaders(&self, round: Round) -> Vec<Slot> {
self.committer
.get_leaders(round)
.into_iter()
.map(|authority_index| Slot::new(round, authority_index))
.collect()
}
fn first_leader(&self, round: Round) -> AuthorityIndex {
self.leaders(round).first().unwrap().authority
}
fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
self.last_proposed_block().timestamp_ms()
}
fn last_proposed_round(&self) -> Round {
self.last_proposed_block().round()
}
fn last_proposed_block(&self) -> VerifiedBlock {
self.dag_state.read().get_last_proposed_block()
}
}
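/// Signals emitted by Core: newly proposed blocks are broadcast to subscribers and new
/// threshold clock rounds are published on a watch channel.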
pub(crate) struct CoreSignals {
tx_block_broadcast: broadcast::Sender<ExtendedBlock>,
new_round_sender: watch::Sender<Round>,
context: Arc<Context>,
}
impl CoreSignals {
pub fn new(context: Arc<Context>) -> (Self, CoreSignalsReceivers) {
let (tx_block_broadcast, rx_block_broadcast) = broadcast::channel::<ExtendedBlock>(
context.parameters.dag_state_cached_rounds as usize,
);
let (new_round_sender, new_round_receiver) = watch::channel(0);
let me = Self {
tx_block_broadcast,
new_round_sender,
context,
};
let receivers = CoreSignalsReceivers {
rx_block_broadcast,
new_round_receiver,
};
(me, receivers)
}
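/// Broadcasts a newly proposed block to subscribers. Genesis blocks are never broadcast,
/// and nothing is broadcast when the committee has a single authority.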
pub(crate) fn new_block(&self, extended_block: ExtendedBlock) -> ConsensusResult<()> {
if self.context.committee.size() > 1 {
if extended_block.block.round() == GENESIS_ROUND {
debug!("Ignoring broadcasting genesis block to peers");
return Ok(());
}
if let Err(err) = self.tx_block_broadcast.send(extended_block) {
warn!("Couldn't broadcast the block to any receiver: {err}");
return Err(ConsensusError::Shutdown);
}
} else {
debug!(
"Did not broadcast block {extended_block:?} to receivers as committee size is <= 1"
);
}
Ok(())
}
pub(crate) fn new_round(&mut self, round_number: Round) {
let _ = self.new_round_sender.send_replace(round_number);
}
}
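/// Receiver side of CoreSignals, handed to components that listen for newly proposed blocks
/// and round changes.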
pub(crate) struct CoreSignalsReceivers {
rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
new_round_receiver: watch::Receiver<Round>,
}
impl CoreSignalsReceivers {
pub(crate) fn block_broadcast_receiver(&self) -> broadcast::Receiver<ExtendedBlock> {
self.rx_block_broadcast.resubscribe()
}
pub(crate) fn new_round_receiver(&self) -> watch::Receiver<Round> {
self.new_round_receiver.clone()
}
}
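/// Test helper: builds one CoreTextFixture per authority with the given stakes.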
#[cfg(test)]
pub(crate) async fn create_cores(
context: Context,
authorities: Vec<Stake>,
) -> Vec<CoreTextFixture> {
let mut cores = Vec::new();
for index in 0..authorities.len() {
let own_index = AuthorityIndex::new_for_test(index as u32);
let core =
CoreTextFixture::new(context.clone(), authorities.clone(), own_index, false).await;
cores.push(core);
}
cores
}
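/// Test fixture bundling a Core instance with its in-memory store, DagState, signal receivers
/// and output channels.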
#[cfg(test)]
pub(crate) struct CoreTextFixture {
pub(crate) core: Core,
pub(crate) transaction_certifier: TransactionCertifier,
pub(crate) signal_receivers: CoreSignalsReceivers,
pub(crate) block_receiver: broadcast::Receiver<ExtendedBlock>,
pub(crate) _commit_output_receiver: UnboundedReceiver<CommittedSubDag>,
pub(crate) _blocks_output_receiver: UnboundedReceiver<CertifiedBlocksOutput>,
pub(crate) dag_state: Arc<RwLock<DagState>>,
pub(crate) store: Arc<MemStore>,
}
#[cfg(test)]
impl CoreTextFixture {
async fn new(
context: Context,
authorities: Vec<Stake>,
own_index: AuthorityIndex,
sync_last_known_own_block: bool,
) -> Self {
let (committee, mut signers) = local_committee_and_keys(0, authorities.clone());
let mut context = context.clone();
context = context
.with_committee(committee)
.with_authority_index(own_index);
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(
LeaderSchedule::from_store(context.clone(), dag_state.clone())
.with_num_commits_per_schedule(10),
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (blocks_sender, _blocks_receiver) =
mysten_metrics::monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
transaction_certifier.recover(&NoopBlockVerifier, 0);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, commit_output_receiver, blocks_output_receiver) =
CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let block_signer = signers.remove(own_index.value()).1;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let core = Core::new(
context,
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
block_signer,
dag_state.clone(),
sync_last_known_own_block,
round_tracker,
);
Self {
core,
transaction_certifier,
signal_receivers,
block_receiver,
_commit_output_receiver: commit_output_receiver,
_blocks_output_receiver: blocks_output_receiver,
dag_state,
store,
}
}
pub(crate) fn add_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
) -> ConsensusResult<BTreeSet<BlockRef>> {
self.transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
self.core.add_blocks(blocks)
}
}
#[cfg(test)]
mod test {
use std::{collections::BTreeSet, time::Duration};
use consensus_config::{AuthorityIndex, Parameters};
use consensus_types::block::TransactionIndex;
use futures::{stream::FuturesUnordered, StreamExt};
use mysten_metrics::monitored_mpsc;
use sui_protocol_config::ProtocolConfig;
use tokio::time::sleep;
use super::*;
use crate::{
block::{genesis_blocks, TestBlock},
block_verifier::NoopBlockVerifier,
commit::CommitAPI,
leader_scoring::ReputationScores,
storage::{mem_store::MemStore, Store, WriteBatch},
test_dag_builder::DagBuilder,
test_dag_parser::parse_dag,
transaction::{BlockStatus, TransactionClient},
CommitConsumer, CommitIndex,
};
#[tokio::test]
async fn test_core_recover_from_store_for_full_round() {
telemetry_subscribers::init_for_testing();
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let mut block_status_subscriptions = FuturesUnordered::new();
let mut last_round_blocks = genesis_blocks(&context);
let mut all_blocks: Vec<VerifiedBlock> = last_round_blocks.clone();
for round in 1..=4 {
let mut this_round_blocks = Vec::new();
for (index, _authority) in context.committee.authorities() {
let block = VerifiedBlock::new_for_test(
TestBlock::new(round, index.value() as u32)
.set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
.build(),
);
if round == 1 && index == context.own_index {
let subscription =
transaction_consumer.subscribe_for_block_status_testing(block.reference());
block_status_subscriptions.push(subscription);
}
this_round_blocks.push(block);
}
all_blocks.extend(this_round_blocks.clone());
last_round_blocks = this_round_blocks;
}
store
.write(WriteBatch::default().blocks(all_blocks))
.expect("Storage error");
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
transaction_certifier.recover(&NoopBlockVerifier, 0);
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let _core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
round_tracker,
);
let mut new_round = signal_receivers.new_round_receiver();
assert_eq!(*new_round.borrow_and_update(), 5);
let proposed_block = block_receiver
.recv()
.await
.expect("A block should have been created");
assert_eq!(proposed_block.block.round(), 5);
let ancestors = proposed_block.block.ancestors();
assert_eq!(ancestors.len(), 4);
for ancestor in ancestors {
assert_eq!(ancestor.round, 4);
}
dag_state.write().flush();
let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 2);
assert_eq!(dag_state.read().last_commit_index(), 2);
let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
assert_eq!(all_stored_commits.len(), 2);
while let Some(result) = block_status_subscriptions.next().await {
let status = result.unwrap();
assert!(matches!(status, BlockStatus::Sequenced(_)));
}
}
#[tokio::test]
async fn test_core_recover_from_store_for_partial_round() {
telemetry_subscribers::init_for_testing();
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let mut last_round_blocks = genesis_blocks(&context);
let mut all_blocks = last_round_blocks.clone();
for round in 1..=4 {
let mut this_round_blocks = Vec::new();
let authorities_to_skip = if round == 4 {
context.committee.validity_threshold() as usize
} else {
1
};
for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) {
let block = TestBlock::new(round, index.value() as u32)
.set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
.build();
this_round_blocks.push(VerifiedBlock::new_for_test(block));
}
all_blocks.extend(this_round_blocks.clone());
last_round_blocks = this_round_blocks;
}
store
.write(WriteBatch::default().blocks(all_blocks))
.expect("Storage error");
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
transaction_certifier.recover(&NoopBlockVerifier, 0);
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier,
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
round_tracker,
);
let mut new_round = signal_receivers.new_round_receiver();
assert_eq!(*new_round.borrow_and_update(), 5);
let proposed_block = block_receiver
.recv()
.await
.expect("A block should have been created");
assert_eq!(proposed_block.block.round(), 4);
let ancestors = proposed_block.block.ancestors();
assert_eq!(ancestors.len(), 4);
for ancestor in ancestors {
if ancestor.author == context.own_index {
assert_eq!(ancestor.round, 0);
} else {
assert_eq!(ancestor.round, 3);
}
}
core.try_commit(vec![]).ok();
core.dag_state.write().flush();
let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 2);
assert_eq!(dag_state.read().last_commit_index(), 2);
let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
assert_eq!(all_stored_commits.len(), 2);
}
#[tokio::test]
async fn test_core_propose_after_genesis() {
telemetry_subscribers::init_for_testing();
let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
config
});
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let (transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier,
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
round_tracker,
);
let mut total = 0;
let mut index = 0;
loop {
let transaction =
bcs::to_bytes(&format!("Transaction {index}")).expect("Shouldn't fail");
total += transaction.len();
index += 1;
let _w = transaction_client
.submit_no_wait(vec![transaction])
.await
.unwrap();
if total >= 1_000 {
break;
}
}
let extended_block = block_receiver
.recv()
.await
.expect("A new block should have been created");
assert_eq!(extended_block.block.round(), 1);
assert_eq!(extended_block.block.author().value(), 0);
assert_eq!(extended_block.block.ancestors().len(), 4);
let mut total = 0;
for (i, transaction) in extended_block.block.transactions().iter().enumerate() {
total += transaction.data().len() as u64;
let transaction: String = bcs::from_bytes(transaction.data()).unwrap();
assert_eq!(format!("Transaction {i}"), transaction);
}
assert!(total <= context.protocol_config.max_transactions_in_block_bytes());
let all_genesis = genesis_blocks(&context);
for ancestor in extended_block.block.ancestors() {
all_genesis
.iter()
.find(|block| block.reference() == *ancestor)
.expect("Block should be found amongst genesis blocks");
}
assert!(core.try_propose(false).unwrap().is_none());
assert!(core.try_propose(true).unwrap().is_none());
dag_state.write().flush();
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);
}
#[tokio::test]
async fn test_core_propose_once_receiving_a_quorum() {
telemetry_subscribers::init_for_testing();
let (context, _key_pairs) = Context::new_for_test(4);
let mut core_fixture = CoreTextFixture::new(
context.clone(),
vec![1, 1, 1, 1],
AuthorityIndex::new_for_test(0),
false,
)
.await;
let transaction_certifier = &core_fixture.transaction_certifier;
let store = &core_fixture.store;
let dag_state = &core_fixture.dag_state;
let core = &mut core_fixture.core;
let mut expected_ancestors = BTreeSet::new();
let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
expected_ancestors.insert(block_1.reference());
sleep(context.parameters.min_round_delay).await;
transaction_certifier.add_voted_blocks(vec![(block_1.clone(), vec![])]);
_ = core.add_blocks(vec![block_1]);
assert_eq!(core.last_proposed_round(), 1);
expected_ancestors.insert(core.last_proposed_block().reference());
assert!(core.try_propose(false).unwrap().is_none());
let block_2 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
expected_ancestors.insert(block_2.reference());
sleep(context.parameters.min_round_delay).await;
transaction_certifier.add_voted_blocks(vec![(block_2.clone(), vec![1, 4])]);
_ = core.add_blocks(vec![block_2.clone()]);
assert_eq!(core.last_proposed_round(), 2);
let proposed_block = core.last_proposed_block();
assert_eq!(proposed_block.round(), 2);
assert_eq!(proposed_block.author(), context.own_index);
assert_eq!(proposed_block.ancestors().len(), 3);
let ancestors = proposed_block.ancestors();
let ancestors = ancestors.iter().cloned().collect::<BTreeSet<_>>();
assert_eq!(ancestors, expected_ancestors);
let transaction_votes = proposed_block.transaction_votes();
assert_eq!(transaction_votes.len(), 1);
let transaction_vote = transaction_votes.first().unwrap();
assert_eq!(transaction_vote.block_ref, block_2.reference());
assert_eq!(transaction_vote.rejects, vec![1, 4]);
dag_state.write().flush();
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);
}
#[tokio::test]
async fn test_commit_and_notify_for_block_status() {
telemetry_subscribers::init_for_testing();
let (mut context, mut key_pairs) = Context::new_for_test(4);
const GC_DEPTH: u32 = 2;
context
.protocol_config
.set_consensus_gc_depth_for_testing(GC_DEPTH);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let mut block_status_subscriptions = FuturesUnordered::new();
let dag_str = "DAG {
Round 0 : { 4 },
Round 1 : { * },
Round 2 : { * },
Round 3 : {
A -> [*],
B -> [-A2],
C -> [-A2],
D -> [-A2],
},
Round 4 : {
B -> [-A3],
C -> [-A3],
D -> [-A3],
},
Round 5 : {
A -> [A3, B4, C4, D4]
B -> [*],
C -> [*],
D -> [*],
},
Round 6 : { * },
Round 7 : { * },
Round 8 : { * },
}";
let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
dag_builder.print();
for block in dag_builder.blocks(1..=5) {
if block.author() == context.own_index {
let subscription =
transaction_consumer.subscribe_for_block_status_testing(block.reference());
block_status_subscriptions.push(subscription);
}
}
store
.write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
.expect("Storage error");
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
dag_state.write().flush();
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
transaction_certifier.recover(&NoopBlockVerifier, 0);
let _block_receiver = signal_receivers.block_broadcast_receiver();
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let _core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier,
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
round_tracker,
);
dag_state.write().flush();
let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 5);
while let Some(result) = block_status_subscriptions.next().await {
let status = result.unwrap();
match status {
BlockStatus::Sequenced(block_ref) => {
assert!(block_ref.round == 1 || block_ref.round == 5);
}
BlockStatus::GarbageCollected(block_ref) => {
assert!(block_ref.round == 2 || block_ref.round == 3);
}
}
}
}
#[tokio::test]
async fn test_multiple_commits_advance_threshold_clock() {
telemetry_subscribers::init_for_testing();
let (mut context, mut key_pairs) = Context::new_for_test(4);
const GC_DEPTH: u32 = 2;
context
.protocol_config
.set_consensus_gc_depth_for_testing(GC_DEPTH);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let dag_str = "DAG {
Round 0 : { 4 },
Round 1 : { * },
Round 2 : {
B -> [-D1],
C -> [-D1],
D -> [-D1],
},
Round 3 : {
B -> [*],
C -> [*]
D -> [*],
},
Round 4 : {
A -> [*],
B -> [*],
C -> [*]
D -> [*],
},
Round 5 : {
A -> [*],
B -> [*],
C -> [*],
D -> [*],
},
Round 6 : {
B -> [A5, B5, C5, D1],
C -> [A5, B5, C5, D1],
D -> [A5, B5, C5, D1],
},
Round 7 : {
B -> [*],
C -> [*],
D -> [*],
},
Round 8 : {
B -> [*],
C -> [*],
D -> [*],
},
Round 9 : {
B -> [*],
C -> [*],
D -> [*],
},
Round 10 : {
B -> [*],
C -> [*],
D -> [*],
},
Round 11 : {
B -> [*],
C -> [*],
D -> [*],
},
}";
let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
dag_builder.print();
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
dag_state.write().flush();
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let _block_receiver = signal_receivers.block_broadcast_receiver();
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
round_tracker,
);
core.set_last_known_proposed_round(4);
let mut all_blocks = dag_builder.blocks(1..=11);
all_blocks.sort_by_key(|b| b.round());
let voted_blocks: Vec<(VerifiedBlock, Vec<TransactionIndex>)> =
all_blocks.iter().map(|b| (b.clone(), vec![])).collect();
transaction_certifier.add_voted_blocks(voted_blocks);
let blocks: Vec<VerifiedBlock> = all_blocks
.into_iter()
.filter(|b| !(b.round() == 1 && b.author() == AuthorityIndex::new_for_test(3)))
.collect();
core.add_blocks(blocks).expect("Should not fail");
assert_eq!(core.last_proposed_round(), 12);
}
#[tokio::test]
async fn test_core_set_min_propose_round() {
telemetry_subscribers::init_for_testing();
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let _block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
round_tracker,
);
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
assert!(core.try_propose(true).unwrap().is_none());
let mut builder = DagBuilder::new(context.clone());
builder.layers(1..=10).build();
let blocks = builder.blocks.values().cloned().collect::<Vec<_>>();
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
core.round_tracker.write().update_from_probe(
vec![
vec![10, 10, 10, 10],
vec![10, 10, 10, 10],
vec![10, 10, 10, 10],
vec![10, 10, 10, 10],
],
vec![
vec![10, 10, 10, 10],
vec![10, 10, 10, 10],
vec![10, 10, 10, 10],
vec![10, 10, 10, 10],
],
);
assert!(core.try_propose(true).unwrap().is_none());
core.set_last_known_proposed_round(10);
let block = core.try_propose(true).expect("No error").unwrap();
assert_eq!(block.round(), 11);
assert_eq!(block.ancestors().len(), 4);
let our_ancestor_included = block.ancestors()[0];
assert_eq!(our_ancestor_included.author, context.own_index);
assert_eq!(our_ancestor_included.round, 10);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_core_try_new_block_leader_timeout() {
telemetry_subscribers::init_for_testing();
async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
let now = context.clock.timestamp_utc_ms();
let max_timestamp = blocks
.iter()
.max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
.map(|block| block.timestamp_ms())
.unwrap_or(0);
let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
sleep(wait_time).await;
}
let (context, _) = Context::new_for_test(4);
let mut all_cores = create_cores(context, vec![1, 1, 1, 1]).await;
let (_last_core, cores) = all_cores.split_last_mut().unwrap();
let mut last_round_blocks = Vec::<VerifiedBlock>::new();
for round in 1..=3 {
let mut this_round_blocks = Vec::new();
for core_fixture in cores.iter_mut() {
wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
assert_eq!(round - 1, r);
if core_fixture.core.last_proposed_round() == r {
core_fixture
.core
.try_propose(true)
.unwrap()
.unwrap_or_else(|| {
panic!("Block should have been proposed for round {}", round)
});
}
}
assert_eq!(core_fixture.core.last_proposed_round(), round);
this_round_blocks.push(core_fixture.core.last_proposed_block());
}
last_round_blocks = this_round_blocks;
}
for core_fixture in cores.iter_mut() {
wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
assert!(core_fixture.core.try_propose(false).unwrap().is_none());
}
for core_fixture in cores.iter_mut() {
assert!(core_fixture.core.new_block(4, true).unwrap().is_some());
assert_eq!(core_fixture.core.last_proposed_round(), 4);
core_fixture.dag_state.write().flush();
let last_commit = core_fixture
.store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 1);
let all_stored_commits = core_fixture
.store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), 1);
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_core_try_new_block_with_leader_timeout_and_low_scoring_authority() {
telemetry_subscribers::init_for_testing();
async fn wait_blocks(blocks: &[VerifiedBlock], context: &Context) {
let now = context.clock.timestamp_utc_ms();
let max_timestamp = blocks
.iter()
.max_by_key(|block| block.timestamp_ms() as BlockTimestampMs)
.map(|block| block.timestamp_ms())
.unwrap_or(0);
let wait_time = Duration::from_millis(max_timestamp.saturating_sub(now));
sleep(wait_time).await;
}
let (mut context, _) = Context::new_for_test(5);
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let mut all_cores = create_cores(context, vec![1, 1, 1, 1, 1]).await;
let (_last_core, cores) = all_cores.split_last_mut().unwrap();
let mut last_round_blocks = Vec::<VerifiedBlock>::new();
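// Rounds 1..=30: only the first 4 of 5 authorities propose; probes report authority 4 stuck at round 0.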
for round in 1..=30 {
let mut this_round_blocks = Vec::new();
for core_fixture in cores.iter_mut() {
wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
core_fixture.core.round_tracker.write().update_from_probe(
vec![
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![0, 0, 0, 0, 0],
],
vec![
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![0, 0, 0, 0, 0],
],
);
if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
assert_eq!(round - 1, r);
if core_fixture.core.last_proposed_round() == r {
core_fixture
.core
.try_propose(true)
.unwrap()
.unwrap_or_else(|| {
panic!("Block should have been proposed for round {}", round)
});
}
}
assert_eq!(core_fixture.core.last_proposed_round(), round);
this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
}
last_round_blocks = this_round_blocks;
}
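// Rounds 31..=40: authority 4 now proposes too, but probes still report it at round 0, so the other authorities keep excluding its blocks from their ancestors (4 ancestors vs 5 for authority 4's own blocks).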
for round in 31..=40 {
let mut this_round_blocks = Vec::new();
for core_fixture in all_cores.iter_mut() {
wait_blocks(&last_round_blocks, &core_fixture.core.context).await;
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
core_fixture.core.round_tracker.write().update_from_probe(
vec![
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![0, 0, 0, 0, 0],
],
vec![
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![round, round, round, round, 0],
vec![0, 0, 0, 0, 0],
],
);
if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
assert_eq!(round - 1, r);
if core_fixture.core.last_proposed_round() == r {
core_fixture
.core
.try_propose(true)
.unwrap()
.unwrap_or_else(|| {
panic!("Block should have been proposed for round {}", round)
});
}
}
this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
for block in this_round_blocks.iter() {
if block.author() != AuthorityIndex::new_for_test(4) {
assert_eq!(block.ancestors().len(), 4);
} else {
assert_eq!(block.ancestors().len(), 5);
}
}
}
last_round_blocks = this_round_blocks;
}
}
#[tokio::test]
async fn test_smart_ancestor_selection() {
telemetry_subscribers::init_for_testing();
let (mut context, mut key_pairs) = Context::new_for_test(7);
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let context = Arc::new(context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(
LeaderSchedule::from_store(context.clone(), dag_state.clone())
.with_num_commits_per_schedule(10),
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
round_tracker.clone(),
);
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
assert!(core.try_propose(true).unwrap().is_none());
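// Build rounds 1..=12 without any blocks from authority 1.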
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=12)
.authorities(vec![AuthorityIndex::new_for_test(1)])
.skip_block()
.build();
let blocks = builder.blocks(1..=12);
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
core.set_last_known_proposed_round(12);
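// Update the round tracker: every peer except authority 1 reports progress up to round 12.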
round_tracker.write().update_from_probe(
vec![
vec![12, 12, 12, 12, 12, 12, 12],
vec![0, 0, 0, 0, 0, 0, 0],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
],
vec![
vec![12, 12, 12, 12, 12, 12, 12],
vec![0, 0, 0, 0, 0, 0, 0],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
vec![12, 12, 12, 12, 12, 12, 12],
],
);
let block = core.try_propose(true).expect("No error").unwrap();
assert_eq!(block.round(), 13);
assert_eq!(block.ancestors().len(), 7);
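// Rounds 13..=14 are built without our own (authority 0) blocks; the following round 15 proposal should drop lagging authority 1, leaving 6 ancestors.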
builder
.layers(13..=14)
.authorities(vec![AuthorityIndex::new_for_test(0)])
.skip_block()
.build();
let blocks = builder.blocks(13..=14);
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
let block = core.try_propose(true).expect("No error").unwrap();
assert_eq!(block.round(), 15);
assert_eq!(block.ancestors().len(), 6);
let round_14_ancestors = builder.last_ancestors.clone();
builder
.layer(15)
.authorities(vec![
AuthorityIndex::new_for_test(0),
AuthorityIndex::new_for_test(5),
AuthorityIndex::new_for_test(6),
])
.skip_block()
.build();
let blocks = builder.blocks(15..=15);
let authority_1_excluded_block_reference = blocks
.iter()
.find(|block| block.author() == AuthorityIndex::new_for_test(1))
.unwrap()
.reference();
sleep(context.parameters.min_round_delay).await;
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
assert_eq!(core.last_proposed_block().round(), 15);
builder
.layer(15)
.authorities(vec![
AuthorityIndex::new_for_test(0),
AuthorityIndex::new_for_test(1),
AuthorityIndex::new_for_test(2),
AuthorityIndex::new_for_test(3),
AuthorityIndex::new_for_test(4),
])
.skip_block()
.override_last_ancestors(round_14_ancestors)
.build();
let blocks = builder.blocks(15..=15);
let round_15_ancestors: Vec<BlockRef> = blocks
.iter()
.filter(|block| block.round() == 15)
.map(|block| block.reference())
.collect();
let included_block_references = iter::once(&core.last_proposed_block())
.chain(blocks.iter())
.filter(|block| block.author() != AuthorityIndex::new_for_test(1))
.map(|block| block.reference())
.collect::<Vec<_>>();
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
assert_eq!(core.last_proposed_block().round(), 16);
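// Wait for the round 16 proposal broadcast and verify that authority 1's round 15 block is the only reported excluded ancestor.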
let extended_block = loop {
let extended_block =
tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
.await
.unwrap()
.unwrap();
if extended_block.block.round() == 16 {
break extended_block;
}
};
assert_eq!(extended_block.block.round(), 16);
assert_eq!(extended_block.block.author(), core.context.own_index);
assert_eq!(extended_block.block.ancestors().len(), 6);
assert_eq!(extended_block.block.ancestors(), included_block_references);
assert_eq!(extended_block.excluded_ancestors.len(), 1);
assert_eq!(
extended_block.excluded_ancestors[0],
authority_1_excluded_block_reference
);
builder
.layer(16)
.authorities(vec![
AuthorityIndex::new_for_test(0),
AuthorityIndex::new_for_test(5),
AuthorityIndex::new_for_test(6),
])
.skip_block()
.override_last_ancestors(round_15_ancestors)
.build();
let blocks = builder.blocks(16..=16);
sleep(context.parameters.min_round_delay).await;
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
assert_eq!(core.last_proposed_block().round(), 16);
let block = core.try_propose(true).expect("No error").unwrap();
assert_eq!(block.round(), 17);
assert_eq!(block.ancestors().len(), 5);
let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(extended_block.block.round(), 17);
assert_eq!(extended_block.block.author(), core.context.own_index);
assert_eq!(extended_block.block.ancestors().len(), 5);
assert_eq!(extended_block.excluded_ancestors.len(), 0);
builder
.layers(17..=22)
.authorities(vec![AuthorityIndex::new_for_test(0)])
.skip_block()
.build();
let blocks = builder.blocks(17..=22);
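// Peers now report round 22 for all authorities, so authority 1 becomes includable again; the next proposal should reference all 7 authorities.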
round_tracker.write().update_from_probe(
vec![
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
],
vec![
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
vec![22, 22, 22, 22, 22, 22, 22],
],
);
let included_block_references = iter::once(&core.last_proposed_block())
.chain(blocks.iter())
.filter(|block| block.round() == 22 || block.author() == core.context.own_index)
.map(|block| block.reference())
.collect::<Vec<_>>();
sleep(context.parameters.min_round_delay).await;
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
assert_eq!(core.last_proposed_block().round(), 23);
let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(extended_block.block.round(), 23);
assert_eq!(extended_block.block.author(), core.context.own_index);
assert_eq!(extended_block.block.ancestors().len(), 7);
assert_eq!(extended_block.block.ancestors(), included_block_references);
assert_eq!(extended_block.excluded_ancestors.len(), 0);
}
#[tokio::test]
async fn test_excluded_ancestor_limit() {
telemetry_subscribers::init_for_testing();
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(
LeaderSchedule::from_store(context.clone(), dag_state.clone())
.with_num_commits_per_schedule(10),
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let mut block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
round_tracker,
);
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
let mut builder = DagBuilder::new(context.clone());
builder.layers(1..=3).build();
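// Round 4: authority 1 equivocates with 9 blocks; only one can be included as an ancestor, so the rest should surface as excluded ancestors.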
builder
.layer(4)
.authorities(vec![AuthorityIndex::new_for_test(1)])
.equivocate(9)
.build();
let blocks = builder.blocks(1..=4);
transaction_certifier
.add_voted_blocks(blocks.iter().map(|b| (b.clone(), vec![])).collect());
assert!(core.add_blocks(blocks).unwrap().is_empty());
core.set_last_known_proposed_round(3);
let block = core.try_propose(true).expect("No error").unwrap();
assert_eq!(block.round(), 5);
assert_eq!(block.ancestors().len(), 4);
let extended_block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(extended_block.block.round(), 5);
assert_eq!(extended_block.block.author(), core.context.own_index);
assert_eq!(extended_block.block.ancestors().len(), 4);
assert_eq!(extended_block.excluded_ancestors.len(), 8);
}
#[tokio::test]
async fn test_core_set_subscriber_exists() {
telemetry_subscribers::init_for_testing();
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let _block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
false,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
round_tracker,
);
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
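// Proposing is disabled while no subscriber exists; enabling the subscriber should allow the next proposal.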
assert!(core.try_propose(true).unwrap().is_none());
core.set_subscriber_exists(true);
assert!(core.try_propose(true).unwrap().is_some());
}
#[tokio::test]
async fn test_core_set_propagation_delay_per_authority() {
telemetry_subscribers::init_for_testing();
let (context, mut key_pairs) = Context::new_for_test(4);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let _block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
false,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
false,
round_tracker.clone(),
);
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
let test_block = VerifiedBlock::new_for_test(TestBlock::new(1000, 0).build());
transaction_certifier.add_voted_blocks(vec![(test_block.clone(), vec![])]);
dag_state.write().accept_block(test_block);
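// Our own accepted block is at round 1000 while peers report round 0, so the resulting propagation delay should block proposing.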
round_tracker.write().update_from_probe(
vec![
vec![0, 0, 0, 0],
vec![0, 0, 0, 0],
vec![0, 0, 0, 0],
vec![0, 0, 0, 0],
],
vec![
vec![0, 0, 0, 0],
vec![0, 0, 0, 0],
vec![0, 0, 0, 0],
vec![0, 0, 0, 0],
],
);
core.set_subscriber_exists(true);
assert!(core.try_propose(true).unwrap().is_none());
round_tracker.write().update_from_probe(
vec![
vec![1000, 1000, 1000, 1000],
vec![1000, 1000, 1000, 1000],
vec![1000, 1000, 1000, 1000],
vec![1000, 1000, 1000, 1000],
],
vec![
vec![1000, 1000, 1000, 1000],
vec![1000, 1000, 1000, 1000],
vec![1000, 1000, 1000, 1000],
vec![1000, 1000, 1000, 1000],
],
);
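// Accept round 1000 blocks from the other authorities; with peers caught up, proposing should now succeed.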
for author in 1..4 {
let block = VerifiedBlock::new_for_test(TestBlock::new(1000, author).build());
transaction_certifier.add_voted_blocks(vec![(block.clone(), vec![])]);
dag_state.write().accept_block(block);
}
assert!(core.try_propose(true).unwrap().is_some());
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_leader_schedule_change() {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();
let (context, _) = Context::new_for_test(4);
let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
let mut last_round_blocks = Vec::new();
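// Run 30 fully connected rounds, checking the proposal signals each round.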
for round in 1..=30 {
let mut this_round_blocks = Vec::new();
sleep(default_params.min_round_delay).await;
for core_fixture in &mut cores {
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
core_fixture.core.round_tracker.write().update_from_probe(
vec![
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
],
vec![
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
],
);
let new_round = receive(
Duration::from_secs(1),
core_fixture.signal_receivers.new_round_receiver(),
)
.await;
assert_eq!(new_round, round);
let extended_block = tokio::time::timeout(
Duration::from_secs(1),
core_fixture.block_receiver.recv(),
)
.await
.unwrap()
.unwrap();
assert_eq!(extended_block.block.round(), round);
assert_eq!(
extended_block.block.author(),
core_fixture.core.context.own_index
);
this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
let block = core_fixture.core.last_proposed_block();
assert_eq!(
block.ancestors().len(),
core_fixture.core.context.committee.size()
);
for ancestor in block.ancestors() {
if block.round() > 1 {
assert!(
last_round_blocks
.iter()
.any(|block| block.reference() == *ancestor),
"Reference from previous round should be added"
);
}
}
}
last_round_blocks = this_round_blocks;
}
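// After 30 rounds each core should have committed 27 leaders, and the leader schedule should have marked one good and one bad node with the expected reputation scores.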
for core_fixture in cores {
core_fixture.dag_state.write().flush();
let last_commit = core_fixture
.store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 27);
let all_stored_commits = core_fixture
.store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), 27);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.bad_nodes
.len(),
1
);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.good_nodes
.len(),
1
);
let expected_reputation_scores =
ReputationScores::new((11..=20).into(), vec![29, 29, 29, 29]);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.reputation_scores,
expected_reputation_scores
);
}
}
#[tokio::test]
async fn test_filter_new_commits() {
telemetry_subscribers::init_for_testing();
let (context, _key_pairs) = Context::new_for_test(4);
let context = context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
});
let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
let mut core = core.core;
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
let mut dag_builder = DagBuilder::new(core.context.clone());
dag_builder.layers(1..=12).build();
dag_builder.print();
let blocks = dag_builder.blocks(1..=6);
for block in blocks {
core.dag_state.write().accept_block(block);
}
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
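// With blocks up to round 6 accepted, the local commit should produce 4 committed sub-dags.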
let committed_sub_dags = core.try_commit(vec![]).unwrap();
assert_eq!(committed_sub_dags.len(), 4);
println!("Case 1. Provide certified commits that are all before the last committed round.");
let certified_commits = sub_dags_and_commits
.iter()
.take(4)
.map(|(_, c)| c)
.cloned()
.collect::<Vec<_>>();
assert!(
certified_commits.last().unwrap().index()
<= committed_sub_dags.last().unwrap().commit_ref.index,
"Highest certified commit should older than the highest committed index."
);
let certified_commits = core.filter_new_commits(certified_commits).unwrap();
assert!(certified_commits.is_empty());
println!("Case 2. Provide certified commits that are all after the last committed round.");
let certified_commits = sub_dags_and_commits
.iter()
.take(5)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();
let certified_commits = core.filter_new_commits(certified_commits.clone()).unwrap();
assert_eq!(certified_commits.len(), 1);
assert_eq!(certified_commits.first().unwrap().reference().index, 5);
println!("Case 3. Provide certified commits where the first certified commit index is not the last_commited_index + 1.");
let certified_commits = sub_dags_and_commits
.iter()
.skip(5)
.take(1)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();
let err = core
.filter_new_commits(certified_commits.clone())
.unwrap_err();
match err {
ConsensusError::UnexpectedCertifiedCommitIndex {
expected_commit_index: 5,
commit_index: 6,
} => (),
_ => panic!("Unexpected error: {:?}", err),
}
}
#[tokio::test]
async fn test_add_certified_commits() {
telemetry_subscribers::init_for_testing();
let (context, _key_pairs) = Context::new_for_test(4);
let context = context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
});
let authority_index = AuthorityIndex::new_for_test(0);
let core = CoreTextFixture::new(context, vec![1, 1, 1, 1], authority_index, true).await;
let store = core.store.clone();
let mut core = core.core;
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
let mut dag_builder = DagBuilder::new(core.context.clone());
dag_builder.layers(1..=12).build();
dag_builder.print();
let blocks = dag_builder.blocks(1..=6);
for block in blocks {
core.dag_state.write().accept_block(block);
}
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=10);
let committed_sub_dags = core.try_commit(vec![]).unwrap();
assert_eq!(committed_sub_dags.len(), 4);
core.dag_state.write().flush();
println!("Case 1. Provide no certified commits. No commit should happen.");
let last_commit = store
.read_last_commit()
.unwrap()
.expect("Last commit should be set");
assert_eq!(last_commit.reference().index, 4);
println!("Case 2. Provide certified commits that before and after the last committed round and also there are additional blocks so can run the direct decide rule as well.");
let certified_commits = sub_dags_and_commits
.iter()
.skip(3)
.take(5)
.map(|(_, c)| c.clone())
.collect::<Vec<_>>();
let blocks = dag_builder.blocks(8..=12);
for block in blocks {
core.dag_state.write().accept_block(block);
}
core.add_certified_commits(CertifiedCommits::new(certified_commits.clone(), vec![]))
.expect("Should not fail");
core.dag_state.write().flush();
let commits = store.scan_commits((6..=10).into()).unwrap();
assert_eq!(commits.len(), 5);
for i in 6..=10 {
let commit = &commits[i - 6];
assert_eq!(commit.reference().index, i as u32);
}
}
#[tokio::test]
async fn try_commit_with_certified_commits_gced_blocks() {
const GC_DEPTH: u32 = 3;
telemetry_subscribers::init_for_testing();
let (mut context, mut key_pairs) = Context::new_for_test(5);
context
.protocol_config
.set_consensus_gc_depth_for_testing(GC_DEPTH);
let context = Arc::new(context.with_parameters(Parameters {
sync_last_known_own_block_timeout: Duration::from_millis(2_000),
..Default::default()
}));
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let block_manager = BlockManager::new(context.clone(), dag_state.clone());
let leader_schedule = Arc::new(
LeaderSchedule::from_store(context.clone(), dag_state.clone())
.with_num_commits_per_schedule(10),
);
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
let (signals, signal_receivers) = CoreSignals::new(context.clone());
let (blocks_sender, _blocks_receiver) =
monitored_mpsc::unbounded_channel("consensus_block_output");
let transaction_certifier =
TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
let _block_receiver = signal_receivers.block_broadcast_receiver();
let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
let commit_observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
transaction_certifier.clone(),
leader_schedule.clone(),
)
.await;
let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
let mut core = Core::new(
context.clone(),
leader_schedule,
transaction_consumer,
transaction_certifier.clone(),
block_manager,
true,
commit_observer,
signals,
key_pairs.remove(context.own_index.value()).1,
dag_state.clone(),
true,
round_tracker,
);
assert_eq!(
core.last_proposed_round(),
GENESIS_ROUND,
"No block should have been created other than genesis"
);
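// Build a DAG where authority E only produces a block at round 1, is excluded by the others at round 2, and is only re-linked via E1 at round 5; with GC depth 3, E1 must not be committed.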
let dag_str = "DAG {
Round 0 : { 5 },
Round 1 : { * },
Round 2 : {
A -> [-E1],
B -> [-E1],
C -> [-E1],
D -> [-E1],
},
Round 3 : {
A -> [*],
B -> [*],
C -> [*],
D -> [*],
},
Round 4 : {
A -> [*],
B -> [*],
C -> [*],
D -> [*],
},
Round 5 : {
A -> [*],
B -> [*],
C -> [*],
D -> [*],
E -> [A4, B4, C4, D4, E1]
},
Round 6 : { * },
Round 7 : { * },
}";
let (_, mut dag_builder) = parse_dag(dag_str).expect("Invalid dag");
dag_builder.print();
let (_sub_dags, certified_commits): (Vec<_>, Vec<_>) = dag_builder
.get_sub_dag_and_certified_commits(1..=5)
.into_iter()
.unzip();
let committed_sub_dags = core.try_commit(certified_commits).unwrap();
assert_eq!(committed_sub_dags.len(), 4);
for (index, committed_sub_dag) in committed_sub_dags.iter().enumerate() {
assert_eq!(committed_sub_dag.commit_ref.index as usize, index + 1);
for block in committed_sub_dag.blocks.iter() {
if block.round() == 1 && block.author() == AuthorityIndex::new_for_test(5) {
panic!("Did not expect to commit block E1");
}
}
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
}
async fn parameterized_test_commit_on_leader_schedule_change_boundary(
num_leaders_per_round: Option<usize>,
) {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();
let (mut context, _) = Context::new_for_test(6);
context
.protocol_config
.set_mysticeti_num_leaders_per_round_for_testing(num_leaders_per_round);
let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]).await;
let mut last_round_blocks: Vec<VerifiedBlock> = Vec::new();
for round in 1..=33 {
let mut this_round_blocks = Vec::new();
sleep(default_params.min_round_delay).await;
for core_fixture in &mut cores {
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
core_fixture.core.round_tracker.write().update_from_probe(
vec![
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
],
vec![
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
vec![round, round, round, round, round, round],
],
);
let new_round = receive(
Duration::from_secs(1),
core_fixture.signal_receivers.new_round_receiver(),
)
.await;
assert_eq!(new_round, round);
let extended_block = tokio::time::timeout(
Duration::from_secs(1),
core_fixture.block_receiver.recv(),
)
.await
.unwrap()
.unwrap();
assert_eq!(extended_block.block.round(), round);
assert_eq!(
extended_block.block.author(),
core_fixture.core.context.own_index
);
this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
let block = core_fixture.core.last_proposed_block();
assert_eq!(
block.ancestors().len(),
core_fixture.core.context.committee.size()
);
for ancestor in block.ancestors() {
if block.round() > 1 {
assert!(
last_round_blocks
.iter()
.any(|block| block.reference() == *ancestor),
"Reference from previous round should be added"
);
}
}
}
last_round_blocks = this_round_blocks;
}
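// With a single leader per round, 30 commits are expected at the schedule-change boundary; with multiple leaders, 31.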
for core_fixture in cores {
let expected_commit_count = match num_leaders_per_round {
Some(1) => 30,
_ => 31,
};
core_fixture.dag_state.write().flush();
let last_commit = core_fixture
.store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), expected_commit_count);
let all_stored_commits = core_fixture
.store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.bad_nodes
.len(),
1
);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.good_nodes
.len(),
1
);
let expected_reputation_scores =
ReputationScores::new((21..=30).into(), vec![43, 43, 43, 43, 43, 43]);
assert_eq!(
core_fixture
.core
.leader_schedule
.leader_swap_table
.read()
.reputation_scores,
expected_reputation_scores
);
}
}
#[tokio::test]
async fn test_core_signals() {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();
let (context, _) = Context::new_for_test(4);
let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
let mut last_round_blocks = Vec::new();
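// Run 10 fully connected rounds, verifying the new-round and block-broadcast signals each round.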
for round in 1..=10 {
let mut this_round_blocks = Vec::new();
sleep(default_params.min_round_delay).await;
for core_fixture in &mut cores {
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
core_fixture.core.round_tracker.write().update_from_probe(
vec![
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
],
vec![
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
],
);
let new_round = receive(
Duration::from_secs(1),
core_fixture.signal_receivers.new_round_receiver(),
)
.await;
assert_eq!(new_round, round);
let extended_block = tokio::time::timeout(
Duration::from_secs(1),
core_fixture.block_receiver.recv(),
)
.await
.unwrap()
.unwrap();
assert_eq!(extended_block.block.round(), round);
assert_eq!(
extended_block.block.author(),
core_fixture.core.context.own_index
);
this_round_blocks.push(core_fixture.core.last_proposed_block().clone());
let block = core_fixture.core.last_proposed_block();
assert_eq!(
block.ancestors().len(),
core_fixture.core.context.committee.size()
);
for ancestor in block.ancestors() {
if block.round() > 1 {
assert!(
last_round_blocks
.iter()
.any(|block| block.reference() == *ancestor),
"Reference from previous round should be added"
);
}
}
}
last_round_blocks = this_round_blocks;
}
for core_fixture in cores {
core_fixture.dag_state.write().flush();
let last_commit = core_fixture
.store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 7);
let all_stored_commits = core_fixture
.store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), 7);
}
}
#[tokio::test]
async fn test_core_compress_proposal_references() {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();
let (context, _) = Context::new_for_test(4);
let mut cores = create_cores(context, vec![1, 1, 1, 1]).await;
let mut last_round_blocks = Vec::new();
let mut all_blocks = Vec::new();
let excluded_authority = AuthorityIndex::new_for_test(3);
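// Rounds 1..=10: authority 3 is excluded from the exchange; the other three authorities propose and share blocks among themselves.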
for round in 1..=10 {
let mut this_round_blocks = Vec::new();
for core_fixture in &mut cores {
if core_fixture.core.context.own_index == excluded_authority {
continue;
}
core_fixture.add_blocks(last_round_blocks.clone()).unwrap();
core_fixture.core.round_tracker.write().update_from_probe(
vec![
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
],
vec![
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
vec![round, round, round, round],
],
);
core_fixture.core.new_block(round, true).unwrap();
let block = core_fixture.core.last_proposed_block();
assert_eq!(block.round(), round);
this_round_blocks.push(block.clone());
}
last_round_blocks = this_round_blocks.clone();
all_blocks.extend(this_round_blocks);
}
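// Authority 3 now catches up on all 10 rounds at once; its round 11 proposal should reference the latest (round 10) block from each other authority and only its own round 1 block for itself.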
let core_fixture = &mut cores[excluded_authority];
sleep(default_params.min_round_delay).await;
core_fixture.add_blocks(all_blocks).unwrap();
let block = core_fixture.core.last_proposed_block();
assert_eq!(block.round(), 11);
assert_eq!(block.ancestors().len(), 4);
for block_ref in block.ancestors() {
if block_ref.author == excluded_authority {
assert_eq!(block_ref.round, 1);
} else {
assert_eq!(block_ref.round, 10);
}
}
core_fixture.dag_state.write().flush();
let last_commit = core_fixture
.store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
assert_eq!(last_commit.index(), 6);
let all_stored_commits = core_fixture
.store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), 6);
}
#[tokio::test]
async fn try_select_certified_leaders() {
telemetry_subscribers::init_for_testing();
let (context, _) = Context::new_for_test(4);
let authority_index = AuthorityIndex::new_for_test(0);
let core =
CoreTextFixture::new(context.clone(), vec![1, 1, 1, 1], authority_index, true).await;
let mut core = core.core;
let mut dag_builder = DagBuilder::new(Arc::new(context.clone()));
dag_builder.layers(1..=12).build();
let limit = 2;
let blocks = dag_builder.blocks(1..=12);
for block in blocks {
core.dag_state.write().accept_block(block);
}
let sub_dags_and_commits = dag_builder.get_sub_dag_and_certified_commits(1..=4);
let mut certified_commits = sub_dags_and_commits
.into_iter()
.map(|(_, commit)| commit)
.collect::<Vec<_>>();
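// With a limit of 2, two certified leaders should be selected and two certified commits should remain.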
let leaders = core.try_select_certified_leaders(&mut certified_commits, limit);
assert_eq!(leaders.len(), 2);
assert_eq!(certified_commits.len(), 2);
}
pub(crate) async fn receive<T: Copy>(timeout: Duration, mut receiver: watch::Receiver<T>) -> T {
tokio::time::timeout(timeout, receiver.changed())
.await
.expect("Timeout while waiting to read from receiver")
.expect("Signal receive channel shouldn't be closed");
*receiver.borrow_and_update()
}
}