consensus_core/
universal_committer.rs1use std::{collections::VecDeque, sync::Arc};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use parking_lot::RwLock;
9
10use crate::{
11 base_committer::BaseCommitter,
12 block::{GENESIS_ROUND, Slot},
13 commit::{DecidedLeader, Decision},
14 context::Context,
15 dag_state::DagState,
16};
17
18#[cfg(test)]
19#[path = "tests/universal_committer_tests.rs"]
20mod universal_committer_tests;
21
22#[cfg(test)]
23#[path = "tests/pipelined_committer_tests.rs"]
24mod pipelined_committer_tests;
25
26pub(crate) struct UniversalCommitter {
30 context: Arc<Context>,
32 dag_state: Arc<RwLock<DagState>>,
34 committers: Vec<BaseCommitter>,
36}
37
38impl UniversalCommitter {
39 #[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
42 pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
43 let highest_accepted_round = self.dag_state.read().highest_accepted_round();
44
45 let mut leaders = VecDeque::new();
47
48 let last_round = match self
49 .context
50 .protocol_config
51 .mysticeti_num_leaders_per_round()
52 {
53 Some(1) => {
54 last_decided.round + 1
59 }
60 _ => last_decided.round,
61 };
62
63 'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
68 for committer in self.committers.iter().rev() {
69 let Some(slot) = committer.elect_leader(round) else {
71 tracing::debug!("No leader for round {round}, skipping");
72 continue;
73 };
74
75 if slot == last_decided {
77 tracing::debug!("Reached last committed {slot}, now exit");
78 break 'outer;
79 }
80
81 tracing::trace!("Trying to decide {slot} with {committer}",);
82
83 let mut status = committer.try_direct_decide(slot);
85 tracing::debug!("Outcome of direct rule: {status} with {committer}");
86
87 if status.is_decided() {
89 leaders.push_front((status, Decision::Direct));
90 } else {
91 status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
92 tracing::debug!("Outcome of indirect rule: {status} with {committer}");
93 leaders.push_front((status, Decision::Indirect));
94 }
95 }
96 }
97
98 let mut decided_leaders = Vec::new();
100 for (leader, decision) in leaders {
101 if leader.round() == GENESIS_ROUND {
102 continue;
103 }
104 let Some(decided_leader) = leader.into_decided_leader(decision == Decision::Direct)
105 else {
106 break;
107 };
108 Self::update_metrics(&self.context, &decided_leader, decision);
109 decided_leaders.push(decided_leader);
110 }
111 if !decided_leaders.is_empty() {
112 tracing::debug!("Decided {decided_leaders:?}");
113 }
114 decided_leaders
115 }
116
117 pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
120 self.committers
121 .iter()
122 .filter_map(|committer| committer.elect_leader(round))
123 .map(|l| l.authority)
124 .collect()
125 }
126
127 pub(crate) fn update_metrics(
129 context: &Context,
130 decided_leader: &DecidedLeader,
131 decision: Decision,
132 ) {
133 let decision_str = match decision {
134 Decision::Direct => "direct",
135 Decision::Indirect => "indirect",
136 Decision::Certified => "certified",
137 };
138 let status = match decided_leader {
139 DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
140 DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
141 };
142 let leader_host = &context
143 .committee
144 .authority(decided_leader.slot().authority)
145 .hostname;
146 context
147 .metrics
148 .node_metrics
149 .committed_leaders_total
150 .with_label_values(&[leader_host, &status])
151 .inc();
152 }
153}
154
155pub(crate) mod universal_committer_builder {
158 use super::*;
159 use crate::{
160 base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
161 leader_schedule::LeaderSchedule,
162 };
163
164 pub(crate) struct UniversalCommitterBuilder {
165 context: Arc<Context>,
166 leader_schedule: Arc<LeaderSchedule>,
167 dag_state: Arc<RwLock<DagState>>,
168 wave_length: Round,
169 number_of_leaders: usize,
170 pipeline: bool,
171 }
172
173 impl UniversalCommitterBuilder {
174 pub(crate) fn new(
175 context: Arc<Context>,
176 leader_schedule: Arc<LeaderSchedule>,
177 dag_state: Arc<RwLock<DagState>>,
178 ) -> Self {
179 Self {
180 context,
181 leader_schedule,
182 dag_state,
183 wave_length: DEFAULT_WAVE_LENGTH,
184 number_of_leaders: 1,
185 pipeline: false,
186 }
187 }
188
189 #[allow(unused)]
190 pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self {
191 self.wave_length = wave_length;
192 self
193 }
194
195 pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
196 self.number_of_leaders = number_of_leaders;
197 self
198 }
199
200 pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
201 self.pipeline = pipeline;
202 self
203 }
204
205 pub(crate) fn build(self) -> UniversalCommitter {
206 let mut committers = Vec::new();
207 let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
208 for round_offset in 0..pipeline_stages {
209 for leader_offset in 0..self.number_of_leaders {
210 let options = BaseCommitterOptions {
211 wave_length: self.wave_length,
212 round_offset,
213 leader_offset: leader_offset as Round,
214 };
215 let committer = BaseCommitter::new(
216 self.context.clone(),
217 self.leader_schedule.clone(),
218 self.dag_state.clone(),
219 options,
220 );
221 committers.push(committer);
222 }
223 }
224
225 UniversalCommitter {
226 context: self.context,
227 dag_state: self.dag_state,
228 committers,
229 }
230 }
231 }
232}