1use std::{collections::HashMap, fmt::Display, sync::Arc};
5
6use consensus_config::{AuthorityIndex, Stake};
7use consensus_types::block::{BlockRef, Round};
8use parking_lot::RwLock;
9use tracing::warn;
10
11use crate::{
12 block::{BlockAPI, Slot, VerifiedBlock},
13 commit::{DEFAULT_WAVE_LENGTH, LeaderStatus, WaveNumber},
14 context::Context,
15 dag_state::DagState,
16 leader_schedule::LeaderSchedule,
17 stake_aggregator::{QuorumThreshold, StakeAggregator},
18};
19
20#[cfg(test)]
21#[path = "tests/base_committer_tests.rs"]
22mod base_committer_tests;
23
24#[cfg(test)]
25#[path = "tests/base_committer_declarative_tests.rs"]
26mod base_committer_declarative_tests;
27
/// Tunable parameters of a [`BaseCommitter`].
pub(crate) struct BaseCommitterOptions {
    /// Number of rounds spanned by one wave. It is the divisor used to map a
    /// round to its wave and the multiplier used to map a wave back to its
    /// leader and decision rounds.
    pub wave_length: u32,
    /// Offset forwarded to the leader schedule when electing the leader of a
    /// round.
    pub leader_offset: u32,
    /// Number of rounds by which every wave is shifted: it is added to the
    /// leader and decision rounds of a wave, and subtracted (saturating at
    /// zero) from a round before computing its wave number.
    pub round_offset: u32,
}
41
42impl Default for BaseCommitterOptions {
43 fn default() -> Self {
44 Self {
45 wave_length: DEFAULT_WAVE_LENGTH,
46 leader_offset: 0,
47 round_offset: 0,
48 }
49 }
50}
51
52pub(crate) struct BaseCommitter {
57 context: Arc<Context>,
59 leader_schedule: Arc<LeaderSchedule>,
62 dag_state: Arc<RwLock<DagState>>,
64 options: BaseCommitterOptions,
66}
67
68impl BaseCommitter {
69 pub fn new(
70 context: Arc<Context>,
71 leader_schedule: Arc<LeaderSchedule>,
72 dag_state: Arc<RwLock<DagState>>,
73 options: BaseCommitterOptions,
74 ) -> Self {
75 Self {
76 context,
77 leader_schedule,
78 dag_state,
79 options,
80 }
81 }
82
83 #[tracing::instrument(skip_all, fields(leader = %leader))]
86 pub fn try_direct_decide(&self, leader: Slot) -> LeaderStatus {
87 let voting_round = leader.round + 1;
90 if self.enough_leader_blame(voting_round, leader.authority) {
91 return LeaderStatus::Skip(leader);
92 }
93
94 let wave = self.wave_number(leader.round);
98 let decision_round = self.decision_round(wave);
99 let leader_blocks = self.dag_state.read().get_uncommitted_blocks_at_slot(leader);
100 let mut leaders_with_enough_support: Vec<_> = leader_blocks
101 .into_iter()
102 .filter(|l| self.enough_leader_support(decision_round, l))
103 .map(LeaderStatus::Commit)
104 .collect();
105
106 if leaders_with_enough_support.len() > 1 {
109 panic!(
110 "[{self}] More than one candidate for {leader}: {leaders_with_enough_support:?}"
111 );
112 }
113
114 leaders_with_enough_support
115 .pop()
116 .unwrap_or(LeaderStatus::Undecided(leader))
117 }
118
119 #[tracing::instrument(skip_all, fields(leader = %leader_slot))]
122 pub fn try_indirect_decide<'a>(
123 &self,
124 leader_slot: Slot,
125 leaders: impl Iterator<Item = &'a LeaderStatus>,
126 ) -> LeaderStatus {
127 let anchors = leaders.filter(|x| leader_slot.round + self.options.wave_length <= x.round());
130
131 for anchor in anchors {
132 tracing::trace!(
133 "[{self}] Trying to indirect-decide {leader_slot} using anchor {anchor}",
134 );
135 match anchor {
136 LeaderStatus::Commit(anchor) => {
137 return self.decide_leader_from_anchor(anchor, leader_slot);
138 }
139 LeaderStatus::Skip(..) => (),
140 LeaderStatus::Undecided(..) => break,
141 }
142 }
143
144 LeaderStatus::Undecided(leader_slot)
145 }
146
147 pub fn elect_leader(&self, round: Round) -> Option<Slot> {
148 let wave = self.wave_number(round);
149 tracing::trace!(
150 "elect_leader: round={}, wave={}, leader_round={}, leader_offset={}",
151 round,
152 wave,
153 self.leader_round(wave),
154 self.options.leader_offset
155 );
156 if self.leader_round(wave) != round {
157 return None;
158 }
159
160 Some(Slot::new(
161 round,
162 self.leader_schedule
163 .elect_leader(round, self.options.leader_offset),
164 ))
165 }
166
167 pub(crate) fn leader_round(&self, wave: WaveNumber) -> Round {
171 (wave * self.options.wave_length) + self.options.round_offset
172 }
173
174 pub(crate) fn decision_round(&self, wave: WaveNumber) -> Round {
178 let wave_length = self.options.wave_length;
179 (wave * wave_length) + wave_length - 1 + self.options.round_offset
180 }
181
182 pub(crate) fn wave_number(&self, round: Round) -> WaveNumber {
185 round.saturating_sub(self.options.round_offset) / self.options.wave_length
186 }
187
188 fn find_supported_block(&self, leader_slot: Slot, from: &VerifiedBlock) -> Option<BlockRef> {
194 if from.round() < leader_slot.round {
195 return None;
196 }
197 for ancestor in from.ancestors() {
198 if Slot::from(*ancestor) == leader_slot {
199 return Some(*ancestor);
200 }
201 if ancestor.round <= leader_slot.round {
203 continue;
204 }
205 let ancestor = self
206 .dag_state
207 .read()
208 .get_block(ancestor)
209 .unwrap_or_else(|| panic!("Block not found in storage: {:?}", ancestor));
210 if let Some(support) = self.find_supported_block(leader_slot, &ancestor) {
211 return Some(support);
212 }
213 }
214 None
215 }
216
217 fn is_vote(&self, potential_vote: &VerifiedBlock, leader_block: &VerifiedBlock) -> bool {
220 let reference = leader_block.reference();
221 let leader_slot = Slot::from(reference);
222 self.find_supported_block(leader_slot, potential_vote) == Some(reference)
223 }
224
225 fn is_certificate(
232 &self,
233 potential_certificate: &VerifiedBlock,
234 leader_block: &VerifiedBlock,
235 all_votes: &mut HashMap<BlockRef, bool>,
236 ) -> bool {
237 let gc_round = self.dag_state.read().gc_round();
238
239 let mut votes_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
240 for reference in potential_certificate.ancestors() {
241 let is_vote = if let Some(is_vote) = all_votes.get(reference) {
242 *is_vote
243 } else {
244 let potential_vote = self.dag_state.read().get_block(reference);
245
246 let is_vote = {
247 if let Some(potential_vote) = potential_vote {
248 self.is_vote(&potential_vote, leader_block)
249 } else {
250 assert!(
251 reference.round <= gc_round,
252 "Block not found in storage: {:?} , and is not below gc_round: {gc_round}",
253 reference
254 );
255 false
256 }
257 };
258
259 all_votes.insert(*reference, is_vote);
260 is_vote
261 };
262
263 if is_vote {
264 tracing::trace!("[{self}] {reference} is a vote for {leader_block}");
265 if votes_stake_aggregator.add(reference.author, &self.context.committee) {
266 tracing::trace!(
267 "[{self}] {potential_certificate} is a certificate for leader {leader_block}"
268 );
269 return true;
270 }
271 } else {
272 tracing::trace!("[{self}] {reference} is not a vote for {leader_block}",);
273 }
274 }
275 tracing::trace!(
276 "[{self}] {potential_certificate} is not a certificate for leader {leader_block}"
277 );
278 false
279 }
280
281 fn decide_leader_from_anchor(&self, anchor: &VerifiedBlock, leader_slot: Slot) -> LeaderStatus {
285 let leader_blocks = self
288 .dag_state
289 .read()
290 .get_uncommitted_blocks_at_slot(leader_slot);
291
292 if leader_blocks.len() > 1 {
294 tracing::warn!(
295 "Multiple blocks found for leader slot {leader_slot}: {:?}",
296 leader_blocks
297 );
298 }
299
300 let wave = self.wave_number(leader_slot.round);
303 let decision_round = self.decision_round(wave);
304 let potential_certificates = self
305 .dag_state
306 .read()
307 .ancestors_at_round(anchor, decision_round);
308
309 let mut certified_leader_blocks: Vec<_> = leader_blocks
312 .into_iter()
313 .filter(|leader_block| {
314 let mut all_votes = HashMap::new();
315 potential_certificates.iter().any(|potential_certificate| {
316 self.is_certificate(potential_certificate, leader_block, &mut all_votes)
317 })
318 })
319 .collect();
320
321 if certified_leader_blocks.len() > 1 {
323 panic!(
324 "More than one certified leader at wave {wave} in {leader_slot}: {certified_leader_blocks:?}"
325 );
326 }
327
328 match certified_leader_blocks.pop() {
331 Some(certified_leader_block) => LeaderStatus::Commit(certified_leader_block),
332 None => LeaderStatus::Skip(leader_slot),
333 }
334 }
335
336 fn enough_leader_blame(&self, voting_round: Round, leader: AuthorityIndex) -> bool {
338 let voting_blocks = self
339 .dag_state
340 .read()
341 .get_uncommitted_blocks_at_round(voting_round);
342
343 let mut blame_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
344 for voting_block in &voting_blocks {
345 let voter = voting_block.reference().author;
346 if voting_block
347 .ancestors()
348 .iter()
349 .all(|ancestor| ancestor.author != leader)
350 {
351 tracing::trace!(
352 "[{self}] {voting_block} is a blame for leader {}",
353 Slot::new(voting_round - 1, leader)
354 );
355 if blame_stake_aggregator.add(voter, &self.context.committee) {
356 return true;
357 }
358 } else {
359 tracing::trace!(
360 "[{self}] {voting_block} is not a blame for leader {}",
361 Slot::new(voting_round - 1, leader)
362 );
363 }
364 }
365 false
366 }
367
368 fn enough_leader_support(&self, decision_round: Round, leader_block: &VerifiedBlock) -> bool {
371 let decision_blocks = self
372 .dag_state
373 .read()
374 .get_uncommitted_blocks_at_round(decision_round);
375
376 let total_stake: Stake = decision_blocks
379 .iter()
380 .map(|b| self.context.committee.stake(b.author()))
381 .sum();
382 if !self.context.committee.reached_quorum(total_stake) {
383 tracing::trace!(
384 "Not enough support for {leader_block}. Stake not enough: {total_stake} < {}",
385 self.context.committee.quorum_threshold()
386 );
387 return false;
388 }
389
390 let mut certificate_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
391 let mut all_votes = HashMap::new();
392 for decision_block in &decision_blocks {
393 let authority = decision_block.reference().author;
394 if self.is_certificate(decision_block, leader_block, &mut all_votes)
395 && certificate_stake_aggregator.add(authority, &self.context.committee)
396 {
397 return true;
398 }
399 }
400 false
401 }
402}
403
404impl Display for BaseCommitter {
405 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406 write!(
407 f,
408 "Committer-L{}-R{}",
409 self.options.leader_offset, self.options.round_offset
410 )
411 }
412}
413
/// Test-only builder for [`BaseCommitter`].
#[cfg(test)]
mod base_committer_builder {
    use super::*;
    use crate::leader_schedule::LeaderSwapTable;

    /// Collects the options of a [`BaseCommitter`] before building it. The
    /// leader schedule of the built committer always uses the default
    /// `LeaderSwapTable`.
    pub(crate) struct BaseCommitterBuilder {
        context: Arc<Context>,
        dag_state: Arc<RwLock<DagState>>,
        wave_length: u32,
        leader_offset: u32,
        round_offset: u32,
    }

    impl BaseCommitterBuilder {
        /// Starts a builder with `DEFAULT_WAVE_LENGTH` and no leader or round
        /// offset.
        pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
            Self {
                context,
                dag_state,
                wave_length: DEFAULT_WAVE_LENGTH,
                leader_offset: 0,
                round_offset: 0,
            }
        }

        /// Overrides the wave length of the committer to build.
        // Not every test configuration uses this setter.
        #[allow(unused)]
        pub(crate) fn with_wave_length(mut self, wave_length: u32) -> Self {
            self.wave_length = wave_length;
            self
        }

        /// Overrides the leader offset of the committer to build.
        // Not every test configuration uses this setter.
        #[allow(unused)]
        pub(crate) fn with_leader_offset(mut self, leader_offset: u32) -> Self {
            self.leader_offset = leader_offset;
            self
        }

        /// Overrides the round offset of the committer to build.
        // Not every test configuration uses this setter.
        #[allow(unused)]
        pub(crate) fn with_round_offset(mut self, round_offset: u32) -> Self {
            self.round_offset = round_offset;
            self
        }

        /// Builds the committer from the configured values.
        pub(crate) fn build(self) -> BaseCommitter {
            // Use the values stored on the builder: hard-coding the defaults
            // here would silently discard whatever the `with_*` setters
            // configured.
            let options = BaseCommitterOptions {
                wave_length: self.wave_length,
                leader_offset: self.leader_offset,
                round_offset: self.round_offset,
            };
            BaseCommitter::new(
                self.context.clone(),
                Arc::new(LeaderSchedule::new(
                    self.context,
                    LeaderSwapTable::default(),
                )),
                self.dag_state,
                options,
            )
        }
    }
}