// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::VecDeque, sync::Arc};

use consensus_config::AuthorityIndex;
use parking_lot::RwLock;

use crate::{
    base_committer::BaseCommitter,
    block::{Round, Slot, GENESIS_ROUND},
    commit::{DecidedLeader, Decision},
    context::Context,
    dag_state::DagState,
};

#[cfg(test)]
#[path = "tests/universal_committer_tests.rs"]
mod universal_committer_tests;

#[cfg(test)]
#[path = "tests/pipelined_committer_tests.rs"]
mod pipelined_committer_tests;

/// A universal committer uses a collection of committers to commit a sequence of leaders.
/// It can be configured to use a combination of different commit strategies, including
/// multi-leaders, backup leaders, and pipelines.
///
/// Instances are assembled by `universal_committer_builder::UniversalCommitterBuilder`.
pub(crate) struct UniversalCommitter {
    /// The per-epoch configuration of this authority. This type reads the protocol config
    /// (number of leaders per round), the committee and the metrics from it.
    context: Arc<Context>,
    /// In memory block store representing the dag state. Read here to obtain the highest
    /// accepted round.
    dag_state: Arc<RwLock<DagState>>,
    /// The list of committers for multi-leader or pipelining. The builder pushes one committer
    /// per (round offset, leader offset) pair; `try_decide` walks this list in reverse.
    committers: Vec<BaseCommitter>,
}

impl UniversalCommitter {
    /// Try to decide part of the dag. This function is idempotent and returns an ordered list of
    /// decided leaders.
    ///
    /// Leader slots are examined from round `highest_accepted_round - 2` downwards, stopping
    /// at the slot equal to `last_decided` or at the lower round bound computed below. Each
    /// slot is first tried with the direct rule and, if that does not decide it, with the
    /// indirect rule. The returned vector is the longest prefix (lowest round first) of the
    /// examined leaders that are decided: it ends just before the first leader that is still
    /// undecided. Leaders of the genesis round are never returned.
    ///
    /// Every returned leader is also reported through [`Self::update_metrics`].
    /// NOTE(review): that counter is incremented on every call, so "idempotent" presumably
    /// refers to the returned decisions rather than to the metrics — confirm.
    #[tracing::instrument(skip_all, fields(last_decided = %last_decided))]
    pub(crate) fn try_decide(&self, last_decided: Slot) -> Vec<DecidedLeader> {
        // The read guard is a temporary and is released at the end of this statement.
        let highest_accepted_round = self.dag_state.read().highest_accepted_round();

        // Try to decide as many leaders as possible, starting with the highest round.
        // Entries are pushed to the front, so the deque ends up ordered from the lowest
        // examined slot to the highest.
        let mut leaders = VecDeque::new();

        // Lowest round (inclusive) that will be examined.
        let last_round = match self
            .context
            .protocol_config
            .mysticeti_num_leaders_per_round()
        {
            Some(1) => {
                // Ensure that we don't commit any leaders from the same round as last_decided
                // until we have full support for multi-leader per round.
                // This can happen when we are on a leader schedule boundary and the leader
                // elected for the round changes with the new schedule.
                last_decided.round + 1
            }
            _ => last_decided.round,
        };

        // try to commit a leader up to the highest_accepted_round - 2. There is no
        // reason to try and iterate on higher rounds as in order to make a direct
        // decision for a leader at round R we need blocks from round R+2 to figure
        // out that enough certificates and support exist to commit a leader.
        'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
            // Committers are visited in reverse so that, combined with `push_front`, the
            // leaders of one round end up in the same order as `self.committers`.
            for committer in self.committers.iter().rev() {
                // Skip committers that don't have a leader for this round.
                let Some(slot) = committer.elect_leader(round) else {
                    tracing::debug!("No leader for round {round}, skipping");
                    continue;
                };

                // now that we reached the last committed leader we can stop the commit rule
                if slot == last_decided {
                    tracing::debug!("Reached last committed {slot}, now exit");
                    break 'outer;
                }

                tracing::debug!("Trying to decide {slot} with {committer}",);

                // Try to directly decide the leader.
                let mut status = committer.try_direct_decide(slot);
                tracing::debug!("Outcome of direct rule: {status}");

                // Record a direct decision as is; otherwise fall back to the indirect rule,
                // which is given the statuses of all leaders collected so far. The outcome of
                // the indirect rule is recorded even when the leader is still undecided.
                if status.is_decided() {
                    leaders.push_front((status, Decision::Direct));
                } else {
                    status = committer.try_indirect_decide(slot, leaders.iter().map(|(x, _)| x));
                    tracing::debug!("Outcome of indirect rule: {status}");
                    leaders.push_front((status, Decision::Indirect));
                }
            }
        }

        // The decided sequence is the longest prefix of decided leaders.
        let mut decided_leaders = Vec::new();
        for (leader, decision) in leaders {
            // Genesis leaders are skipped without ending the prefix.
            if leader.round() == GENESIS_ROUND {
                continue;
            }
            // The first leader that is not decided ends the prefix.
            let Some(decided_leader) = leader.into_decided_leader() else {
                break;
            };
            Self::update_metrics(&self.context, &decided_leader, decision);
            decided_leaders.push(decided_leader);
        }
        tracing::debug!("Decided {decided_leaders:?}");
        decided_leaders
    }

    /// Collect the authorities elected as leader for `round`.
    ///
    /// Every committer is asked for its leader of that round; committers without one are
    /// ignored. The result follows the order of the committers and is empty when the round
    /// has no designated leader.
    pub(crate) fn get_leaders(&self, round: Round) -> Vec<AuthorityIndex> {
        let mut authorities = Vec::new();
        for committer in &self.committers {
            if let Some(slot) = committer.elect_leader(round) {
                authorities.push(slot.authority);
            }
        }
        authorities
    }

    /// Record one decided leader in the `committed_leaders_total` counter.
    ///
    /// The counter is labelled with the hostname of the leader's authority (looked up in the
    /// committee) and a status of the form `<decision>-<outcome>`, where `<decision>` is
    /// `direct`, `indirect` or `certified` and `<outcome>` is `commit` or `skip`.
    pub(crate) fn update_metrics(
        context: &Context,
        decided_leader: &DecidedLeader,
        decision: Decision,
    ) {
        // First label: hostname of the authority owning the decided slot.
        let leader_authority = decided_leader.slot().authority;
        let leader_host = &context.committee.authority(leader_authority).hostname;

        // Second label: how the decision was reached plus what was decided.
        let decision_str = match decision {
            Decision::Direct => "direct",
            Decision::Indirect => "indirect",
            Decision::Certified => "certified",
        };
        let status = match decided_leader {
            DecidedLeader::Commit(..) => format!("{decision_str}-commit"),
            DecidedLeader::Skip(..) => format!("{decision_str}-skip"),
        };

        let counter = &context.metrics.node_metrics.committed_leaders_total;
        counter.with_label_values(&[leader_host, &status]).inc();
    }
}

/// A builder for a universal committer. By default, the builder creates a single
/// base committer, that is, a single leader and no pipeline.
pub(crate) mod universal_committer_builder {
    use super::*;
    use crate::{
        base_committer::BaseCommitterOptions, commit::DEFAULT_WAVE_LENGTH,
        leader_schedule::LeaderSchedule,
    };

    /// Collects the settings from which `build` assembles a `UniversalCommitter`.
    pub(crate) struct UniversalCommitterBuilder {
        /// Shared context, cloned into every base committer and handed to the final committer.
        context: Arc<Context>,
        /// Leader schedule, cloned into every base committer.
        leader_schedule: Arc<LeaderSchedule>,
        /// Shared dag state, cloned into every base committer and handed to the final committer.
        dag_state: Arc<RwLock<DagState>>,
        /// Wave length passed to every base committer; also the number of pipeline stages when
        /// `pipeline` is set. Starts as `DEFAULT_WAVE_LENGTH`.
        wave_length: Round,
        /// Number of base committers created per pipeline stage, one per leader offset.
        /// Starts as 1.
        number_of_leaders: usize,
        /// Whether to create one stage per round offset in `0..wave_length` instead of a
        /// single stage at offset 0. Starts as `false`.
        pipeline: bool,
    }

    impl UniversalCommitterBuilder {
        /// Start a builder with the default configuration: `DEFAULT_WAVE_LENGTH`, a single
        /// leader and pipelining disabled.
        pub(crate) fn new(
            context: Arc<Context>,
            leader_schedule: Arc<LeaderSchedule>,
            dag_state: Arc<RwLock<DagState>>,
        ) -> Self {
            // Defaults describe a single base committer: one leader, no pipeline.
            let wave_length = DEFAULT_WAVE_LENGTH;
            let number_of_leaders = 1;
            let pipeline = false;

            Self {
                context,
                leader_schedule,
                dag_state,
                wave_length,
                number_of_leaders,
                pipeline,
            }
        }

        #[allow(unused)]
        pub(crate) fn with_wave_length(mut self, wave_length: Round) -> Self {
            self.wave_length = wave_length;
            self
        }

        pub(crate) fn with_number_of_leaders(mut self, number_of_leaders: usize) -> Self {
            self.number_of_leaders = number_of_leaders;
            self
        }

        pub(crate) fn with_pipeline(mut self, pipeline: bool) -> Self {
            self.pipeline = pipeline;
            self
        }

        pub(crate) fn build(self) -> UniversalCommitter {
            let mut committers = Vec::new();
            let pipeline_stages = if self.pipeline { self.wave_length } else { 1 };
            for round_offset in 0..pipeline_stages {
                for leader_offset in 0..self.number_of_leaders {
                    let options = BaseCommitterOptions {
                        wave_length: self.wave_length,
                        round_offset,
                        leader_offset: leader_offset as Round,
                    };
                    let committer = BaseCommitter::new(
                        self.context.clone(),
                        self.leader_schedule.clone(),
                        self.dag_state.clone(),
                        options,
                    );
                    committers.push(committer);
                }
            }

            UniversalCommitter {
                context: self.context,
                dag_state: self.dag_state,
                committers,
            }
        }
    }
}