// consensus_core/universal_committer.rs
use std::{collections::VecDeque, sync::Arc};
use consensus_config::AuthorityIndex;
use consensus_types::block::Round;
use parking_lot::RwLock;
use crate::{
base_committer::BaseCommitter,
block::{Slot, GENESIS_ROUND},
commit::{DecidedLeader, Decision},
context::Context,
dag_state::DagState,
};
#[cfg(test)]
#[path = "tests/universal_committer_tests.rs"]
mod universal_committer_tests;
#[cfg(test)]
#[path = "tests/pipelined_committer_tests.rs"]
mod pipelined_committer_tests;
/// Decides leaders by delegating to a collection of [`BaseCommitter`]s.
///
/// `try_decide` walks the committers in reverse order for every candidate round,
/// while `get_leaders` queries them in forward order.
pub(crate) struct UniversalCommitter {
// Source of the protocol config, committee and metrics used by this type.
context: Arc<Context>,
// Shared DAG state; `try_decide` reads its highest accepted round.
dag_state: Arc<RwLock<DagState>>,
// The underlying committers, each able to elect and decide a leader slot.
committers: Vec<BaseCommitter>,
}
impl UniversalCommitter {
#[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
let highest_accepted_round = self.dag_state.read().highest_accepted_round();
let mut leaders = VecDeque::new();
let last_round = match self
.context
.protocol_config
.mysticeti_num_leaders_per_round()
{
Some(1) => {
last_decided.round + 1
}
_ => last_decided.round,
};
'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
for committer in self.committers.iter().rev() {
let Some(slot) = committer.elect_leader(round) else {
tracing::debug!("No leader for round {round}, skipping");
continue;
};
if slot == last_decided {
tracing::debug!("Reached last committed {slot}, now exit");
break 'outer;
}
tracing::trace!("Trying to decide {slot} with {committer}",);
let mut status = committer.try_direct_decide(slot);
tracing::debug!("Outcome of direct rule: {status} with {committer}");
if status.is_decided() {
leaders.push_front((status, Decision::Direct));
} else {
status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
tracing::debug!("Outcome of indirect rule: {status} with {committer}");
leaders.push_front((status, Decision::Indirect));
}
}
}
let mut decided_leaders = Vec::new();
for (leader, decision) in leaders {
if leader.round() == GENESIS_ROUND {
continue;
}
let Some(decided_leader) = leader.into_decided_leader(decision == Decision::Direct)
else {
break;
};
Self::update_metrics(&self.context, &decided_leader, decision);
decided_leaders.push(decided_leader);
}
if !decided_leaders.is_empty() {
tracing::debug!("Decided {decided_leaders:?}");
}
decided_leaders
}
/// Returns the authorities elected as leaders for `round`.
///
/// Each committer is asked in order; committers that elect no leader for the
/// round contribute nothing to the result.
pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
    let mut authorities = Vec::new();
    for committer in &self.committers {
        if let Some(slot) = committer.elect_leader(round) {
            authorities.push(slot.authority);
        }
    }
    authorities
}
/// Increments the `committed_leaders_total` counter for a decided leader.
///
/// The counter is labelled with the hostname of the leader's authority and a
/// status of the form `<decision>-<outcome>`, e.g. `direct-commit` or
/// `indirect-skip`.
pub(crate) fn update_metrics(
    context: &Context,
    decided_leader: &DecidedLeader,
    decision: Decision,
) {
    // First half of the status label: how the decision was reached.
    let decision_str = match decision {
        Decision::Direct => "direct",
        Decision::Indirect => "indirect",
        Decision::Certified => "certified",
    };
    // Second half: whether the leader was committed or skipped.
    let status = match decided_leader {
        DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
        DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
    };

    let authority = decided_leader.slot().authority;
    let leader_host = &context.committee.authority(authority).hostname;

    let committed_leaders = &context.metrics.node_metrics.committed_leaders_total;
    committed_leaders
        .with_label_values(&[leader_host, &status])
        .inc();
}
}
pub(crate) mod universal_committer_builder {
use super::*;
use crate::{
base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
leader_schedule::LeaderSchedule,
};
/// Builder for [`UniversalCommitter`].
///
/// With the defaults set by `new` (one leader, no pipelining) `build` creates a
/// single base committer.
pub(crate) struct UniversalCommitterBuilder {
// Handed to every base committer and to the built `UniversalCommitter`.
context: Arc<Context>,
// Handed to every base committer.
leader_schedule: Arc<LeaderSchedule>,
// Handed to every base committer and to the built `UniversalCommitter`.
dag_state: Arc<RwLock<DagState>>,
// Forwarded in each committer's options; also the number of pipeline stages
// when `pipeline` is enabled. Defaults to `DEFAULT_WAVE_LENGTH`.
wave_length: Round,
// Number of committers created per pipeline stage (one per leader offset).
// Defaults to 1.
number_of_leaders: usize,
// When true, `build` creates one stage per round offset in `0..wave_length`;
// otherwise a single stage. Defaults to false.
pipeline: bool,
}
impl UniversalCommitterBuilder {
/// Creates a builder with the default configuration: `DEFAULT_WAVE_LENGTH`,
/// a single leader, and pipelining disabled.
pub(crate) fn new(
    context: Arc<Context>,
    leader_schedule: Arc<LeaderSchedule>,
    dag_state: Arc<RwLock<DagState>>,
) -> Self {
    // Defaults, overridable through the `with_*` methods.
    let wave_length = DEFAULT_WAVE_LENGTH;
    let number_of_leaders = 1;
    let pipeline = false;

    Self {
        context,
        leader_schedule,
        dag_state,
        wave_length,
        number_of_leaders,
        pipeline,
    }
}
#[allow(unused)]
pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self {
self.wave_length = wave_length;
self
}
pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
self.number_of_leaders = number_of_leaders;
self
}
pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
self.pipeline = pipeline;
self
}
pub(crate) fn build(self) -> UniversalCommitter {
let mut committers = Vec::new();
let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
for round_offset in 0..pipeline_stages {
for leader_offset in 0..self.number_of_leaders {
let options = BaseCommitterOptions {
wave_length: self.wave_length,
round_offset,
leader_offset: leader_offset as Round,
};
let committer = BaseCommitter::new(
self.context.clone(),
self.leader_schedule.clone(),
self.dag_state.clone(),
options,
);
committers.push(committer);
}
}
UniversalCommitter {
context: self.context,
dag_state: self.dag_state,
committers,
}
}
}
}