consensus_core/
universal_committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::VecDeque, sync::Arc};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use parking_lot::RwLock;
9
10use crate::{
11    base_committer::BaseCommitter,
12    block::{GENESIS_ROUND, Slot},
13    commit::{DecidedLeader, Decision},
14    context::Context,
15    dag_state::DagState,
16};
17
18#[cfg(test)]
19#[path = "tests/universal_committer_tests.rs"]
20mod universal_committer_tests;
21
22#[cfg(test)]
23#[path = "tests/pipelined_committer_tests.rs"]
24mod pipelined_committer_tests;
25
26/// A universal committer uses a collection of committers to commit a sequence of leaders.
27/// It can be configured to use a combination of different commit strategies, including
28/// multi-leaders, backup leaders, and pipelines.
29pub(crate) struct UniversalCommitter {
30    /// The per-epoch configuration of this authority.
31    context: Arc<Context>,
32    /// In memory block store representing the dag state
33    dag_state: Arc<RwLock<DagState>>,
34    /// The list of committers for multi-leader or pipelining
35    committers: Vec<BaseCommitter>,
36}
37
38impl UniversalCommitter {
39    /// Try to decide part of the dag. This function is idempotent and returns an ordered list of
40    /// decided leaders.
41    #[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
42    pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
43        let highest_accepted_round = self.dag_state.read().highest_accepted_round();
44
45        // Try to decide as many leaders as possible, starting with the highest round.
46        let mut leaders = VecDeque::new();
47
48        let last_round = match self.context.protocol_config.num_leaders_per_round() {
49            Some(1) => {
50                // Ensure that we don't commit any leaders from the same round as last_decided
51                // until we have full support for multi-leader per round.
52                // This can happen when we are on a leader schedule boundary and the leader
53                // elected for the round changes with the new schedule.
54                last_decided.round + 1
55            }
56            _ => last_decided.round,
57        };
58
59        // try to commit a leader up to the highest_accepted_round - 2. There is no
60        // reason to try and iterate on higher rounds as in order to make a direct
61        // decision for a leader at round R we need blocks from round R+2 to figure
62        // out that enough certificates and support exist to commit a leader.
63        'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
64            for committer in self.committers.iter().rev() {
65                // Skip committers that don't have a leader for this round.
66                let Some(slot) = committer.elect_leader(round) else {
67                    tracing::debug!("No leader for round {round}, skipping");
68                    continue;
69                };
70
71                // now that we reached the last committed leader we can stop the commit rule
72                if slot == last_decided {
73                    tracing::debug!("Reached last committed {slot}, now exit");
74                    break 'outer;
75                }
76
77                tracing::trace!("Trying to decide {slot} with {committer}",);
78
79                // Try to directly decide the leader.
80                let mut status = committer.try_direct_decide(slot);
81                tracing::debug!("Outcome of direct rule: {status} with {committer}");
82
83                // If we can't directly decide the leader, try to indirectly decide it.
84                if status.is_decided() {
85                    leaders.push_front((status, Decision::Direct));
86                } else {
87                    status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
88                    tracing::debug!("Outcome of indirect rule: {status} with {committer}");
89                    leaders.push_front((status, Decision::Indirect));
90                }
91            }
92        }
93
94        // The decided sequence is the longest prefix of decided leaders.
95        let mut decided_leaders = Vec::new();
96        for (leader, decision) in leaders {
97            if leader.round() == GENESIS_ROUND {
98                continue;
99            }
100            let Some(decided_leader) = leader.into_decided_leader(decision == Decision::Direct)
101            else {
102                break;
103            };
104            Self::update_metrics(&self.context, &decided_leader, decision);
105            decided_leaders.push(decided_leader);
106        }
107        if !decided_leaders.is_empty() {
108            tracing::debug!("Decided {decided_leaders:?}");
109        }
110        decided_leaders
111    }
112
113    /// Return list of leaders for the round.
114    /// Can return empty vec if round does not have a designated leader.
115    pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
116        self.committers
117            .iter()
118            .filter_map(|committer| committer.elect_leader(round))
119            .map(|l| l.authority)
120            .collect()
121    }
122
123    /// Update metrics.
124    pub(crate) fn update_metrics(
125        context: &Context,
126        decided_leader: &DecidedLeader,
127        decision: Decision,
128    ) {
129        let decision_str = match decision {
130            Decision::Direct => "direct",
131            Decision::Indirect => "indirect",
132            Decision::Certified => "certified",
133        };
134        let status = match decided_leader {
135            DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
136            DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
137        };
138        let leader_host = &context
139            .committee
140            .authority(decided_leader.slot().authority)
141            .hostname;
142        context
143            .metrics
144            .node_metrics
145            .committed_leaders_total
146            .with_label_values(&[leader_host, &status])
147            .inc();
148    }
149}
150
151/// A builder for a universal committer. By default, the builder creates a single
152/// base committer, that is, a single leader and no pipeline.
153pub(crate) mod universal_committer_builder {
154    use super::*;
155    use crate::{
156        base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
157        leader_schedule::LeaderSchedule,
158    };
159
160    pub(crate) struct UniversalCommitterBuilder {
161        context: Arc<Context>,
162        leader_schedule: Arc<LeaderSchedule>,
163        dag_state: Arc<RwLock<DagState>>,
164        wave_length: Round,
165        number_of_leaders: usize,
166        pipeline: bool,
167    }
168
169    impl UniversalCommitterBuilder {
170        pub(crate) fn new(
171            context: Arc<Context>,
172            leader_schedule: Arc<LeaderSchedule>,
173            dag_state: Arc<RwLock<DagState>>,
174        ) -> Self {
175            Self {
176                context,
177                leader_schedule,
178                dag_state,
179                wave_length: DEFAULT_WAVE_LENGTH,
180                number_of_leaders: 1,
181                pipeline: false,
182            }
183        }
184
185        pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
186            self.number_of_leaders = number_of_leaders;
187            self
188        }
189
190        pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
191            self.pipeline = pipeline;
192            self
193        }
194
195        pub(crate) fn build(self) -> UniversalCommitter {
196            let mut committers = Vec::new();
197            let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
198            for round_offset in 0..pipeline_stages {
199                for leader_offset in 0..self.number_of_leaders {
200                    let options = BaseCommitterOptions {
201                        wave_length: self.wave_length,
202                        round_offset,
203                        leader_offset: leader_offset as Round,
204                    };
205                    let committer = BaseCommitter::new(
206                        self.context.clone(),
207                        self.leader_schedule.clone(),
208                        self.dag_state.clone(),
209                        options,
210                    );
211                    committers.push(committer);
212                }
213            }
214
215            UniversalCommitter {
216                context: self.context,
217                dag_state: self.dag_state,
218                committers,
219            }
220        }
221    }
222}