consensus_core/
ancestor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_config::{AuthorityIndex, Stake};
7use parking_lot::RwLock;
8use tracing::{debug, info};
9
10use crate::{
11    context::Context, dag_state::DagState, leader_scoring::ReputationScores,
12    round_tracker::QuorumRound,
13};
14
/// Per-authority ancestor state tracked by [`AncestorStateManager`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum AncestorState {
    /// The authority's blocks are included as ancestors.
    Include,
    /// The authority's blocks are excluded. The payload is the exclusion score:
    /// the propagation score the authority had when it was excluded.
    Exclude(u64),
}
21
22#[derive(Clone)]
23struct AncestorInfo {
24    state: AncestorState,
25    // This will be set to the future clock round for which this ancestor state
26    // will be locked.
27    lock_until_round: u32,
28}
29
30impl AncestorInfo {
31    fn new() -> Self {
32        Self {
33            state: AncestorState::Include,
34            lock_until_round: 0,
35        }
36    }
37
38    fn is_locked(&self, current_clock_round: u32) -> bool {
39        self.lock_until_round >= current_clock_round
40    }
41
42    fn set_lock(&mut self, lock_until_round: u32) {
43        self.lock_until_round = lock_until_round;
44    }
45}
46
47#[derive(Debug)]
48struct StateTransition {
49    authority_id: AuthorityIndex,
50    // The authority propagation score taken from leader scoring.
51    score: u64,
52    // The stake of the authority that is transitioning state.
53    stake: u64,
54    // The authority high quorum round is the lowest round higher or equal to rounds
55    // from a quorum of authorities
56    high_quorum_round: u32,
57}
58
59pub(crate) struct AncestorStateManager {
60    context: Arc<Context>,
61    dag_state: Arc<RwLock<DagState>>,
62    state_map: Vec<AncestorInfo>,
63    excluded_nodes_stake_threshold: u64,
64    // This is the running total of ancestors by stake that have been marked
65    // as excluded. This cannot exceed the excluded_nodes_stake_threshold
66    total_excluded_stake: Stake,
67    // This is the reputation scores that we use for leader election but we are
68    // using it here as a signal for high quality block propagation as well.
69    pub(crate) propagation_scores: ReputationScores,
70}
71
72impl AncestorStateManager {
73    // This value is based on the production round rates of between 10-15 rounds per second
74    // which means we will be locking state between 30-45 seconds.
75    #[cfg(not(test))]
76    const STATE_LOCK_CLOCK_ROUNDS: u32 = 450;
77    #[cfg(test)]
78    const STATE_LOCK_CLOCK_ROUNDS: u32 = 5;
79
80    // Exclusion threshold is based on propagation (reputation) scores
81    const SCORE_EXCLUSION_THRESHOLD_PERCENTAGE: u64 = 20;
82
83    pub(crate) fn new(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
84        let state_map = vec![AncestorInfo::new(); context.committee.size()];
85
86        // Note: this value cannot be greater than the threshold used in leader
87        // schedule to identify bad nodes.
88        let excluded_nodes_stake_threshold_percentage =
89            2 * context.protocol_config.bad_nodes_stake_threshold() / 3;
90
91        let excluded_nodes_stake_threshold = (excluded_nodes_stake_threshold_percentage
92            * context.committee.total_stake())
93            / 100 as Stake;
94
95        Self {
96            context,
97            dag_state,
98            state_map,
99            excluded_nodes_stake_threshold,
100            // All ancestors start in the include state.
101            total_excluded_stake: 0,
102            propagation_scores: ReputationScores::default(),
103        }
104    }
105
106    pub(crate) fn set_propagation_scores(&mut self, scores: ReputationScores) {
107        self.propagation_scores = scores;
108    }
109
110    pub(crate) fn get_ancestor_states(&self) -> Vec<AncestorState> {
111        self.state_map.iter().map(|info| info.state).collect()
112    }
113
114    /// Updates the state of all ancestors based on the latest scores and quorum rounds
115    pub(crate) fn update_all_ancestors_state(&mut self, accepted_quorum_rounds: &[QuorumRound]) {
116        // If round prober has not run yet and we don't have network quorum round,
117        // it is okay because network_high_quorum_round will be zero and we will
118        // include all ancestors until we get more information.
119        let network_high_quorum_round =
120            self.calculate_network_high_quorum_round(accepted_quorum_rounds);
121
122        let current_clock_round = self.dag_state.read().threshold_clock_round();
123        let low_score_threshold = (self.propagation_scores.highest_score()
124            * Self::SCORE_EXCLUSION_THRESHOLD_PERCENTAGE)
125            / 100;
126
127        debug!(
128            "Updating all ancestor state at round {current_clock_round} using network high quorum round of {network_high_quorum_round}, low score threshold of {low_score_threshold}, and exclude stake threshold of {}",
129            self.excluded_nodes_stake_threshold
130        );
131
132        // We will first collect all potential state transitions as we need to ensure
133        // we do not move more ancestors to EXCLUDE state than the excluded_nodes_stake_threshold
134        // allows
135        let mut exclude_to_include = Vec::new();
136        let mut include_to_exclude = Vec::new();
137
138        // If propagation scores are not ready because the first 300 commits have not
139        // happened, this is okay as we will only start excluding ancestors after that
140        // point in time.
141        for (idx, score) in self
142            .propagation_scores
143            .scores_per_authority
144            .iter()
145            .enumerate()
146        {
147            let authority_id = self
148                .context
149                .committee
150                .to_authority_index(idx)
151                .expect("Index should be valid");
152            let ancestor_info = &self.state_map[idx];
153            let (_low, authority_high_quorum_round) = accepted_quorum_rounds[idx];
154            let stake = self.context.committee.authority(authority_id).stake;
155
156            // Skip if locked
157            if ancestor_info.is_locked(current_clock_round) {
158                continue;
159            }
160
161            match ancestor_info.state {
162                AncestorState::Include => {
163                    if *score <= low_score_threshold {
164                        include_to_exclude.push(StateTransition {
165                            authority_id,
166                            score: *score,
167                            stake,
168                            high_quorum_round: authority_high_quorum_round,
169                        });
170                    }
171                }
172                AncestorState::Exclude(_) => {
173                    if *score > low_score_threshold
174                        || authority_high_quorum_round >= network_high_quorum_round
175                    {
176                        exclude_to_include.push(StateTransition {
177                            authority_id,
178                            score: *score,
179                            stake,
180                            high_quorum_round: authority_high_quorum_round,
181                        });
182                    }
183                }
184            }
185        }
186
187        // We can apply the state change for all ancestors that are moving to the
188        // include state as that will never cause us to exceed the excluded_nodes_stake_threshold
189        for transition in exclude_to_include {
190            self.apply_state_change(transition, AncestorState::Include, current_clock_round);
191        }
192
193        // Sort include_to_exclude by worst scores first as these should take priority
194        // to be excluded if we can't exclude them all due to the excluded_nodes_stake_threshold
195        include_to_exclude.sort_by_key(|t| t.score);
196
197        // We can now apply state change for all ancestors that are moving to the exclude
198        // state as we know there is no new stake that will be freed up by ancestor
199        // state transition to include.
200        for transition in include_to_exclude {
201            // If the stake of this ancestor would cause us to exceed the threshold
202            // we do nothing. The lock will continue to be unlocked meaning we can
203            // try again immediately on the next call to update_all_ancestors_state
204            if self.total_excluded_stake + transition.stake <= self.excluded_nodes_stake_threshold {
205                let new_state = AncestorState::Exclude(transition.score);
206                self.apply_state_change(transition, new_state, current_clock_round);
207            } else {
208                info!(
209                    "Authority {} would have moved to {:?} state with score {} & quorum_round {} but we would have exceeded total excluded stake threshold. current_excluded_stake {} + authority_stake {} > exclude_stake_threshold {}",
210                    transition.authority_id,
211                    AncestorState::Exclude(transition.score),
212                    transition.score,
213                    transition.high_quorum_round,
214                    self.total_excluded_stake,
215                    transition.stake,
216                    self.excluded_nodes_stake_threshold
217                );
218            }
219        }
220    }
221
222    fn apply_state_change(
223        &mut self,
224        transition: StateTransition,
225        new_state: AncestorState,
226        current_clock_round: u32,
227    ) {
228        let block_hostname = &self
229            .context
230            .committee
231            .authority(transition.authority_id)
232            .hostname;
233        let ancestor_info = &mut self.state_map[transition.authority_id.value()];
234
235        match (ancestor_info.state, new_state) {
236            (AncestorState::Exclude(_), AncestorState::Include) => {
237                self.total_excluded_stake = self.total_excluded_stake
238                    .checked_sub(transition.stake)
239                    .expect("total_excluded_stake underflow - trying to subtract more stake than we're tracking as excluded");
240            }
241            (AncestorState::Include, AncestorState::Exclude(_)) => {
242                self.total_excluded_stake += transition.stake;
243            }
244            _ => {
245                panic!("Calls to this function should only be made for state transition.")
246            }
247        }
248
249        ancestor_info.state = new_state;
250        let lock_until_round = current_clock_round + Self::STATE_LOCK_CLOCK_ROUNDS;
251        ancestor_info.set_lock(lock_until_round);
252
253        info!(
254            "Authority {} moved to {new_state:?} state with score {} & quorum_round {} and locked until round {lock_until_round}. Total excluded stake: {}",
255            transition.authority_id,
256            transition.score,
257            transition.high_quorum_round,
258            self.total_excluded_stake
259        );
260
261        self.context
262            .metrics
263            .node_metrics
264            .ancestor_state_change_by_authority
265            .with_label_values(&[
266                block_hostname.as_str(),
267                match new_state {
268                    AncestorState::Include => "include",
269                    AncestorState::Exclude(_) => "exclude",
270                },
271            ])
272            .inc();
273    }
274
275    /// Calculate the network's high quorum round based on accepted rounds via
276    /// RoundTracker.
277    ///
278    /// The authority high quorum round is the lowest round higher or equal to rounds  
279    /// from a quorum of authorities. The network high quorum round is using the high
280    /// quorum round of each authority as tracked by the [`RoundTracker`] and then
281    /// finding the high quroum round of those high quorum rounds.
282    fn calculate_network_high_quorum_round(&self, accepted_quorum_rounds: &[QuorumRound]) -> u32 {
283        let committee = &self.context.committee;
284
285        let mut high_quorum_rounds_with_stake = accepted_quorum_rounds
286            .iter()
287            .zip(committee.authorities())
288            .map(|((_low, high), (_, authority))| (*high, authority.stake))
289            .collect::<Vec<_>>();
290        high_quorum_rounds_with_stake.sort();
291
292        let mut total_stake = 0;
293        let mut network_high_quorum_round = 0;
294
295        for (round, stake) in high_quorum_rounds_with_stake.iter() {
296            total_stake += stake;
297            if total_stake >= self.context.committee.quorum_threshold() {
298                network_high_quorum_round = *round;
299                break;
300            }
301        }
302
303        network_high_quorum_round
304    }
305}
306
#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        leader_scoring::ReputationScores, storage::mem_store::MemStore,
        test_dag_builder::DagBuilder,
    };

    /// Checks the network high quorum round for an empty input and for a
    /// committee of four authorities.
    #[tokio::test]
    async fn test_calculate_network_high_accepted_quorum_round() {
        telemetry_subscribers::init_for_testing();

        let (context, _key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));

        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        let mut ancestor_state_manager =
            AncestorStateManager::new(context.clone(), dag_state.clone());
        ancestor_state_manager.set_propagation_scores(scores);

        // Quorum rounds are not set yet, so we should calculate a network
        // quorum round of 0 to start.
        let network_high_quorum_round =
            ancestor_state_manager.calculate_network_high_quorum_round(&[]);
        assert_eq!(network_high_quorum_round, 0);

        // Three of the four authorities report a high quorum round of 229, which
        // is where the accumulated stake first reaches a quorum.
        let accepted_quorum_rounds = vec![(50, 229), (175, 229), (179, 229), (179, 300)];

        let network_high_quorum_round =
            ancestor_state_manager.calculate_network_high_quorum_round(&accepted_quorum_rounds);
        assert_eq!(network_high_quorum_round, 229);
    }

    // Test all state transitions driven by propagation scores and accepted quorum rounds:
    // Default all INCLUDE -> EXCLUDE
    // EXCLUDE -> INCLUDE (Blocked due to lock)
    // EXCLUDE -> INCLUDE (Pass due to lock expired)
    // INCLUDE -> EXCLUDE (Blocked due to lock)
    // INCLUDE -> EXCLUDE (Pass due to lock expired)
    //
    // With bad_nodes_stake_threshold = 33 the excluded stake threshold is 22% of the
    // total stake, which in this 5-authority committee only leaves room to exclude
    // one authority at a time.
    #[tokio::test]
    async fn test_update_all_ancestor_state_using_accepted_rounds() {
        telemetry_subscribers::init_for_testing();
        let (mut context, _key_pairs) = Context::new_for_test(5);
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let mut dag_builder = DagBuilder::new(context.clone());

        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3, 4]);
        let mut ancestor_state_manager = AncestorStateManager::new(context, dag_state.clone());
        ancestor_state_manager.set_propagation_scores(scores);

        let accepted_quorum_rounds =
            vec![(225, 229), (225, 229), (229, 300), (229, 300), (229, 300)];
        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);

        // Score threshold for exclude is (4 * 20) / 100 = 0
        // No ancestors should be excluded with this threshold
        let state_map = ancestor_state_manager.get_ancestor_states();
        for state in state_map.iter() {
            assert_eq!(*state, AncestorState::Include);
        }

        let scores = ReputationScores::new((1..=300).into(), vec![10, 9, 100, 100, 100]);
        ancestor_state_manager.set_propagation_scores(scores);
        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);

        // Score threshold for exclude is (100 * 20) / 100 = 20
        // Authority 1 with the lowest score will move to the EXCLUDE state.
        // Authority 0 with the next lowest score is eligible to move to the EXCLUDE
        // state based on the score threshold but it would exceed the total excluded
        // stake threshold so it remains in the INCLUDE state.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 1 {
                assert_eq!(*state, AncestorState::Exclude(9));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);

        // Authority 1 should still be excluded with these scores and no new
        // clock round updates have happened to expire the locks.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 1 {
                assert_eq!(*state, AncestorState::Exclude(9));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        // Updating the clock round will expire the lock as we only need 5
        // clock round updates for tests.
        dag_builder.layers(1..=6).build();
        let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
        dag_state.write().accept_blocks(blocks);

        let accepted_quorum_rounds =
            vec![(225, 229), (229, 300), (229, 300), (229, 300), (229, 300)];
        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);

        // Authority 1 should now be included again because its high quorum round
        // has reached the network high quorum round of 300. Freeing its stake lets
        // authority 0 move to the EXCLUDE state, as its score is still low.
        let state_map = ancestor_state_manager.get_ancestor_states();
        for (authority, state) in state_map.iter().enumerate() {
            if authority == 0 {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        let accepted_quorum_rounds =
            vec![(229, 300), (229, 300), (229, 300), (229, 300), (229, 300)];
        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);

        // Ancestor 0 is still locked in the EXCLUDE state until there are more
        // clock round updates. That is why it has not moved to the INCLUDE state,
        // even though its quorum round is high enough.
        let state_map = ancestor_state_manager.get_ancestor_states();

        for (authority, state) in state_map.iter().enumerate() {
            if authority == 0 {
                assert_eq!(*state, AncestorState::Exclude(10));
            } else {
                assert_eq!(*state, AncestorState::Include);
            }
        }

        // Updating the clock round will expire the lock as we only need 5 updates for tests.
        dag_builder.layers(7..=12).build();
        let blocks = dag_builder.blocks.values().cloned().collect::<Vec<_>>();
        dag_state.write().accept_blocks(blocks);

        let scores = ReputationScores::new((1..=300).into(), vec![10, 100, 100, 100, 100]);
        ancestor_state_manager.set_propagation_scores(scores);
        ancestor_state_manager.update_all_ancestors_state(&accepted_quorum_rounds);

        // Ancestor 0 can transition to INCLUDE state now that the lock expired.
        // Its score (10) is still at or below the threshold (20), but its high
        // quorum round has reached the network high quorum round (300).
        let state_map = ancestor_state_manager.get_ancestor_states();
        for state in state_map.iter() {
            assert_eq!(*state, AncestorState::Include);
        }
    }
}