consensus_core/
leader_schedule.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::BTreeMap,
    fmt::{Debug, Formatter},
    sync::Arc,
};

use consensus_config::{AuthorityIndex, Stake};
use consensus_types::block::Round;
use parking_lot::RwLock;
use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};

use crate::{
    CommitIndex, commit::CommitRange, context::Context, dag_state::DagState,
    leader_scoring::ReputationScores,
};

/// The `LeaderSchedule` is responsible for producing the leader schedule across
/// an epoch. The leader schedule is subject to change periodically based on
/// calculated `ReputationScores` of the authorities.
#[derive(Clone)]
pub(crate) struct LeaderSchedule {
    pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
    context: Arc<Context>,
    num_commits_per_schedule: u64,
}

impl LeaderSchedule {
    /// The window at which the schedule change takes place in consensus. It
    /// represents the number of committed sub-dags.
    /// TODO: move this to protocol config
    #[cfg(not(msim))]
    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
    #[cfg(msim)]
    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;

    pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
        Self {
            context,
            num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
            leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
        }
    }

    #[cfg(test)]
    pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
        self.num_commits_per_schedule = num_commits_per_schedule;
        self
    }

    /// Restores the `LeaderSchedule` from storage. It will attempt to retrieve the
    /// last stored `ReputationScores` and use them to build a `LeaderSwapTable`.
    pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
        let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
            LeaderSwapTable::default(),
            |(last_commit_ref, last_commit_info)| {
                LeaderSwapTable::new(
                    context.clone(),
                    last_commit_ref.index,
                    last_commit_info.reputation_scores,
                )
            },
        );

        tracing::info!(
            "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
            dag_state.read().scoring_subdags_count(),
        );

        // create the schedule
        Self::new(context, leader_swap_table)
    }

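    /// Returns how many more commits need to be processed before the next leader
    /// schedule update is due, based on the number of sub-dags scored so far.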
    pub(crate) fn commits_until_leader_schedule_update(
        &self,
        dag_state: Arc<RwLock<DagState>>,
    ) -> usize {
        let subdag_count = dag_state.read().scoring_subdags_count() as u64;

        assert!(
            subdag_count <= self.num_commits_per_schedule,
            "Committed subdags count exceeds the number of commits per schedule"
        );
        self.num_commits_per_schedule
            .checked_sub(subdag_count)
            .unwrap() as usize
    }

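    /// Recalculates `ReputationScores` from the currently scored sub-dags, buffers
    /// the resulting commit info in `DagState` for later persistence, and atomically
    /// installs a new `LeaderSwapTable` built from those scores.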
    pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["LeaderSchedule::update_leader_schedule"])
            .start_timer();

        let (reputation_scores, last_commit_index) = {
            let dag_state = dag_state.read();
            let reputation_scores = dag_state.calculate_scoring_subdag_scores();

            let last_commit_index = dag_state.scoring_subdag_commit_range();

            (reputation_scores, last_commit_index)
        };

        {
            let mut dag_state = dag_state.write();
            // Clear scoring subdag as we have updated the leader schedule
            dag_state.clear_scoring_subdag();
            // Buffer score and last commit rounds in dag state to be persisted later
            dag_state.add_commit_info(reputation_scores.clone());
        }

        self.update_leader_swap_table(LeaderSwapTable::new(
            self.context.clone(),
            last_commit_index,
            reputation_scores.clone(),
        ));

        reputation_scores.update_metrics(self.context.clone());

        self.context
            .metrics
            .node_metrics
            .num_of_bad_nodes
            .set(self.leader_swap_table.read().bad_nodes.len() as i64);
    }

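    /// Elects the leader for the given round and offset, then applies the current
    /// `LeaderSwapTable` so that a badly performing leader may be replaced by a
    /// well performing one.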
    pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
        cfg_if::cfg_if! {
            // TODO: we need to differentiate the leader strategy in tests, so for
            // some types of testing (e.g. sim tests) we can use the staked approach.
            if #[cfg(test)] {
                let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
                let table = self.leader_swap_table.read();
                table.swap(leader, round, leader_offset).unwrap_or(leader)
            } else {
                let leader = self.elect_leader_stake_based(round, leader_offset);
                let table = self.leader_swap_table.read();
                table.swap(leader, round, leader_offset).unwrap_or(leader)
            }
        }
    }

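    /// Elects a leader for the given round using stake-weighted pseudorandomness
    /// seeded by the round number, without consulting the swap table. The `offset`
    /// selects among the distinct leaders available for the same round.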
    pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
        assert!((offset as usize) < self.context.committee.size());

        // To ensure that we elect different leaders for the same round (using a
        // different offset), we use the round number as the seed for a weighted
        // shuffle of the authorities and then skip ahead by the offset.
        // TODO: use a cache in case this proves to be computationally expensive
        let mut seed_bytes = [0u8; 32];
        seed_bytes[32 - 4..].copy_from_slice(&round.to_le_bytes());
        let mut rng = StdRng::from_seed(seed_bytes);

        let choices = self
            .context
            .committee
            .authorities()
            .map(|(index, authority)| (index, authority.stake as f32))
            .collect::<Vec<_>>();

        *choices
            .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
            .expect("Weighted choice error: stake values incorrect!")
            .skip(offset as usize)
            .map(|(index, _)| index)
            .next()
            .unwrap()
    }

    /// Atomically updates the `LeaderSwapTable` with the newly provided one. Any
    /// leader queried from now on will be calculated according to this swap
    /// table until a new one is provided again.
    fn update_leader_swap_table(&self, table: LeaderSwapTable) {
        let read = self.leader_swap_table.read();
        let old_commit_range = &read.reputation_scores.commit_range;
        let new_commit_range = &table.reputation_scores.commit_range;

        // Unless LeaderSchedule is brand new and using the default commit range
        // of CommitRange(0..0), all future LeaderSwapTables should be calculated
        // from a CommitRange of equal length that immediately follows the
        // commit range of the old swap table.
        if *old_commit_range != CommitRange::default() {
            assert!(
                old_commit_range.is_next_range(new_commit_range)
                    && old_commit_range.is_equal_size(new_commit_range),
                "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
            );
        }
        drop(read);

        tracing::trace!("Updating {table:?}");

        let mut write = self.leader_swap_table.write();
        *write = table;
    }
}

#[derive(Default, Clone)]
pub(crate) struct LeaderSwapTable {
    /// The list of `f` (by configurable stake) authorities with the best scores,
    /// as defined by the provided `ReputationScores`. Those authorities will be
    /// used in place of the `bad_nodes` in the final leader schedule.
    /// The hostname & stake are stored alongside the authority index for debugging.
    pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,

    /// The set of `f` (by configurable stake) authorities with the worst scores,
    /// as defined by the provided `ReputationScores`. Every time such an authority
    /// is elected as leader in the schedule, it will be swapped with one of the
    /// `good_nodes`.
    /// The hostname & stake are stored alongside the authority index for debugging.
    pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,

    // The scores from which the leader swap table was built. This struct is
    // kept for debugging purposes. Once `good_nodes` & `bad_nodes` are identified,
    // the `reputation_scores` are no longer needed functionally for the swap table.
    pub(crate) reputation_scores: ReputationScores,
}

impl LeaderSwapTable {
    // Constructs a new table based on the provided reputation scores. The
    // `swap_stake_threshold` designates the percentage of total stake whose
    // worst-scoring nodes will be considered "bad" and replaced by good nodes.
    // The `swap_stake_threshold` should be in the range of [0 - 33].
    pub(crate) fn new(
        context: Arc<Context>,
        commit_index: CommitIndex,
        reputation_scores: ReputationScores,
    ) -> Self {
        let swap_stake_threshold = context.protocol_config.bad_nodes_stake_threshold();
        Self::new_inner(
            context,
            swap_stake_threshold,
            commit_index,
            reputation_scores,
        )
    }

    fn new_inner(
        context: Arc<Context>,
        // Ignore linter warning in simtests.
        // TODO: maybe override protocol configs in tests for swap_stake_threshold, and call new().
        #[allow(unused_variables)] swap_stake_threshold: u64,
        commit_index: CommitIndex,
        reputation_scores: ReputationScores,
    ) -> Self {
        #[cfg(msim)]
        let swap_stake_threshold = 33;

        assert!(
            (0..=33).contains(&swap_stake_threshold),
            "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
        );

        // When reputation scores are disabled or at genesis, use the default value.
        if reputation_scores.scores_per_authority.is_empty() {
            return Self::default();
        }

        // Randomize order of authorities when they have the same score,
        // to avoid bias in the selection of the good and bad nodes.
        let mut seed_bytes = [0u8; 32];
        seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
        let mut rng = StdRng::from_seed(seed_bytes);
        let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
        assert_eq!(authorities_by_score.len(), context.committee.size());
        authorities_by_score.shuffle(&mut rng);
        // Stable sort the authorities by score descending. Order of authorities with the same score is preserved.
        authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));

        // Calculating the good nodes
        let good_nodes = Self::retrieve_first_nodes(
            context.clone(),
            authorities_by_score.iter(),
            swap_stake_threshold,
        )
        .into_iter()
        .collect::<Vec<(AuthorityIndex, String, Stake)>>();

        // Calculating the bad nodes
        // Reverse the sorted authorities to score ascending so we get the first
        // low scorers up to the provided stake threshold.
        let bad_nodes = Self::retrieve_first_nodes(
            context.clone(),
            authorities_by_score.iter().rev(),
            swap_stake_threshold,
        )
        .into_iter()
        .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
        .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();

        good_nodes.iter().for_each(|(idx, hostname, stake)| {
            tracing::debug!(
                "Good node {hostname} with stake {stake} has score {} for {:?}",
                reputation_scores.scores_per_authority[idx.to_owned()],
                reputation_scores.commit_range,
            );
        });

        bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
            tracing::debug!(
                "Bad node {hostname} with stake {stake} has score {} for {:?}",
                reputation_scores.scores_per_authority[idx.to_owned()],
                reputation_scores.commit_range,
            );
        });

        tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");

        Self {
            good_nodes,
            bad_nodes,
            reputation_scores,
        }
    }

    /// Checks whether the provided leader is a bad performer and needs to be
    /// swapped in the schedule with a good performer. If not, then the method
    /// returns None. Otherwise the leader to swap with is returned instead. The
    /// `leader_round` & `leader_offset` represent the DAG slot on which the
    /// provided `AuthorityIndex` is a leader and are used as a seed for a random
    /// function in order to calculate the good node that will swap in that round
    /// with the bad node. We are intentionally not doing weighted randomness as
    /// we want to give all the good nodes an equal opportunity to get swapped
    /// with bad nodes and not have one node with enough stake end up swapping
    /// bad nodes more frequently than the others on the final schedule.
    pub(crate) fn swap(
        &self,
        leader: AuthorityIndex,
        leader_round: Round,
        leader_offset: u32,
    ) -> Option<AuthorityIndex> {
        if self.bad_nodes.contains_key(&leader) {
            // TODO: Re-work swap for the multileader case
            assert!(
                leader_offset == 0,
                "Swap for multi-leader case not implemented yet."
            );
            let mut seed_bytes = [0u8; 32];
            seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
            seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
            let mut rng = StdRng::from_seed(seed_bytes);

            let (idx, _hostname, _stake) = self
                .good_nodes
                .choose(&mut rng)
                .expect("There should be at least one good node available");

            tracing::trace!(
                "Swapping bad leader {} -> {} for round {}",
                leader,
                idx,
                leader_round
            );

            return Some(*idx);
        }
        None
    }

    /// Retrieves the first nodes provided by the iterator `authorities` until the
    /// `stake_threshold` has been reached. The `stake_threshold` should be between
    /// [0, 100] and expresses the percentage of stake that is considered the cutoff.
    /// It's the caller's responsibility to ensure that the elements of the
    /// `authorities` input are already sorted.
    fn retrieve_first_nodes<'a>(
        context: Arc<Context>,
        authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
        stake_threshold: u64,
    ) -> Vec<(AuthorityIndex, String, Stake)> {
        let mut filtered_authorities = Vec::new();

        let mut stake = 0;
        for &(authority_idx, _score) in authorities {
            stake += context.committee.stake(authority_idx);

            // If the total accumulated stake has surpassed the stake threshold,
            // then we omit this last authority and exit the loop. Importantly,
            // this means that if the threshold is too low we may not have any
            // nodes returned.
            if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
                break;
            }

            let authority = context.committee.authority(authority_idx);
            filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
        }

        filtered_authorities
    }
}

impl Debug for LeaderSwapTable {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(&format!(
            "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
            self.reputation_scores.commit_range,
            self.good_nodes
                .iter()
                .map(|(idx, _hostname, _stake)| idx.to_owned())
                .collect::<Vec<AuthorityIndex>>(),
            self.good_nodes
                .iter()
                .map(|(_idx, _hostname, stake)| stake)
                .sum::<Stake>(),
            // Collect the keys so the bad nodes render as a list; a bare iterator
            // would not debug-print its elements.
            self.bad_nodes
                .keys()
                .map(|idx| idx.to_owned())
                .collect::<Vec<AuthorityIndex>>(),
            self.bad_nodes
                .values()
                .map(|(_hostname, stake)| stake)
                .sum::<Stake>(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};

    use super::*;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
    };

    #[tokio::test]
    async fn test_elect_leader() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        assert_eq!(
            leader_schedule.elect_leader(0, 0),
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(
            leader_schedule.elect_leader(1, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader(5, 0),
            AuthorityIndex::new_for_test(1)
        );
        // ensure we elect different leaders for the same round for the multi-leader case
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_elect_leader_stake_based() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        assert_eq!(
            leader_schedule.elect_leader_stake_based(0, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader_stake_based(1, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader_stake_based(5, 0),
            AuthorityIndex::new_for_test(3)
        );
        // ensure we elect different leaders for the same round for the multi-leader case
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

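    // A minimal sketch (not part of the original suite) of a property the seeded
    // election relies on: the RNG seed is derived only from the round, so repeated
    // calls with the same round & offset must elect the same leader. Assumes the
    // same 4-authority test committee as above.
    #[tokio::test]
    async fn test_elect_leader_stake_based_is_deterministic() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        for round in 0u32..10 {
            assert_eq!(
                leader_schedule.elect_leader_stake_based(round, 0),
                leader_schedule.elect_leader_stake_based(round, 0)
            );
        }
    }
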
    #[tokio::test]
    async fn test_leader_schedule_from_store() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Populate fully connected test blocks for round 0 ~ 11, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=11).build();
        let mut subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
            for block in sub_dag.blocks.iter() {
                blocks_to_write.push(block.clone());
            }
            expected_commits.push(commit);
            subdags.push(sub_dag);
        }

        // The CommitInfo for the first 10 commits is written to store. This is the
        // info that LeaderSchedule will be recovered from.
        let commit_range = (1..=10).into();
        let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
        let committed_rounds = vec![9, 9, 10, 9];
        let commit_ref = expected_commits[9].reference();
        let commit_info = CommitInfo {
            reputation_scores,
            committed_rounds,
        };

        // CommitIndex '11' will be written to store. This should result in the cached
        // last_committed_rounds & unscored subdags in DagState being updated with the
        // latest commit information on recovery.
        store
            .write(
                WriteBatch::default()
                    .commit_info(vec![(commit_ref, commit_info)])
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Check that DagState recovery from stored CommitInfo worked correctly
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(1, dag_state.read().scoring_subdags_count());
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(2)),
            "{:?}",
            leader_swap_table.bad_nodes
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commits() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let expected_last_committed_rounds = vec![0, 0, 0, 0];

        // Check that DagState recovery from stored CommitInfo worked correctly
        assert_eq!(
            expected_last_committed_rounds,
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(0, dag_state.read().scoring_subdags_count());

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commit_info() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Populate fully connected test blocks for round 0 ~ 2, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=2).build();

        let mut expected_scored_subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
            for block in sub_dag.blocks.iter() {
                blocks_to_write.push(block.clone());
            }
            expected_commits.push(commit);
            expected_scored_subdags.push(sub_dag);
        }

        // Only the blocks and commits for the first 2 commits are written to store;
        // no CommitInfo is. 10 commits would have been required for a leader
        // schedule update, so at this point no commit info should have been
        // persisted and no leader schedule should be recovered. However dag state
        // should have properly recovered the unscored subdags & last committed
        // rounds.
        store
            .write(
                WriteBatch::default()
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Check that DagState recovery from stored CommitInfo worked correctly
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(
            expected_scored_subdags.len(),
            dag_state.read().scoring_subdags_count()
        );
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check that LeaderSchedule recovery from stored CommitInfo worked correctly
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_commits_until_leader_schedule_update() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let unscored_subdags = vec![CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            vec![],
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
        )];
        dag_state.write().add_scoring_subdags(unscored_subdags);

        let commits_until_leader_schedule_update =
            leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
        assert_eq!(commits_until_leader_schedule_update, 299);
    }

    #[tokio::test]
    async fn test_leader_schedule_update_leader_schedule() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));

        // Populate fully connected test blocks for round 0 ~ 4, authorities 0 ~ 3.
        let max_round: u32 = 4;
        let num_authorities: u32 = 4;

        let mut blocks = Vec::new();
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                let block = TestBlock::new(0, author_idx).build();
                VerifiedBlock::new_for_test(block)
            })
            .map(|block| (block.reference(), block))
            .unzip();
        blocks.extend(genesis);

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=max_round {
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                new_ancestors.push(block.reference());

                // Simulate referenced block which was part of another committed
                // subdag.
                if round == 3 && author == 0 {
                    tracing::info!("Skipping {block} in committed subdags blocks");
                    continue;
                }

                blocks.push(block.clone());

                // only write one block for the final round, which is the leader
                // of the committed subdag.
                if round == max_round {
                    leader = Some(block.clone());
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;

        let last_commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            leader_ref,
            blocks
                .iter()
                .map(|block| block.reference())
                .collect::<Vec<_>>(),
        );

        let unscored_subdags = vec![CommittedSubDag::new(
            leader_ref,
            blocks,
            context.clock.timestamp_utc_ms(),
            last_commit.reference(),
        )];

        let mut dag_state_write = dag_state.write();
        dag_state_write.set_last_commit(last_commit);
        dag_state_write.add_scoring_subdags(unscored_subdags);
        drop(dag_state_write);

        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(0)
        );

        leader_schedule.update_leader_schedule_v2(&dag_state);

        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(2)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(2)
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);

        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(3)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
    }

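    // A minimal sketch (not part of the original suite): when scores are tied,
    // `new_inner` breaks ties with an RNG seeded only by the commit index, so
    // building the table twice with the same commit index must yield identical
    // good/bad selections.
    #[tokio::test]
    async fn test_leader_swap_table_tie_break_is_deterministic() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let scores = ReputationScores::new((0..=10).into(), vec![1, 1, 1, 1]);
        let table_a = LeaderSwapTable::new_inner(context.clone(), 33, 5, scores.clone());
        let table_b = LeaderSwapTable::new_inner(context, 33, 5, scores);

        assert_eq!(table_a.good_nodes, table_b.good_nodes);
        assert_eq!(table_a.bad_nodes, table_b.bad_nodes);
    }
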
    #[tokio::test]
    async fn test_leader_swap_table_swap() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Test swapping a bad leader
        let leader = AuthorityIndex::new_for_test(0);
        let leader_round = 1;
        let leader_offset = 0;
        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
        assert_eq!(swapped_leader, Some(AuthorityIndex::new_for_test(3)));

        // Test not swapping a good leader
        let leader = AuthorityIndex::new_for_test(1);
        let leader_round = 1;
        let leader_offset = 0;
        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
        assert_eq!(swapped_leader, None);
    }

    #[tokio::test]
    async fn test_leader_swap_table_retrieve_first_nodes() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let authorities = [
            (AuthorityIndex::new_for_test(0), 1),
            (AuthorityIndex::new_for_test(1), 2),
            (AuthorityIndex::new_for_test(2), 3),
            (AuthorityIndex::new_for_test(3), 4),
        ];

        let stake_threshold = 50;
        let filtered_authorities = LeaderSwapTable::retrieve_first_nodes(
            context.clone(),
            authorities.iter(),
            stake_threshold,
        );

        // Test setup includes 4 validators with even stake. Therefore with a
        // stake_threshold of 50% we should see 2 validators filtered.
        assert_eq!(filtered_authorities.len(), 2);
        let authority_0_idx = AuthorityIndex::new_for_test(0);
        let authority_0 = context.committee.authority(authority_0_idx);
        assert!(filtered_authorities.contains(&(
            authority_0_idx,
            authority_0.hostname.clone(),
            authority_0.stake
        )));
        let authority_1_idx = AuthorityIndex::new_for_test(1);
        let authority_1 = context.committee.authority(authority_1_idx);
        assert!(filtered_authorities.contains(&(
            authority_1_idx,
            authority_1.hostname.clone(),
            authority_1.stake
        )));
    }

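    // A minimal sketch (not part of the original suite) illustrating the note in
    // `retrieve_first_nodes`: with a stake_threshold of 0 the accumulated stake
    // exceeds the cutoff immediately, so no nodes are returned.
    #[tokio::test]
    async fn test_leader_swap_table_retrieve_first_nodes_zero_threshold() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let authorities = [
            (AuthorityIndex::new_for_test(0), 1),
            (AuthorityIndex::new_for_test(1), 2),
        ];

        let filtered_authorities =
            LeaderSwapTable::retrieve_first_nodes(context, authorities.iter(), 0);

        assert!(filtered_authorities.is_empty());
    }
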
    #[tokio::test]
    #[should_panic(
        expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
    )]
    async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 34;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
    }

    #[tokio::test]
    async fn test_update_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (1..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Update leader from brand new schedule to first real schedule
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (11..=20).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Update leader from old swap table to new valid swap table
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
    }

    #[tokio::test]
    #[should_panic(
        expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
    )]
    async fn test_update_bad_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (1..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Update leader from brand new schedule to first real schedule
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (11..=20).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Update leader from old swap table to new valid swap table
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (21..=25).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // Update leader from old swap table to new invalid swap table
        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
    }
}