consensus_core/
universal_committer.rs1use std::{collections::VecDeque, sync::Arc};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use parking_lot::RwLock;
9
10use crate::{
11 base_committer::BaseCommitter,
12 block::{GENESIS_ROUND, Slot},
13 commit::{DecidedLeader, Decision},
14 context::Context,
15 dag_state::DagState,
16};
17
18#[cfg(test)]
19#[path = "tests/universal_committer_tests.rs"]
20mod universal_committer_tests;
21
22#[cfg(test)]
23#[path = "tests/pipelined_committer_tests.rs"]
24mod pipelined_committer_tests;
25
26pub(crate) struct UniversalCommitter {
30 context: Arc<Context>,
32 dag_state: Arc<RwLock<DagState>>,
34 committers: Vec<BaseCommitter>,
36}
37
38impl UniversalCommitter {
39 #[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
42 pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
43 let highest_accepted_round = self.dag_state.read().highest_accepted_round();
44
45 let mut leaders = VecDeque::new();
47
48 let last_round = match self.context.protocol_config.num_leaders_per_round() {
49 Some(1) => {
50 last_decided.round + 1
55 }
56 _ => last_decided.round,
57 };
58
59 'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
64 for committer in self.committers.iter().rev() {
65 let Some(slot) = committer.elect_leader(round) else {
67 tracing::debug!("No leader for round {round}, skipping");
68 continue;
69 };
70
71 if slot == last_decided {
73 tracing::debug!("Reached last committed {slot}, now exit");
74 break 'outer;
75 }
76
77 tracing::trace!("Trying to decide {slot} with {committer}",);
78
79 let mut status = committer.try_direct_decide(slot);
81 tracing::debug!("Outcome of direct rule: {status} with {committer}");
82
83 if status.is_decided() {
85 leaders.push_front((status, Decision::Direct));
86 } else {
87 status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
88 tracing::debug!("Outcome of indirect rule: {status} with {committer}");
89 leaders.push_front((status, Decision::Indirect));
90 }
91 }
92 }
93
94 let mut decided_leaders = Vec::new();
96 for (leader, decision) in leaders {
97 if leader.round() == GENESIS_ROUND {
98 continue;
99 }
100 let Some(decided_leader) = leader.into_decided_leader(decision == Decision::Direct)
101 else {
102 break;
103 };
104 Self::update_metrics(&self.context, &decided_leader, decision);
105 decided_leaders.push(decided_leader);
106 }
107 if !decided_leaders.is_empty() {
108 tracing::debug!("Decided {decided_leaders:?}");
109 }
110 decided_leaders
111 }
112
113 pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
116 self.committers
117 .iter()
118 .filter_map(|committer| committer.elect_leader(round))
119 .map(|l| l.authority)
120 .collect()
121 }
122
123 pub(crate) fn update_metrics(
125 context: &Context,
126 decided_leader: &DecidedLeader,
127 decision: Decision,
128 ) {
129 let decision_str = match decision {
130 Decision::Direct => "direct",
131 Decision::Indirect => "indirect",
132 Decision::Certified => "certified",
133 };
134 let status = match decided_leader {
135 DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
136 DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
137 };
138 let leader_host = &context
139 .committee
140 .authority(decided_leader.slot().authority)
141 .hostname;
142 context
143 .metrics
144 .node_metrics
145 .committed_leaders_total
146 .with_label_values(&[leader_host, &status])
147 .inc();
148 }
149}
150
151pub(crate) mod universal_committer_builder {
154 use super::*;
155 use crate::{
156 base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
157 leader_schedule::LeaderSchedule,
158 };
159
160 pub(crate) struct UniversalCommitterBuilder {
161 context: Arc<Context>,
162 leader_schedule: Arc<LeaderSchedule>,
163 dag_state: Arc<RwLock<DagState>>,
164 wave_length: Round,
165 number_of_leaders: usize,
166 pipeline: bool,
167 }
168
169 impl UniversalCommitterBuilder {
170 pub(crate) fn new(
171 context: Arc<Context>,
172 leader_schedule: Arc<LeaderSchedule>,
173 dag_state: Arc<RwLock<DagState>>,
174 ) -> Self {
175 Self {
176 context,
177 leader_schedule,
178 dag_state,
179 wave_length: DEFAULT_WAVE_LENGTH,
180 number_of_leaders: 1,
181 pipeline: false,
182 }
183 }
184
185 pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
186 self.number_of_leaders = number_of_leaders;
187 self
188 }
189
190 pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
191 self.pipeline = pipeline;
192 self
193 }
194
195 pub(crate) fn build(self) -> UniversalCommitter {
196 let mut committers = Vec::new();
197 let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
198 for round_offset in 0..pipeline_stages {
199 for leader_offset in 0..self.number_of_leaders {
200 let options = BaseCommitterOptions {
201 wave_length: self.wave_length,
202 round_offset,
203 leader_offset: leader_offset as Round,
204 };
205 let committer = BaseCommitter::new(
206 self.context.clone(),
207 self.leader_schedule.clone(),
208 self.dag_state.clone(),
209 options,
210 );
211 committers.push(committer);
212 }
213 }
214
215 UniversalCommitter {
216 context: self.context,
217 dag_state: self.dag_state,
218 committers,
219 }
220 }
221 }
222}