// consensus_core/universal_committer.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::{collections::VecDeque, sync::Arc};
use consensus_config::AuthorityIndex;
use parking_lot::RwLock;
use crate::{
base_committer::BaseCommitter,
block::{Round, Slot, GENESIS_ROUND},
commit::{DecidedLeader, Decision},
context::Context,
dag_state::DagState,
};
#[cfg(test)]
#[path = "tests/universal_committer_tests.rs"]
mod universal_committer_tests;
#[cfg(test)]
#[path = "tests/pipelined_committer_tests.rs"]
mod pipelined_committer_tests;
/// Commits a sequence of leaders by delegating to a collection of [`BaseCommitter`]s.
///
/// Depending on how it is configured, it can combine different commit strategies,
/// including multi-leaders, backup leaders, and pipelines.
pub(crate) struct UniversalCommitter {
    /// Per-epoch configuration of this authority.
    context: Arc<Context>,
    /// In-memory block store representing the dag state.
    dag_state: Arc<RwLock<DagState>>,
    /// Committers used for multi-leader and/or pipelined operation.
    committers: Vec<BaseCommitter>,
}
impl UniversalCommitter {
/// Try to decide part of the dag. This function is idempotent and returns an ordered list of
/// decided leaders.
#[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
let highest_accepted_round = self.dag_state.read().highest_accepted_round();
// Try to decide as many leaders as possible, starting with the highest round.
let mut leaders = VecDeque::new();
let last_round = match self
.context
.protocol_config
.mysticeti_num_leaders_per_round()
{
Some(1) => {
// Ensure that we don't commit any leaders from the same round as last_decided
// until we have full support for multi-leader per round.
// This can happen when we are on a leader schedule boundary and the leader
// elected for the round changes with the new schedule.
last_decided.round + 1
}
_ => last_decided.round,
};
// try to commit a leader up to the highest_accepted_round - 2. There is no
// reason to try and iterate on higher rounds as in order to make a direct
// decision for a leader at round R we need blocks from round R+2 to figure
// out that enough certificates and support exist to commit a leader.
'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
for committer in self.committers.iter().rev() {
// Skip committers that don't have a leader for this round.
let Some(slot) = committer.elect_leader(round) else {
tracing::debug!("No leader for round {round}, skipping");
continue;
};
// now that we reached the last committed leader we can stop the commit rule
if slot == last_decided {
tracing::debug!("Reached last committed {slot}, now exit");
break 'outer;
}
tracing::debug!("Trying to decide {slot} with {committer}",);
// Try to directly decide the leader.
let mut status = committer.try_direct_decide(slot);
tracing::debug!("Outcome of direct rule: {status}");
// If we can't directly decide the leader, try to indirectly decide it.
if status.is_decided() {
leaders.push_front((status, Decision::Direct));
} else {
status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
tracing::debug!("Outcome of indirect rule: {status}");
leaders.push_front((status, Decision::Indirect));
}
}
}
// The decided sequence is the longest prefix of decided leaders.
let mut decided_leaders = Vec::new();
for (leader, decision) in leaders {
if leader.round() == GENESIS_ROUND {
continue;
}
let Some(decided_leader) = leader.into_decided_leader() else {
break;
};
Self::update_metrics(&self.context, &decided_leader, decision);
decided_leaders.push(decided_leader);
}
tracing::debug!("Decided {decided_leaders:?}");
decided_leaders
}
/// Returns the authorities elected as leaders for `round`, one per committer that has
/// a leader for that round, in committer order.
///
/// The result is empty when no committer designates a leader for the round.
pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
    let mut authorities = Vec::new();
    for committer in &self.committers {
        if let Some(slot) = committer.elect_leader(round) {
            authorities.push(slot.authority);
        }
    }
    authorities
}
/// Increments the `committed_leaders_total` counter for a decided leader.
///
/// The counter is labelled with the hostname of the leader's authority and a status of
/// the form `<decision>-<outcome>`, where `<decision>` is `direct`, `indirect` or
/// `certified` and `<outcome>` is `commit` or `skip`.
pub(crate) fn update_metrics(
    context: &Context,
    decided_leader: &DecidedLeader,
    decision: Decision,
) {
    // First label: hostname of the authority that owns the decided slot.
    let leader_authority = decided_leader.slot().authority;
    let leader_host = &context.committee.authority(leader_authority).hostname;

    // Second label: how the leader was decided combined with what was decided.
    let decision_str = match decision {
        Decision::Direct => "direct",
        Decision::Indirect => "indirect",
        Decision::Certified => "certified",
    };
    let status = match decided_leader {
        DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
        DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
    };

    let committed_leaders = &context.metrics.node_metrics.committed_leaders_total;
    committed_leaders
        .with_label_values(&[leader_host, &status])
        .inc();
}
}
/// A builder for a universal committer. By default, the builder creates a single
/// base committer, that is, a single leader and no pipeline.
pub(crate) mod universal_committer_builder {
use super::*;
use crate::{
base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
leader_schedule::LeaderSchedule,
};
/// Collects the settings from which a [`UniversalCommitter`] is assembled.
pub(crate) struct UniversalCommitterBuilder {
    /// Handed to every base committer and to the resulting universal committer.
    context: Arc<Context>,
    /// Leader schedule shared by all base committers.
    leader_schedule: Arc<LeaderSchedule>,
    /// Dag state shared by all base committers and the resulting universal committer.
    dag_state: Arc<RwLock<DagState>>,
    /// Wave length passed to each base committer; also the number of pipeline stages
    /// when pipelining is enabled.
    wave_length: Round,
    /// Number of leader offsets for which a base committer is created per stage.
    number_of_leaders: usize,
    /// Whether one set of committers is created per round offset of the wave.
    pipeline: bool,
}
impl UniversalCommitterBuilder {
pub(crate) fn new(
context: Arc<Context>,
leader_schedule: Arc<LeaderSchedule>,
dag_state: Arc<RwLock<DagState>>,
) -> Self {
Self {
context,
leader_schedule,
dag_state,
wave_length: DEFAULT_WAVE_LENGTH,
number_of_leaders: 1,
pipeline: false,
}
}
#[allow(unused)]
pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self {
self.wave_length = wave_length;
self
}
pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
self.number_of_leaders = number_of_leaders;
self
}
pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
self.pipeline = pipeline;
self
}
pub(crate) fn build(self) -> UniversalCommitter {
let mut committers = Vec::new();
let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
for round_offset in 0..pipeline_stages {
for leader_offset in 0..self.number_of_leaders {
let options = BaseCommitterOptions {
wave_length: self.wave_length,
round_offset,
leader_offset: leader_offset as Round,
};
let committer = BaseCommitter::new(
self.context.clone(),
self.leader_schedule.clone(),
self.dag_state.clone(),
options,
);
committers.push(committer);
}
}
UniversalCommitter {
context: self.context,
dag_state: self.dag_state,
committers,
}
}
}
}