consensus_core/leader_schedule.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::BTreeMap,
    fmt::{Debug, Formatter},
    sync::Arc,
};

use consensus_config::{AuthorityIndex, Stake};
use consensus_types::block::Round;
use parking_lot::RwLock;
use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};

use crate::{
    CommitIndex, commit::CommitRange, context::Context, dag_state::DagState,
    leader_scoring::ReputationScores,
};

/// The `LeaderSchedule` is responsible for producing the leader schedule across
/// an epoch. The leader schedule is subject to change periodically based on
/// calculated `ReputationScores` of the authorities.
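///
/// # Example (illustrative sketch, not part of the original docs)
///
/// The snippet below mirrors how the unit tests in this module construct and
/// query a schedule, assuming a test `Context` built via `Context::new_for_test`.
///
/// ```ignore
/// let context = Arc::new(Context::new_for_test(4).0);
/// let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
/// // Elect the (single) leader of round 5.
/// let leader = leader_schedule.elect_leader(5, 0);
/// ```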
#[derive(Clone)]
pub(crate) struct LeaderSchedule {
    pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
    context: Arc<Context>,
    num_commits_per_schedule: u64,
}

impl LeaderSchedule {
    /// The window at which the schedule change takes place in consensus. It
    /// represents the number of committed sub-dags between schedule changes.
    /// TODO: move this to protocol config
    #[cfg(not(msim))]
    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
    #[cfg(msim)]
    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;

    pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
        Self {
            context,
            num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
            leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
        }
    }

    #[cfg(test)]
    pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
        self.num_commits_per_schedule = num_commits_per_schedule;
        self
    }

    /// Restores the `LeaderSchedule` from storage. It will attempt to retrieve the
    /// last stored `ReputationScores` and use them to build a `LeaderSwapTable`.
    pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
        let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
            LeaderSwapTable::default(),
            |(last_commit_ref, last_commit_info)| {
                LeaderSwapTable::new(
                    context.clone(),
                    last_commit_ref.index,
                    last_commit_info.reputation_scores,
                )
            },
        );

        tracing::info!(
            "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
            dag_state.read().scoring_subdags_count(),
        );

        // create the schedule
        Self::new(context, leader_swap_table)
    }

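    /// Returns the number of commits remaining before the next leader schedule
    /// update, i.e. `num_commits_per_schedule` minus the sub-dags already buffered
    /// for scoring in `DagState`. For example, with the default of 300 commits per
    /// schedule and 1 scored sub-dag buffered, 299 commits remain (as exercised in
    /// `test_leader_schedule_commits_until_leader_schedule_update` below).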
    pub(crate) fn commits_until_leader_schedule_update(
        &self,
        dag_state: Arc<RwLock<DagState>>,
    ) -> usize {
        let subdag_count = dag_state.read().scoring_subdags_count() as u64;

        assert!(
            subdag_count <= self.num_commits_per_schedule,
            "Committed subdags count exceeds the number of commits per schedule"
        );
        self.num_commits_per_schedule
            .checked_sub(subdag_count)
            .unwrap() as usize
    }

    /// Checks whether the dag state's list of scoring sub-dags is empty. If it is,
    /// then either (1) the system has just started and there is no scored sub-dag
    /// available yet, or (2) the schedule has just been updated and new scores have
    /// been calculated. Both are considered valid cases where the schedule has been
    /// updated.
    pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
        dag_state.read().is_scoring_subdag_empty()
    }

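    /// Recomputes the leader schedule: calculates `ReputationScores` from the
    /// buffered scoring sub-dags, clears them, buffers the resulting commit info
    /// in `DagState` for persistence, and atomically installs a new
    /// `LeaderSwapTable` built from those scores.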
    pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["LeaderSchedule::update_leader_schedule"])
            .start_timer();

        let (reputation_scores, last_commit_index) = {
            let dag_state = dag_state.read();
            let reputation_scores = dag_state.calculate_scoring_subdag_scores();

            let last_commit_index = dag_state.scoring_subdag_commit_range();

            (reputation_scores, last_commit_index)
        };

        {
            let mut dag_state = dag_state.write();
            // Clear scoring subdag as we have updated the leader schedule
            dag_state.clear_scoring_subdag();
            // Buffer score and last commit rounds in dag state to be persisted later
            dag_state.add_commit_info(reputation_scores.clone());
        }

        self.update_leader_swap_table(LeaderSwapTable::new(
            self.context.clone(),
            last_commit_index,
            reputation_scores.clone(),
        ));

        reputation_scores.update_metrics(self.context.clone());

        self.context
            .metrics
            .node_metrics
            .num_of_bad_nodes
            .set(self.leader_swap_table.read().bad_nodes.len() as i64);
    }

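    /// Returns the leader for the provided `round` and `leader_offset`, after
    /// applying the current `LeaderSwapTable` so that a badly performing leader
    /// is replaced by a good performer for that slot.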
    pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
        cfg_if::cfg_if! {
            // TODO: we need to differentiate the leader strategy in tests, so for
            // some types of testing (e.g. sim tests) we can use the stake-based approach.
            if #[cfg(test)] {
                let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
                let table = self.leader_swap_table.read();
                table.swap(leader, round, leader_offset).unwrap_or(leader)
            } else {
                let leader = self.elect_leader_stake_based(round, leader_offset);
                let table = self.leader_swap_table.read();
                table.swap(leader, round, leader_offset).unwrap_or(leader)
            }
        }
    }

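    /// Elects a leader for the given `round` and `offset` by sampling the committee
    /// weighted by stake, seeded deterministically by the round number.
    ///
    /// Illustrative sketch (not part of the original docs), assuming a
    /// `leader_schedule` constructed as in the tests below: because the RNG is
    /// seeded only by `round`, repeated calls are deterministic, while different
    /// offsets in the same round yield different leaders.
    ///
    /// ```ignore
    /// let a = leader_schedule.elect_leader_stake_based(7, 0);
    /// let b = leader_schedule.elect_leader_stake_based(7, 0);
    /// assert_eq!(a, b); // same round & offset => same leader
    /// ```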
    pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
        assert!((offset as usize) < self.context.committee.size());

        // To ensure that we elect different leaders for the same round (when using
        // different offsets), we use the round number as a seed to shuffle the
        // authorities in a stake-weighted way, and then skip into the shuffled
        // result based on the offset.
        // TODO: use a cache in case this proves to be computationally expensive
        let mut seed_bytes = [0u8; 32];
        seed_bytes[32 - 4..].copy_from_slice(&(round).to_le_bytes());
        let mut rng = StdRng::from_seed(seed_bytes);

        let choices = self
            .context
            .committee
            .authorities()
            .map(|(index, authority)| (index, authority.stake as f32))
            .collect::<Vec<_>>();

        *choices
            .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
            .expect("Weighted choice error: stake values incorrect!")
            .skip(offset as usize)
            .map(|(index, _)| index)
            .next()
            .unwrap()
    }

    /// Atomically updates the `LeaderSwapTable` with the new provided one. Any
    /// leader queried from now on will get calculated according to this swap
    /// table until a new one is provided again.
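    ///
    /// Illustrative sketch (not part of the original docs), mirroring
    /// `test_update_leader_swap_table` and `test_update_bad_leader_swap_table`
    /// below: a table built from scores over commits `1..=10` may be followed by
    /// one over `11..=20` (equal length, immediately adjacent), but a follow-up
    /// over `21..=25` trips the assertion in this method.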
    fn update_leader_swap_table(&self, table: LeaderSwapTable) {
        let read = self.leader_swap_table.read();
        let old_commit_range = &read.reputation_scores.commit_range;
        let new_commit_range = &table.reputation_scores.commit_range;

        // Unless LeaderSchedule is brand new and using the default commit range
        // of CommitRange(0..0) all future LeaderSwapTables should be calculated
        // from a CommitRange of equal length and immediately following the
        // preceding commit range of the old swap table.
        if *old_commit_range != CommitRange::default() {
            assert!(
                old_commit_range.is_next_range(new_commit_range)
                    && old_commit_range.is_equal_size(new_commit_range),
                "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
            );
        }
        drop(read);

        tracing::trace!("Updating {table:?}");

        let mut write = self.leader_swap_table.write();
        *write = table;
    }
}

#[derive(Default, Clone)]
pub(crate) struct LeaderSwapTable {
    /// The list of `f` (by configurable stake) authorities with the best scores,
    /// as defined by the provided `ReputationScores`. These authorities will be
    /// used in place of the `bad_nodes` in the final leader schedule.
    /// The hostname & stake are stored alongside the authority index for debugging.
    pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,

    /// The set of `f` (by configurable stake) authorities with the worst scores,
    /// as defined by the provided `ReputationScores`. Every time such an authority
    /// is elected as leader in the schedule, it will be swapped with one of the
    /// authorities from `good_nodes`.
    /// The hostname & stake are stored alongside the authority index for debugging.
    pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,

    /// Scores by authority in descending order, needed by other parts of the system
    /// for a consistent view on how each validator performs in consensus.
    pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>,

    // The scores from which the leader swap table was built. This struct is
    // kept for debugging purposes. Once `good_nodes` & `bad_nodes` are identified,
    // the `reputation_scores` are no longer needed functionally for the swap table.
    pub(crate) reputation_scores: ReputationScores,
}

impl LeaderSwapTable {
    // Constructs a new table based on the provided reputation scores. The
    // `swap_stake_threshold` designates the percentage of total stake whose
    // nodes will be considered "bad" based on their scores and replaced by good
    // nodes. The `swap_stake_threshold` should be in the range [0 - 33].
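    //
    // Illustrative sketch (not in the original comments), following the pattern of
    // `test_leader_swap_table` below: for a 4-authority committee with equal stake
    // and scores [0, 1, 2, 3], a threshold of 33 marks the top scorer as the single
    // good node and the bottom scorer as the single bad node.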
    pub(crate) fn new(
        context: Arc<Context>,
        commit_index: CommitIndex,
        reputation_scores: ReputationScores,
    ) -> Self {
        let swap_stake_threshold = context
            .protocol_config
            .consensus_bad_nodes_stake_threshold();
        Self::new_inner(
            context,
            swap_stake_threshold,
            commit_index,
            reputation_scores,
        )
    }

    fn new_inner(
        context: Arc<Context>,
        // Ignore linter warning in simtests.
        // TODO: maybe override protocol configs in tests for swap_stake_threshold, and call new().
        #[allow(unused_variables)] swap_stake_threshold: u64,
        commit_index: CommitIndex,
        reputation_scores: ReputationScores,
    ) -> Self {
        #[cfg(msim)]
        let swap_stake_threshold = 33;

        assert!(
            (0..=33).contains(&swap_stake_threshold),
            "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
        );

        // When reputation scores are disabled or at genesis, use the default value.
        if reputation_scores.scores_per_authority.is_empty() {
            return Self::default();
        }

        // Randomize order of authorities when they have the same score,
        // to avoid bias in the selection of the good and bad nodes.
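        // Note: shuffling first and then stable-sorting by score is what makes the
        // tie-break random yet deterministic for a given `commit_index` seed. For
        // example, if three authorities all have score 5, their relative order after
        // the stable sort is whatever the seeded shuffle produced, so none of them is
        // systematically favored when the good/bad cutoff falls between them.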
        let mut seed_bytes = [0u8; 32];
        seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
        let mut rng = StdRng::from_seed(seed_bytes);
        let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
        assert_eq!(authorities_by_score.len(), context.committee.size());
        authorities_by_score.shuffle(&mut rng);
        // Stable sort the authorities by score descending. Order of authorities with the same score is preserved.
        authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));

        // Calculating the good nodes
        let good_nodes = Self::retrieve_first_nodes(
            context.clone(),
            authorities_by_score.iter(),
            swap_stake_threshold,
        )
        .into_iter()
        .collect::<Vec<(AuthorityIndex, String, Stake)>>();

        // Calculating the bad nodes
        // Reverse the sorted authorities to score ascending so we get the first
        // low scorers up to the provided stake threshold.
        let bad_nodes = Self::retrieve_first_nodes(
            context.clone(),
            authorities_by_score.iter().rev(),
            swap_stake_threshold,
        )
        .into_iter()
        .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
        .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();

        good_nodes.iter().for_each(|(idx, hostname, stake)| {
            tracing::debug!(
                "Good node {hostname} with stake {stake} has score {} for {:?}",
                reputation_scores.scores_per_authority[idx.to_owned()],
                reputation_scores.commit_range,
            );
        });

        bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
            tracing::debug!(
                "Bad node {hostname} with stake {stake} has score {} for {:?}",
                reputation_scores.scores_per_authority[idx.to_owned()],
                reputation_scores.commit_range,
            );
        });

        tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");

        Self {
            good_nodes,
            bad_nodes,
            reputation_scores_desc: authorities_by_score,
            reputation_scores,
        }
    }

    /// Checks whether the provided leader is a bad performer and needs to be
    /// swapped in the schedule with a good performer. If not, the method returns
    /// None; otherwise, the leader to swap with is returned. The `leader_round` &
    /// `leader_offset` represent the DAG slot on which the provided
    /// `AuthorityIndex` is the leader, and are used as a seed to a random function
    /// in order to calculate the good node that will swap with the bad node in
    /// that round. We intentionally do not use weighted randomness, as we want to
    /// give all the good nodes an equal opportunity to be swapped with bad nodes,
    /// and not have one node with enough stake end up swapping bad nodes more
    /// frequently than the others in the final schedule.
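    ///
    /// Illustrative sketch (not part of the original docs), mirroring
    /// `test_leader_swap_table_swap` below, assuming `table`, `bad` and `good`
    /// are a swap table and two authority indices in scope:
    ///
    /// ```ignore
    /// // `bad` is in `bad_nodes`, so a good node is returned for the slot;
    /// // `good` is not, so no swap happens.
    /// assert!(table.swap(bad, /*leader_round=*/ 1, /*leader_offset=*/ 0).is_some());
    /// assert!(table.swap(good, 1, 0).is_none());
    /// ```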
    pub(crate) fn swap(
        &self,
        leader: AuthorityIndex,
        leader_round: Round,
        leader_offset: u32,
    ) -> Option<AuthorityIndex> {
        if self.bad_nodes.contains_key(&leader) {
            // TODO: Re-work swap for the multileader case
            assert!(
                leader_offset == 0,
                "Swap for multi-leader case not implemented yet."
            );
            let mut seed_bytes = [0u8; 32];
            seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
            seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
            let mut rng = StdRng::from_seed(seed_bytes);

            let (idx, _hostname, _stake) = self
                .good_nodes
                .choose(&mut rng)
                .expect("There should be at least one good node available");

            tracing::trace!(
                "Swapping bad leader {} -> {} for round {}",
                leader,
                idx,
                leader_round
            );

            return Some(*idx);
        }
        None
    }

    /// Retrieves the first nodes provided by the iterator `authorities` until the
    /// `stake_threshold` has been reached. The `stake_threshold` should be in
    /// [0, 100] and expresses the percentage of total stake that is considered
    /// the cutoff. It is the caller's responsibility to ensure that the elements
    /// of the `authorities` input are already sorted.
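    ///
    /// Illustrative sketch (not part of the original docs), mirroring
    /// `test_leader_swap_table_retrieve_first_nodes` below: with 4 equally staked
    /// authorities and a `stake_threshold` of 50, only the first 2 authorities of
    /// the iterator are returned, since including a third would exceed 50% of the
    /// total stake.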
    fn retrieve_first_nodes<'a>(
        context: Arc<Context>,
        authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
        stake_threshold: u64,
    ) -> Vec<(AuthorityIndex, String, Stake)> {
        let mut filtered_authorities = Vec::new();

        let mut stake = 0;
        for &(authority_idx, _score) in authorities {
            stake += context.committee.stake(authority_idx);

            // If the total accumulated stake has surpassed the stake threshold,
            // omit this last authority and exit the loop. Note that this means
            // that if the threshold is too low we may not return any nodes at all.
            if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
                break;
            }

            let authority = context.committee.authority(authority_idx);
            filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
        }

        filtered_authorities
    }
}

impl Debug for LeaderSwapTable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(&format!(
            "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
            self.reputation_scores.commit_range,
            self.good_nodes
                .iter()
                .map(|(idx, _hostname, _stake)| idx.to_owned())
                .collect::<Vec<AuthorityIndex>>(),
            self.good_nodes
                .iter()
                .map(|(_idx, _hostname, stake)| stake)
                .sum::<Stake>(),
            self.bad_nodes
                .keys()
                .map(|idx| idx.to_owned())
                .collect::<Vec<AuthorityIndex>>(),
            self.bad_nodes
                .values()
                .map(|(_hostname, stake)| stake)
                .sum::<Stake>(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};

    use super::*;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
    };

    #[tokio::test]
    async fn test_elect_leader() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        assert_eq!(
            leader_schedule.elect_leader(0, 0),
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(
            leader_schedule.elect_leader(1, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader(5, 0),
            AuthorityIndex::new_for_test(1)
        );
        // ensure we elect different leaders for the same round for the multi-leader case
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_elect_leader_stake_based() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        assert_eq!(
            leader_schedule.elect_leader_stake_based(0, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader_stake_based(1, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader_stake_based(5, 0),
            AuthorityIndex::new_for_test(3)
        );
        // ensure we elect different leaders for the same round for the multi-leader case
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Populate fully connected test blocks for round 0 ~ 11, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=11).build();
        let mut subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
            for block in sub_dag.blocks.iter() {
                blocks_to_write.push(block.clone());
            }
            expected_commits.push(commit);
            subdags.push(sub_dag);
        }

        // The CommitInfo for the first 10 commits is written to store. This is the
        // info that the LeaderSchedule will be recovered from.
        let commit_range = (1..=10).into();
        let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
        let committed_rounds = vec![9, 9, 10, 9];
        let commit_ref = expected_commits[9].reference();
        let commit_info = CommitInfo {
            reputation_scores,
            committed_rounds,
        };

        // CommitIndex '11' will be written to store. This should result in the cached
        // last_committed_rounds & unscored subdags in DagState being updated with the
        // latest commit information on recovery.
        store
            .write(
                WriteBatch::default()
                    .commit_info(vec![(commit_ref, commit_info)])
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Check that DagState recovery from stored CommitInfo worked correctly
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(1, dag_state.read().scoring_subdags_count());
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(2)),
            "{:?}",
            leader_swap_table.bad_nodes
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commits() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let expected_last_committed_rounds = vec![0, 0, 0, 0];

        // Check that DagState recovery from stored CommitInfo worked correctly
        assert_eq!(
            expected_last_committed_rounds,
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(0, dag_state.read().scoring_subdags_count());

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commit_info() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Populate fully connected test blocks for round 0 ~ 2, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=2).build();

        let mut expected_scored_subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
            for block in sub_dag.blocks.iter() {
                blocks_to_write.push(block.clone());
            }
            expected_commits.push(commit);
            expected_scored_subdags.push(sub_dag);
        }

        // Only the blocks & commits for the first 2 commits are written to store; no
        // CommitInfo is written. 10 commits would have been required for a leader
        // schedule update, so at this point no commit info should have been persisted
        // and no leader schedule should be recovered. However dag state should have
        // properly recovered the unscored subdags & last committed rounds.
        store
            .write(
                WriteBatch::default()
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Check that DagState recovery from stored CommitInfo worked correctly
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(
            expected_scored_subdags.len(),
            dag_state.read().scoring_subdags_count()
        );
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_commits_until_leader_schedule_update() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let unscored_subdags = vec![CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            vec![],
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
        )];
        dag_state.write().add_scoring_subdags(unscored_subdags);

        let commits_until_leader_schedule_update =
            leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
        assert_eq!(commits_until_leader_schedule_update, 299);
    }

    #[tokio::test]
    async fn test_leader_schedule_update_leader_schedule() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));

        // Populate fully connected test blocks for round 0 ~ 4, authorities 0 ~ 3.
        let max_round: u32 = 4;
        let num_authorities: u32 = 4;

        let mut blocks = Vec::new();
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                let block = TestBlock::new(0, author_idx).build();
                VerifiedBlock::new_for_test(block)
            })
            .map(|block| (block.reference(), block))
            .unzip();
        blocks.extend(genesis);

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=max_round {
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                new_ancestors.push(block.reference());

                // Simulate referenced block which was part of another committed
                // subdag.
                if round == 3 && author == 0 {
                    tracing::info!("Skipping {block} in committed subdags blocks");
                    continue;
                }

                blocks.push(block.clone());

                // only write one block for the final round, which is the leader
                // of the committed subdag.
                if round == max_round {
                    leader = Some(block.clone());
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;

        let last_commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            leader_ref,
            blocks
                .iter()
                .map(|block| block.reference())
                .collect::<Vec<_>>(),
        );

        let unscored_subdags = vec![CommittedSubDag::new(
            leader_ref,
            blocks,
            context.clock.timestamp_utc_ms(),
            last_commit.reference(),
        )];

        let mut dag_state_write = dag_state.write();
        dag_state_write.set_last_commit(last_commit);
        dag_state_write.add_scoring_subdags(unscored_subdags);
        drop(dag_state_write);

        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(0)
        );

        leader_schedule.update_leader_schedule_v2(&dag_state);

        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(2)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(2)
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);

        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(3)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table_swap() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Test swapping a bad leader
        let leader = AuthorityIndex::new_for_test(0);
        let leader_round = 1;
        let leader_offset = 0;
        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
        assert_eq!(swapped_leader, Some(AuthorityIndex::new_for_test(3)));

        // Test not swapping a good leader
        let leader = AuthorityIndex::new_for_test(1);
        let leader_round = 1;
        let leader_offset = 0;
        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
        assert_eq!(swapped_leader, None);
    }

    #[tokio::test]
    async fn test_leader_swap_table_retrieve_first_nodes() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let authorities = [
            (AuthorityIndex::new_for_test(0), 1),
            (AuthorityIndex::new_for_test(1), 2),
            (AuthorityIndex::new_for_test(2), 3),
            (AuthorityIndex::new_for_test(3), 4),
        ];

        let stake_threshold = 50;
        let filtered_authorities = LeaderSwapTable::retrieve_first_nodes(
            context.clone(),
            authorities.iter(),
            stake_threshold,
        );

        // Test setup includes 4 validators with even stake. Therefore with a
        // stake_threshold of 50% we should see 2 validators filtered.
        assert_eq!(filtered_authorities.len(), 2);
        let authority_0_idx = AuthorityIndex::new_for_test(0);
        let authority_0 = context.committee.authority(authority_0_idx);
        assert!(filtered_authorities.contains(&(
            authority_0_idx,
            authority_0.hostname.clone(),
            authority_0.stake
        )));
        let authority_1_idx = AuthorityIndex::new_for_test(1);
        let authority_1 = context.committee.authority(authority_1_idx);
        assert!(filtered_authorities.contains(&(
            authority_1_idx,
            authority_1.hostname.clone(),
            authority_1.stake
        )));
    }

    #[tokio::test]
    #[should_panic(
        expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
    )]
    async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 34;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
    }

    #[tokio::test]
    async fn test_update_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (1..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Update leader from brand new schedule to first real schedule
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (11..=20).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Update leader from old swap table to new valid swap table
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
    }

    #[tokio::test]
    #[should_panic(
        expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
    )]
    async fn test_update_bad_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (1..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Update leader from brand new schedule to first real schedule
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (11..=20).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Update leader from old swap table to new valid swap table
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (21..=25).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Update leader from old swap table to new invalid swap table
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
    }
}