consensus_core/
base_committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashMap, fmt::Display, sync::Arc};
5
6use consensus_config::{AuthorityIndex, Stake};
7use consensus_types::block::{BlockRef, Round};
8use parking_lot::RwLock;
9use tracing::warn;
10
11use crate::{
12    block::{BlockAPI, Slot, VerifiedBlock},
13    commit::{DEFAULT_WAVE_LENGTH, LeaderStatus, WaveNumber},
14    context::Context,
15    dag_state::DagState,
16    leader_schedule::LeaderSchedule,
17    stake_aggregator::{QuorumThreshold, StakeAggregator},
18};
19
// Unit tests for `BaseCommitter`, compiled only for test builds and loaded
// from `tests/base_committer_tests.rs`.
#[cfg(test)]
#[path = "tests/base_committer_tests.rs"]
mod base_committer_tests;

// Second `BaseCommitter` test suite, compiled only for test builds and loaded
// from `tests/base_committer_declarative_tests.rs`.
#[cfg(test)]
#[path = "tests/base_committer_declarative_tests.rs"]
mod base_committer_declarative_tests;
27
/// Tunable parameters of a [`BaseCommitter`]: the wave length and the offsets
/// that let several committers cooperate (multi-leader and pipelined setups).
pub(crate) struct BaseCommitterOptions {
    /// The length of a wave, in rounds (minimum 3).
    ///
    /// TODO: Re-evaluate if we want this to be configurable after running experiments.
    pub wave_length: u32,
    /// The offset used in the leader-election protocol. This is used by the
    /// multi-committer to ensure that each [`BaseCommitter`] instance elects
    /// a different leader.
    pub leader_offset: u32,
    /// The offset of the first wave. This is used by the pipelined committer to
    /// ensure that each [`BaseCommitter`] instance operates on a different
    /// view of the DAG.
    pub round_offset: u32,
}
41
42impl Default for BaseCommitterOptions {
43    fn default() -> Self {
44        Self {
45            wave_length: DEFAULT_WAVE_LENGTH,
46            leader_offset: 0,
47            round_offset: 0,
48        }
49    }
50}
51
52/// The [`BaseCommitter`] contains the bare bone commit logic. Once instantiated,
53/// the method `try_direct_decide` and `try_indirect_decide` can be called at any
54/// time and any number of times (it is idempotent) to determine whether a leader
55/// can be committed or skipped.
56pub(crate) struct BaseCommitter {
57    /// The per-epoch configuration of this authority.
58    context: Arc<Context>,
59    /// The consensus leader schedule to be used to resolve the leader for a
60    /// given round.
61    leader_schedule: Arc<LeaderSchedule>,
62    /// In memory block store representing the dag state
63    dag_state: Arc<RwLock<DagState>>,
64    /// The options used by this committer
65    options: BaseCommitterOptions,
66}
67
impl BaseCommitter {
    /// Creates a committer that reads blocks from `dag_state` and resolves
    /// leaders through `leader_schedule`, using the wave length and offsets
    /// given in `options`.
    pub fn new(
        context: Arc<Context>,
        leader_schedule: Arc<LeaderSchedule>,
        dag_state: Arc<RwLock<DagState>>,
        options: BaseCommitterOptions,
    ) -> Self {
        Self {
            context,
            leader_schedule,
            dag_state,
            options,
        }
    }

    /// Apply the direct decision rule to the specified leader to see whether we
    /// can direct-commit or direct-skip it.
    ///
    /// Returns:
    /// - `Skip` if uncommitted blocks at `leader.round + 1` whose authors hold a
    ///   quorum of stake contain no ancestor authored by the leader's authority;
    /// - `Commit` if exactly one uncommitted block at the leader slot has a quorum
    ///   of certificates at the decision round of its wave;
    /// - `Undecided` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if more than one block at the leader slot has enough support.
    #[tracing::instrument(skip_all, fields(leader = %leader))]
    pub fn try_direct_decide(&self, leader: Slot) -> LeaderStatus {
        // Check whether the leader has enough blame. That is, whether there are 2f+1 non-votes
        // for that leader (which ensure there will never be a certificate for that leader).
        let voting_round = leader.round + 1;
        if self.enough_leader_blame(voting_round, leader.authority) {
            return LeaderStatus::Skip(leader);
        }

        // Check whether the leader(s) has enough support. That is, whether there are 2f+1
        // certificates over the leader. Note that there could be more than one leader block
        // (created by Byzantine leaders).
        let wave = self.wave_number(leader.round);
        let decision_round = self.decision_round(wave);
        // The read guard is a temporary of this statement, so it is released before
        // `enough_leader_support` below takes the lock again.
        let leader_blocks = self.dag_state.read().get_uncommitted_blocks_at_slot(leader);
        let mut leaders_with_enough_support: Vec<_> = leader_blocks
            .into_iter()
            .filter(|l| self.enough_leader_support(decision_round, l))
            .map(LeaderStatus::Commit)
            .collect();

        // There can be at most one leader with enough support for each round, otherwise it means
        // the BFT assumption is broken.
        if leaders_with_enough_support.len() > 1 {
            panic!(
                "[{self}] More than one candidate for {leader}: {leaders_with_enough_support:?}"
            );
        }

        leaders_with_enough_support
            .pop()
            .unwrap_or(LeaderStatus::Undecided(leader))
    }

    /// Apply the indirect decision rule to the specified leader to see whether
    /// we can indirect-commit or indirect-skip it.
    ///
    /// `leaders` is scanned for statuses whose round is at least
    /// `leader_slot.round + wave_length`. `Skip` entries are passed over. The
    /// first `Commit` entry is used as the anchor. An `Undecided` entry stops the
    /// scan. If no anchor is found, `Undecided(leader_slot)` is returned.
    /// NOTE(review): the "first committed leader" semantics presumably rely on
    /// callers passing `leaders` in increasing round order; confirm at call sites.
    #[tracing::instrument(skip_all, fields(leader = %leader_slot))]
    pub fn try_indirect_decide<'a>(
        &self,
        leader_slot: Slot,
        leaders: impl Iterator<Item = &'a LeaderStatus>,
    ) -> LeaderStatus {
        // The anchor is the first committed leader with round higher than the decision round of the
        // target leader. We must stop the iteration upon encountering an undecided leader.
        let anchors = leaders.filter(|x| leader_slot.round + self.options.wave_length <= x.round());

        for anchor in anchors {
            tracing::trace!(
                "[{self}] Trying to indirect-decide {leader_slot} using anchor {anchor}",
            );
            match anchor {
                LeaderStatus::Commit(anchor) => {
                    return self.decide_leader_from_anchor(anchor, leader_slot);
                }
                LeaderStatus::Skip(..) => (),
                LeaderStatus::Undecided(..) => break,
            }
        }

        LeaderStatus::Undecided(leader_slot)
    }

    /// Returns the leader slot for `round` if `round` is the leader round of its
    /// wave (taking `round_offset` into account), or `None` otherwise. The
    /// authority is chosen by the leader schedule using this committer's
    /// `leader_offset`.
    pub fn elect_leader(&self, round: Round) -> Option<Slot> {
        let wave = self.wave_number(round);
        tracing::trace!(
            "elect_leader: round={}, wave={}, leader_round={}, leader_offset={}",
            round,
            wave,
            self.leader_round(wave),
            self.options.leader_offset
        );
        if self.leader_round(wave) != round {
            return None;
        }

        Some(Slot::new(
            round,
            self.leader_schedule
                .elect_leader(round, self.options.leader_offset),
        ))
    }

    /// Return the leader round of the specified wave. The leader round is always
    /// the first round of the wave. This takes into account round offset for when
    /// pipelining is enabled.
    pub(crate) fn leader_round(&self, wave: WaveNumber) -> Round {
        (wave * self.options.wave_length) + self.options.round_offset
    }

    /// Return the decision round of the specified wave. The decision round is
    /// always the last round of the wave. This takes into account round offset
    /// for when pipelining is enabled.
    pub(crate) fn decision_round(&self, wave: WaveNumber) -> Round {
        let wave_length = self.options.wave_length;
        (wave * wave_length) + wave_length - 1 + self.options.round_offset
    }

    /// Return the wave in which the specified round belongs. This takes into
    /// account the round offset for when pipelining is enabled. Rounds below
    /// `round_offset` saturate to wave 0.
    pub(crate) fn wave_number(&self, round: Round) -> WaveNumber {
        round.saturating_sub(self.options.round_offset) / self.options.wave_length
    }

    /// Find which block is supported at a slot (author, round) by the given block.
    /// Blocks can indirectly reference multiple other blocks at a slot, but only
    /// one block at a slot will be supported by the given block. If block A supports B
    /// at a slot, it is guaranteed that any processed block by the same author that
    /// directly or indirectly includes A will also support B at that slot.
    ///
    /// The search walks `from`'s ancestors in their stored order and recurses only
    /// into ancestors whose round is above `leader_slot.round`. The first match is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if an ancestor above the leader round is not found in the DAG state.
    fn find_supported_block(&self, leader_slot: Slot, from: &VerifiedBlock) -> Option<BlockRef> {
        if from.round() < leader_slot.round {
            return None;
        }
        for ancestor in from.ancestors() {
            if Slot::from(*ancestor) == leader_slot {
                return Some(*ancestor);
            }
            // Weak links may point to blocks with lower round numbers than strong links.
            if ancestor.round <= leader_slot.round {
                continue;
            }
            // The read guard only lives for this statement, so it is not held across
            // the recursive call below.
            let ancestor = self
                .dag_state
                .read()
                .get_block(ancestor)
                .unwrap_or_else(|| panic!("Block not found in storage: {:?}", ancestor));
            if let Some(support) = self.find_supported_block(leader_slot, &ancestor) {
                return Some(support);
            }
        }
        None
    }

    /// Check whether the specified block (`potential_vote`) is a vote for
    /// the specified leader (`leader_block`). That is the case when the block it
    /// supports at the leader's slot is exactly `leader_block`.
    fn is_vote(&self, potential_vote: &VerifiedBlock, leader_block: &VerifiedBlock) -> bool {
        let reference = leader_block.reference();
        let leader_slot = Slot::from(reference);
        self.find_supported_block(leader_slot, potential_vote) == Some(reference)
    }

    /// Check whether the specified block (`potential_certificate`) is a certificate
    /// for the specified leader (`leader_block`), i.e. whether its direct ancestors
    /// include votes for the leader from a quorum of stake. An `all_votes` map can be
    /// provided as a cache to quickly skip checking against the block store on
    /// whether a reference is a vote. This is done for efficiency. Bear in mind
    /// that `all_votes` must only hold results computed for the same `leader_block`
    /// and it can't be reused for different leaders.
    ///
    /// # Panics
    ///
    /// Panics if an ancestor is missing from the DAG state while its round is
    /// above the current GC round.
    fn is_certificate(
        &self,
        potential_certificate: &VerifiedBlock,
        leader_block: &VerifiedBlock,
        all_votes: &mut HashMap<BlockRef, bool>,
    ) -> bool {
        let gc_round = self.dag_state.read().gc_round();

        let mut votes_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        for reference in potential_certificate.ancestors() {
            let is_vote = if let Some(is_vote) = all_votes.get(reference) {
                *is_vote
            } else {
                // Released at the end of this statement, before `is_vote` re-locks.
                let potential_vote = self.dag_state.read().get_block(reference);

                let is_vote = {
                    if let Some(potential_vote) = potential_vote {
                        self.is_vote(&potential_vote, leader_block)
                    } else {
                        // Only garbage-collected blocks may be absent; they never count as votes.
                        assert!(
                            reference.round <= gc_round,
                            "Block not found in storage: {:?} , and is not below gc_round: {gc_round}",
                            reference
                        );
                        false
                    }
                };

                all_votes.insert(*reference, is_vote);
                is_vote
            };

            if is_vote {
                tracing::trace!("[{self}] {reference} is a vote for {leader_block}");
                if votes_stake_aggregator.add(reference.author, &self.context.committee) {
                    tracing::trace!(
                        "[{self}] {potential_certificate} is a certificate for leader {leader_block}"
                    );
                    return true;
                }
            } else {
                tracing::trace!("[{self}] {reference} is not a vote for {leader_block}",);
            }
        }
        tracing::trace!(
            "[{self}] {potential_certificate} is not a certificate for leader {leader_block}"
        );
        false
    }

    /// Decide the status of a target leader from the specified anchor. We commit
    /// the target leader if it has a certified link to the anchor. Otherwise, we
    /// skip the target leader.
    ///
    /// # Panics
    ///
    /// Panics if more than one block at `leader_slot` is certified by the anchor's
    /// ancestors at the decision round.
    fn decide_leader_from_anchor(&self, anchor: &VerifiedBlock, leader_slot: Slot) -> LeaderStatus {
        // Get the block(s) proposed by the leader. There could be more than one leader block
        // in the slot from a Byzantine authority.
        let leader_blocks = self
            .dag_state
            .read()
            .get_uncommitted_blocks_at_slot(leader_slot);

        // TODO: Re-evaluate this check once we have a better way to handle/track byzantine authorities.
        if leader_blocks.len() > 1 {
            tracing::warn!(
                "Multiple blocks found for leader slot {leader_slot}: {:?}",
                leader_blocks
            );
        }

        // Get all blocks that could be potential certificates for the target leader. These blocks
        // are in the decision round of the target leader and are linked to the anchor.
        let wave = self.wave_number(leader_slot.round);
        let decision_round = self.decision_round(wave);
        let potential_certificates = self
            .dag_state
            .read()
            .ancestors_at_round(anchor, decision_round);

        // Use those potential certificates to determine which (if any) of the target leader
        // blocks can be committed.
        let mut certified_leader_blocks: Vec<_> = leader_blocks
            .into_iter()
            .filter(|leader_block| {
                // A fresh vote cache per leader block: cached results are leader-specific.
                let mut all_votes = HashMap::new();
                potential_certificates.iter().any(|potential_certificate| {
                    self.is_certificate(potential_certificate, leader_block, &mut all_votes)
                })
            })
            .collect();

        // There can be at most one certified leader, otherwise it means the BFT assumption is broken.
        if certified_leader_blocks.len() > 1 {
            panic!(
                "More than one certified leader at wave {wave} in {leader_slot}: {certified_leader_blocks:?}"
            );
        }

        // We commit the target leader if it has a certificate that is an ancestor of the anchor.
        // Otherwise skip it.
        match certified_leader_blocks.pop() {
            Some(certified_leader_block) => LeaderStatus::Commit(certified_leader_block),
            None => LeaderStatus::Skip(leader_slot),
        }
    }

    /// Check whether the specified leader has 2f+1 non-votes (blames) to be directly skipped.
    ///
    /// A block at `voting_round` counts as a blame when none of its ancestors, at
    /// any round, is authored by `leader`. Blames are aggregated by author stake
    /// until the quorum threshold is reached.
    fn enough_leader_blame(&self, voting_round: Round, leader: AuthorityIndex) -> bool {
        let voting_blocks = self
            .dag_state
            .read()
            .get_uncommitted_blocks_at_round(voting_round);

        let mut blame_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        for voting_block in &voting_blocks {
            let voter = voting_block.reference().author;
            if voting_block
                .ancestors()
                .iter()
                .all(|ancestor| ancestor.author != leader)
            {
                tracing::trace!(
                    "[{self}] {voting_block} is a blame for leader {}",
                    Slot::new(voting_round - 1, leader)
                );
                if blame_stake_aggregator.add(voter, &self.context.committee) {
                    return true;
                }
            } else {
                tracing::trace!(
                    "[{self}] {voting_block} is not a blame for leader {}",
                    Slot::new(voting_round - 1, leader)
                );
            }
        }
        false
    }

    /// Check whether the specified leader has 2f+1 certificates to be directly
    /// committed, i.e. whether the uncommitted blocks at `decision_round` that
    /// certify `leader_block` come from authors holding a quorum of stake.
    fn enough_leader_support(&self, decision_round: Round, leader_block: &VerifiedBlock) -> bool {
        let decision_blocks = self
            .dag_state
            .read()
            .get_uncommitted_blocks_at_round(decision_round);

        // Quickly reject if there isn't enough stake to support the leader from
        // the potential certificates.
        let total_stake: Stake = decision_blocks
            .iter()
            .map(|b| self.context.committee.stake(b.author()))
            .sum();
        if !self.context.committee.reached_quorum(total_stake) {
            tracing::trace!(
                "Not enough support for {leader_block}. Stake not enough: {total_stake} < {}",
                self.context.committee.quorum_threshold()
            );
            return false;
        }

        // One vote cache is shared across all decision blocks, since they are all
        // evaluated against the same `leader_block`.
        let mut certificate_stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
        let mut all_votes = HashMap::new();
        for decision_block in &decision_blocks {
            let authority = decision_block.reference().author;
            if self.is_certificate(decision_block, leader_block, &mut all_votes)
                && certificate_stake_aggregator.add(authority, &self.context.committee)
            {
                return true;
            }
        }
        false
    }
}
403
404impl Display for BaseCommitter {
405    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406        write!(
407            f,
408            "Committer-L{}-R{}",
409            self.options.leader_offset, self.options.round_offset
410        )
411    }
412}
413
/// A builder for the base committer. By default, the builder creates a base committer
/// with no leader or round offset, i.e. a single leader with pipelining disabled.
#[cfg(test)]
mod base_committer_builder {
    use super::*;
    use crate::leader_schedule::LeaderSwapTable;

    /// Test-only builder that collects the [`BaseCommitterOptions`] values before
    /// constructing a [`BaseCommitter`].
    pub(crate) struct BaseCommitterBuilder {
        context: Arc<Context>,
        dag_state: Arc<RwLock<DagState>>,
        wave_length: u32,
        leader_offset: u32,
        round_offset: u32,
    }

    impl BaseCommitterBuilder {
        /// Creates a builder with the default wave length and zero leader/round offsets.
        pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
            Self {
                context,
                dag_state,
                wave_length: DEFAULT_WAVE_LENGTH,
                leader_offset: 0,
                round_offset: 0,
            }
        }

        /// Sets the wave length of the built committer.
        #[allow(unused)] // Not every test configures this.
        pub(crate) fn with_wave_length(mut self, wave_length: u32) -> Self {
            self.wave_length = wave_length;
            self
        }

        /// Sets the leader-election offset of the built committer.
        #[allow(unused)] // Not every test configures this.
        pub(crate) fn with_leader_offset(mut self, leader_offset: u32) -> Self {
            self.leader_offset = leader_offset;
            self
        }

        /// Sets the round offset (pipelining) of the built committer.
        #[allow(unused)] // Not every test configures this.
        pub(crate) fn with_round_offset(mut self, round_offset: u32) -> Self {
            self.round_offset = round_offset;
            self
        }

        /// Builds the committer with a fresh leader schedule (default swap table)
        /// and the wave length and offsets configured on this builder.
        pub(crate) fn build(self) -> BaseCommitter {
            // Forward every configured value; otherwise the `with_*` setters
            // would silently have no effect.
            let options = BaseCommitterOptions {
                wave_length: self.wave_length,
                leader_offset: self.leader_offset,
                round_offset: self.round_offset,
            };
            BaseCommitter::new(
                self.context.clone(),
                Arc::new(LeaderSchedule::new(
                    self.context,
                    LeaderSwapTable::default(),
                )),
                self.dag_state,
                options,
            )
        }
    }
}