consensus_core/
ancestor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_config::{AuthorityIndex, Stake};
7use parking_lot::RwLock;
8use tracing::{debug, info};
9
10use crate::{
11    context::Context, dag_state::DagState, leader_scoring::ReputationScores,
12    round_tracker::QuorumRound,
13};
14
/// Include/exclude state that the ancestor state manager tracks for each authority.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum AncestorState {
    /// The authority is in the include state.
    Include,
    /// The authority is in the exclude state. The payload is the exclusion
    /// score, i.e. the score the authority had when it was excluded.
    Exclude(u64),
}
21
22#[derive(Clone)]
23struct AncestorInfo {
24    state: AncestorState,
25    // This will be set to the future clock round for which this ancestor state
26    // will be locked.
27    lock_until_round: u32,
28}
29
30impl AncestorInfo {
31    fn new() -> Self {
32        Self {
33            state: AncestorState::Include,
34            lock_until_round: 0,
35        }
36    }
37
38    fn is_locked(&self, current_clock_round: u32) -> bool {
39        self.lock_until_round >= current_clock_round
40    }
41
42    fn set_lock(&mut self, lock_until_round: u32) {
43        self.lock_until_round = lock_until_round;
44    }
45}
46
47#[derive(Debug)]
48struct StateTransition {
49    authority_id: AuthorityIndex,
50    // The authority propagation score taken from leader scoring.
51    score: u64,
52    // The stake of the authority that is transitioning state.
53    stake: u64,
54    // The authority high quorum round is the lowest round higher or equal to rounds
55    // from a quorum of authorities
56    high_quorum_round: u32,
57}
58
59pub(crate) struct AncestorStateManager {
60    context: Arc<Context>,
61    dag_state: Arc<RwLock<DagState>>,
62    state_map: Vec<AncestorInfo>,
63    excluded_nodes_stake_threshold: u64,
64    // This is the running total of ancestors by stake that have been marked
65    // as excluded. This cannot exceed the excluded_nodes_stake_threshold
66    total_excluded_stake: Stake,
67    // This is the reputation scores that we use for leader election but we are
68    // using it here as a signal for high quality block propagation as well.
69    pub(crate) propagation_scores: ReputationScores,
70}
71
72impl AncestorStateManager {
73    // This value is based on the production round rates of between 10-15 rounds per second
74    // which means we will be locking state between 30-45 seconds.
75    #[cfg(not(test))]
76    const STATE_LOCK_CLOCK_ROUNDS: u32 = 450;
77    #[cfg(test)]
78    const STATE_LOCK_CLOCK_ROUNDS: u32 = 5;
79
80    // Exclusion threshold is based on propagation (reputation) scores
81    const SCORE_EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 20;
82
83    pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
84        let state_map = vec![AncestorInfo::new(); context.committee.size()];
85
86        // Note: this value cannot be greater than the threshold used in leader
87        // schedule to identify bad nodes.
88        let excluded_nodes_stake_threshold_percentage = 2 * context
89            .protocol_config
90            .consensus_bad_nodes_stake_threshold()
91            / 3;
92
93        let excluded_nodes_stake_threshold = (excluded_nodes_stake_threshold_percentage
94            * context.committee.total_stake())
95            / 100 as Stake;
96
97        Self {
98            context,
99            dag_state,
100            state_map,
101            excluded_nodes_stake_threshold,
102            // All ancestors start in the include state.
103            total_excluded_stake: 0,
104            propagation_scores: ReputationScores::default(),
105        }
106    }
107
108    pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
109        self.propagation_scores = scores;
110    }
111
112    pub(crate) fn get_ancestor_states(&self) -> Vec<AncestorState> {
113        self.state_map.iter().map(|info| info.state).collect()
114    }
115
116    /// Updates the state of all ancestors based on the latest scores and quorum rounds
117    pub(crate) fn update_all_ancestors_state(&mut self, accepted_quorum_rounds: &[QuorumRound]) {
118        // If round prober has not run yet and we don't have network quorum round,
119        // it is okay because network_high_quorum_round will be zero and we will
120        // include all ancestors until we get more information.
121        let network_high_quorum_round =
122            self.calculate_network_high_quorum_round(accepted_quorum_rounds);
123
124        let current_clock_round = self.dag_state.read().threshold_clock_round();
125        let low_score_threshold = (self.propagation_scores.highest_score()
126            * Self::SCORE_EXCLUSION_THRESHOLD_PERCENTAGE)
127            / 100;
128
129        debug!(
130            "Updating all ancestor state at round {current_clock_round} using network high quorum round of {network_high_quorum_round}, low score threshold of {low_score_threshold}, and exclude stake threshold of {}",
131            self.excluded_nodes_stake_threshold
132        );
133
134        // We will first collect all potential state transitions as we need to ensure
135        // we do not move more ancestors to EXCLUDE state than the excluded_nodes_stake_threshold
136        // allows
137        let mut exclude_to_include = Vec::new();
138        let mut include_to_exclude = Vec::new();
139
140        // If propagation scores are not ready because the first 300 commits have not
141        // happened, this is okay as we will only start excluding ancestors after that
142        // point in time.
143        for (idx, score) in self
144            .propagation_scores
145            .scores_per_authority
146            .iter()
147            .enumerate()
148        {
149            let authority_id = self
150                .context
151                .committee
152                .to_authority_index(idx)
153                .expect("Index should be valid");
154            let ancestor_info = &self.state_map[idx];
155            let (_low, authority_high_quorum_round) = accepted_quorum_rounds[idx];
156            let stake = self.context.committee.authority(authority_id).stake;
157
158            // Skip if locked
159            if ancestor_info.is_locked(current_clock_round) {
160                continue;
161            }
162
163            match ancestor_info.state {
164                AncestorState::Include => {
165                    if *score <= low_score_threshold {
166                        include_to_exclude.push(StateTransition {
167                            authority_id,
168                            score: *score,
169                            stake,
170                            high_quorum_round: authority_high_quorum_round,
171                        });
172                    }
173                }
174                AncestorState::Exclude(_) => {
175                    if *score > low_score_threshold
176                        || authority_high_quorum_round >= network_high_quorum_round
177                    {
178                        exclude_to_include.push(StateTransition {
179                            authority_id,
180                            score: *score,
181                            stake,
182                            high_quorum_round: authority_high_quorum_round,
183                        });
184                    }
185                }
186            }
187        }
188
189        // We can apply the state change for all ancestors that are moving to the
190        // include state as that will never cause us to exceed the excluded_nodes_stake_threshold
191        for transition in exclude_to_include {
192            self.apply_state_change(transition, AncestorState::Include, current_clock_round);
193        }
194
195        // Sort include_to_exclude by worst scores first as these should take priority
196        // to be excluded if we can't exclude them all due to the excluded_nodes_stake_threshold
197        include_to_exclude.sort_by_key(|t| t.score);
198
199        // We can now apply state change for all ancestors that are moving to the exclude
200        // state as we know there is no new stake that will be freed up by ancestor
201        // state transition to include.
202        for transition in include_to_exclude {
203            // If the stake of this ancestor would cause us to exceed the threshold
204            // we do nothing. The lock will continue to be unlocked meaning we can
205            // try again immediately on the next call to update_all_ancestors_state
206            if self.total_excluded_stake + transition.stake <= self.excluded_nodes_stake_threshold {
207                let new_state = AncestorState::Exclude(transition.score);
208                self.apply_state_change(transition, new_state, current_clock_round);
209            } else {
210                info!(
211                    "Authority {} would have moved to {:?} state with score {} & quorum_round {} but we would have exceeded total excluded stake threshold. current_excluded_stake {} + authority_stake {} > exclude_stake_threshold {}",
212                    transition.authority_id,
213                    AncestorState::Exclude(transition.score),
214                    transition.score,
215                    transition.high_quorum_round,
216                    self.total_excluded_stake,
217                    transition.stake,
218                    self.excluded_nodes_stake_threshold
219                );
220            }
221        }
222    }
223
224    fn apply_state_change(
225        &mut self,
226        transition: StateTransition,
227        new_state: AncestorState,
228        current_clock_round: u32,
229    ) {
230        let block_hostname = &self
231            .context
232            .committee
233            .authority(transition.authority_id)
234            .hostname;
235        let ancestor_info = &mut self.state_map[transition.authority_id.value()];
236
237        match (ancestor_info.state, new_state) {
238            (AncestorState::Exclude(_), AncestorState::Include) => {
239                self.total_excluded_stake = self.total_excluded_stake
240                    .checked_sub(transition.stake)
241                    .expect("total_excluded_stake underflow - trying to subtract more stake than we're tracking as excluded");
242            }
243            (AncestorState::Include, AncestorState::Exclude(_)) => {
244                self.total_excluded_stake += transition.stake;
245            }
246            _ => {
247                panic!("Calls to this function should only be made for state transition.")
248            }
249        }
250
251        ancestor_info.state = new_state;
252        let lock_until_round = current_clock_round + Self::STATE_LOCK_CLOCK_ROUNDS;
253        ancestor_info.set_lock(lock_until_round);
254
255        info!(
256            "Authority {} moved to {new_state:?} state with score {} & quorum_round {} and locked until round {lock_until_round}. Total excluded stake: {}",
257            transition.authority_id,
258            transition.score,
259            transition.high_quorum_round,
260            self.total_excluded_stake
261        );
262
263        self.context
264            .metrics
265            .node_metrics
266            .ancestor_state_change_by_authority
267            .with_label_values(&[
268                block_hostname,
269                match new_state {
270                    AncestorState::Include => "include",
271                    AncestorState::Exclude(_) => "exclude",
272                },
273            ])
274            .inc();
275    }
276
277    /// Calculate the network's high quorum round based on accepted rounds via
278    /// RoundTracker.
279    ///
280    /// The authority high quorum round is the lowest round higher or equal to rounds  
281    /// from a quorum of authorities. The network high quorum round is using the high
282    /// quorum round of each authority as tracked by the [`RoundTracker`] and then
283    /// finding the high quroum round of those high quorum rounds.
284    fn calculate_network_high_quorum_round(&self, accepted_quorum_rounds: &[QuorumRound]) -> u32 {
285        let committee = &self.context.committee;
286
287        let mut high_quorum_rounds_with_stake = accepted_quorum_rounds
288            .iter()
289            .zip(committee.authorities())
290            .map(|((_low, high), (_, authority))| (*high, authority.stake))
291            .collect::<Vec<_>>();
292        high_quorum_rounds_with_stake.sort();
293
294        let mut total_stake = 0;
295        let mut network_high_quorum_round = 0;
296
297        for (round, stake) in high_quorum_rounds_with_stake.iter() {
298            total_stake += stake;
299            if total_stake >= self.context.committee.quorum_threshold() {
300                network_high_quorum_round = *round;
301                break;
302            }
303        }
304
305        network_high_quorum_round
306    }
307}
308
309#[cfg(test)]
310mod test {
311    use super::*;
312    use crate::{
313        leader_scoring::ReputationScores, storage::mem_store::MemStore,
314        test_dag_builder::DagBuilder,
315    };
316
317    #[tokio::test]
318    async fn test_calculate_network_high_accepted_quorum_round() {
319        telemetry_subscribers::init_for_testing();
320
321        let (context, _key_pairs) = Context::new_for_test(4);
322        let context = Arc::new(context);
323        let store = Arc::new(MemStore::new());
324        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
325
326        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
327        let mut ancestor_state_manager =
328            AncestorStateManager::new(context.clone(), dag_state.clone());
329        ancestor_state_manager.set_propagation_scores(scores);
330
331        // Quorum rounds are not set yet, so we should calculate a network
332        // quorum round of 0 to start.
333        let network_high_quorum_round =
334            ancestor_state_manager.calculate_network_high_quorum_round(&[]);
335        assert_eq!(network_high_quorum_round, 0);
336
337        let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];
338
339        let network_high_quorum_round =
340            ancestor_state_manager.calculate_network_high_quorum_round(&accepted_quorum_rounds);
341        assert_eq!(network_high_quorum_round, 229);
342    }
343
344    // Test all state transitions with probe_accepted_rounds = true
345    // Default all INCLUDE -> EXCLUDE
346    // EXCLUDE -> INCLUDE (Blocked due to lock)
347    // EXCLUDE -> INCLUDE (Pass due to lock expired)
348    // INCLUDE -> EXCLUDE (Blocked due to lock)
349    // INCLUDE -> EXCLUDE (Pass due to lock expired)
350    #[tokio::test]
351    async fn test_update_all_ancestor_state_using_accepted_rounds() {
352        telemetry_subscribers::init_for_testing();
353        let (mut context, _key_pairs) = Context::new_for_test(5);
354        context
355            .protocol_config
356            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
357        let context = Arc::new(context);
358        let store = Arc::new(MemStore::new());
359        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
360        let mut dag_builder = DagBuilder::new(context.clone());
361
362        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3, 4]);
363        let mut ancestor_state_manager = AncestorStateManager::new(context, dag_state.clone());
364        ancestor_state_manager.set_propagation_scores(scores);
365
366        let accepted_quorum_rounds =
367            vec![(225, 229), (225, 229), (229, 300), (229, 300), (229, 300)];
368        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
369
370        // Score threshold for exclude is (4 * 10) / 100 = 0
371        // No ancestors should be excluded in with this threshold
372        let state_map = ancestor_state_manager.get_ancestor_states();
373        for state in state_map.iter() {
374            assert_eq!(*state, AncestorState::Include);
375        }
376
377        let scores = ReputationScores::new((1..=300).into(), vec![10, 9, 100, 100, 100]);
378        ancestor_state_manager.set_propagation_scores(scores);
379        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
380
381        // Score threshold for exclude is (100 * 10) / 100 = 10
382        // Authority 1 with the lowest score will move to the EXCLUDE state
383        // Authority 0 with the next lowest score is eligible to move to the EXCLUDE
384        // state based on the score threshold but it would exceed the total excluded
385        // stake threshold so it remains in the INCLUDE state.
386        let state_map = ancestor_state_manager.get_ancestor_states();
387        for (authority, state) in state_map.iter().enumerate() {
388            if authority == 1 {
389                assert_eq!(*state, AncestorState::Exclude(9));
390            } else {
391                assert_eq!(*state, AncestorState::Include);
392            }
393        }
394
395        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
396
397        // 1 authorities should still be excluded with these scores and no new
398        // clock round updates have happened to expire the locks.
399        let state_map = ancestor_state_manager.get_ancestor_states();
400        for (authority, state) in state_map.iter().enumerate() {
401            if authority == 1 {
402                assert_eq!(*state, AncestorState::Exclude(9));
403            } else {
404                assert_eq!(*state, AncestorState::Include);
405            }
406        }
407
408        // Updating the clock round will expire the lock as we only need 5
409        // clock round updates for tests.
410        dag_builder.layers(1..=6).build();
411        let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
412        dag_state.write().accept_blocks(blocks);
413
414        let accepted_quorum_rounds =
415            vec![(225, 229), (229, 300), (229, 300), (229, 300), (229, 300)];
416        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
417
418        // Authority 1 should now be included again because high quorum round is
419        // at the network high quorum round of 300. Authority 0 will now be moved
420        // to EXCLUDE state as its score is low.
421        let state_map = ancestor_state_manager.get_ancestor_states();
422        for (authority, state) in state_map.iter().enumerate() {
423            if authority == 0 {
424                assert_eq!(*state, AncestorState::Exclude(10));
425            } else {
426                assert_eq!(*state, AncestorState::Include);
427            }
428        }
429
430        let accepted_quorum_rounds =
431            vec![(229, 300), (229, 300), (229, 300), (229, 300), (229, 300)];
432        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
433
434        // Ancestor 0 is still locked in the EXCLUDE state until there is more
435        // clock round updates which is why even though the quorum rounds are
436        // high enough it has not moved to the INCLUDE state.
437        let state_map = ancestor_state_manager.get_ancestor_states();
438
439        for (authority, state) in state_map.iter().enumerate() {
440            if authority == 0 {
441                assert_eq!(*state, AncestorState::Exclude(10));
442            } else {
443                assert_eq!(*state, AncestorState::Include);
444            }
445        }
446
447        // Updating the clock round will expire the lock as we only need 5 updates for tests.
448        dag_builder.layers(7..=12).build();
449        let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
450        dag_state.write().accept_blocks(blocks);
451
452        let scores = ReputationScores::new((1..=300).into(), vec![10, 100, 100, 100, 100]);
453        ancestor_state_manager.set_propagation_scores(scores);
454        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);
455
456        // Ancestor 0 can transition to INCLUDE state now that the lock expired
457        // and its quorum round is above the threshold.
458        let state_map = ancestor_state_manager.get_ancestor_states();
459        for state in state_map.iter() {
460            assert_eq!(*state, AncestorState::Include);
461        }
462    }
463}