1use std::{
5 collections::BTreeMap,
6 fmt::{Debug, Formatter},
7 sync::Arc,
8};
9
10use consensus_config::{AuthorityIndex, Stake};
11use consensus_types::block::Round;
12use parking_lot::RwLock;
13use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};
14
15use crate::{
16 CommitIndex, commit::CommitRange, context::Context, dag_state::DagState,
17 leader_scoring::ReputationScores,
18};
19
20#[derive(Clone)]
24pub(crate) struct LeaderSchedule {
25 pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
26 context: Arc<Context>,
27 num_commits_per_schedule: u64,
28}
29
30impl LeaderSchedule {
31 #[cfg(not(msim))]
35 const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
36 #[cfg(msim)]
37 const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;
38
39 pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
40 Self {
41 context,
42 num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
43 leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
44 }
45 }
46
47 #[cfg(test)]
48 pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
49 self.num_commits_per_schedule = num_commits_per_schedule;
50 self
51 }
52
53 pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
56 let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
57 LeaderSwapTable::default(),
58 |(last_commit_ref, last_commit_info)| {
59 LeaderSwapTable::new(
60 context.clone(),
61 last_commit_ref.index,
62 last_commit_info.reputation_scores,
63 )
64 },
65 );
66
67 tracing::info!(
68 "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
69 dag_state.read().scoring_subdags_count(),
70 );
71
72 Self::new(context, leader_swap_table)
74 }
75
76 pub(crate) fn commits_until_leader_schedule_update(
77 &self,
78 dag_state: Arc<RwLock<DagState>>,
79 ) -> usize {
80 let subdag_count = dag_state.read().scoring_subdags_count() as u64;
81
82 assert!(
83 subdag_count <= self.num_commits_per_schedule,
84 "Committed subdags count exceeds the number of commits per schedule"
85 );
86 self.num_commits_per_schedule
87 .checked_sub(subdag_count)
88 .unwrap() as usize
89 }
90
91 pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
96 dag_state.read().is_scoring_subdag_empty()
97 }
98
99 pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
100 let _s = self
101 .context
102 .metrics
103 .node_metrics
104 .scope_processing_time
105 .with_label_values(&["LeaderSchedule::update_leader_schedule"])
106 .start_timer();
107
108 let (reputation_scores, last_commit_index) = {
109 let dag_state = dag_state.read();
110 let reputation_scores = dag_state.calculate_scoring_subdag_scores();
111
112 let last_commit_index = dag_state.scoring_subdag_commit_range();
113
114 (reputation_scores, last_commit_index)
115 };
116
117 {
118 let mut dag_state = dag_state.write();
119 dag_state.clear_scoring_subdag();
121 dag_state.add_commit_info(reputation_scores.clone());
123 }
124
125 self.update_leader_swap_table(LeaderSwapTable::new(
126 self.context.clone(),
127 last_commit_index,
128 reputation_scores.clone(),
129 ));
130
131 reputation_scores.update_metrics(self.context.clone());
132
133 self.context
134 .metrics
135 .node_metrics
136 .num_of_bad_nodes
137 .set(self.leader_swap_table.read().bad_nodes.len() as i64);
138 }
139
140 pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
141 cfg_if::cfg_if! {
142 if #[cfg(test)] {
145 let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
146 let table = self.leader_swap_table.read();
147 table.swap(leader, round, leader_offset).unwrap_or(leader)
148 } else {
149 let leader = self.elect_leader_stake_based(round, leader_offset);
150 let table = self.leader_swap_table.read();
151 table.swap(leader, round, leader_offset).unwrap_or(leader)
152 }
153 }
154 }
155
156 pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
157 assert!((offset as usize) < self.context.committee.size());
158
159 let mut seed_bytes = [0u8; 32];
164 seed_bytes[32 - 4..].copy_from_slice(&(round).to_le_bytes());
165 let mut rng = StdRng::from_seed(seed_bytes);
166
167 let choices = self
168 .context
169 .committee
170 .authorities()
171 .map(|(index, authority)| (index, authority.stake as f32))
172 .collect::<Vec<_>>();
173
174 *choices
175 .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
176 .expect("Weighted choice error: stake values incorrect!")
177 .skip(offset as usize)
178 .map(|(index, _)| index)
179 .next()
180 .unwrap()
181 }
182
183 fn update_leader_swap_table(&self, table: LeaderSwapTable) {
187 let read = self.leader_swap_table.read();
188 let old_commit_range = &read.reputation_scores.commit_range;
189 let new_commit_range = &table.reputation_scores.commit_range;
190
191 if *old_commit_range != CommitRange::default() {
196 assert!(
197 old_commit_range.is_next_range(new_commit_range)
198 && old_commit_range.is_equal_size(new_commit_range),
199 "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
200 );
201 }
202 drop(read);
203
204 tracing::trace!("Updating {table:?}");
205
206 let mut write = self.leader_swap_table.write();
207 *write = table;
208 }
209}
210
211#[derive(Default, Clone)]
212pub(crate) struct LeaderSwapTable {
213 pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,
218
219 pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,
225
226 pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
229
230 pub(crate) reputation_scores: ReputationScores,
234}
235
236impl LeaderSwapTable {
237 pub(crate) fn new(
242 context: Arc<Context>,
243 commit_index: CommitIndex,
244 reputation_scores: ReputationScores,
245 ) -> Self {
246 let swap_stake_threshold = context
247 .protocol_config
248 .consensus_bad_nodes_stake_threshold();
249 Self::new_inner(
250 context,
251 swap_stake_threshold,
252 commit_index,
253 reputation_scores,
254 )
255 }
256
257 fn new_inner(
258 context: Arc<Context>,
259 #[allow(unused_variables)] swap_stake_threshold: u64,
262 commit_index: CommitIndex,
263 reputation_scores: ReputationScores,
264 ) -> Self {
265 #[cfg(msim)]
266 let swap_stake_threshold = 33;
267
268 assert!(
269 (0..=33).contains(&swap_stake_threshold),
270 "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
271 );
272
273 if reputation_scores.scores_per_authority.is_empty() {
275 return Self::default();
276 }
277
278 let mut seed_bytes = [0u8; 32];
281 seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
282 let mut rng = StdRng::from_seed(seed_bytes);
283 let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
284 assert_eq!(authorities_by_score.len(), context.committee.size());
285 authorities_by_score.shuffle(&mut rng);
286 authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));
288
289 let good_nodes = Self::retrieve_first_nodes(
291 context.clone(),
292 authorities_by_score.iter(),
293 swap_stake_threshold,
294 )
295 .into_iter()
296 .collect::<Vec<(AuthorityIndex, String, Stake)>>();
297
298 let bad_nodes = Self::retrieve_first_nodes(
302 context.clone(),
303 authorities_by_score.iter().rev(),
304 swap_stake_threshold,
305 )
306 .into_iter()
307 .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
308 .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();
309
310 good_nodes.iter().for_each(|(idx, hostname, stake)| {
311 tracing::debug!(
312 "Good node {hostname} with stake {stake} has score {} for {:?}",
313 reputation_scores.scores_per_authority[idx.to_owned()],
314 reputation_scores.commit_range,
315 );
316 });
317
318 bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
319 tracing::debug!(
320 "Bad node {hostname} with stake {stake} has score {} for {:?}",
321 reputation_scores.scores_per_authority[idx.to_owned()],
322 reputation_scores.commit_range,
323 );
324 });
325
326 tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");
327
328 Self {
329 good_nodes,
330 bad_nodes,
331 reputation_scores_desc: authorities_by_score,
332 reputation_scores,
333 }
334 }
335
336 pub(crate) fn swap(
347 &self,
348 leader: AuthorityIndex,
349 leader_round: Round,
350 leader_offset: u32,
351 ) -> Option<AuthorityIndex> {
352 if self.bad_nodes.contains_key(&leader) {
353 assert!(
355 leader_offset == 0,
356 "Swap for multi-leader case not implemented yet."
357 );
358 let mut seed_bytes = [0u8; 32];
359 seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
360 seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
361 let mut rng = StdRng::from_seed(seed_bytes);
362
363 let (idx, _hostname, _stake) = self
364 .good_nodes
365 .choose(&mut rng)
366 .expect("There should be at least one good node available");
367
368 tracing::trace!(
369 "Swapping bad leader {} -> {} for round {}",
370 leader,
371 idx,
372 leader_round
373 );
374
375 return Some(*idx);
376 }
377 None
378 }
379
380 fn retrieve_first_nodes<'a>(
386 context: Arc<Context>,
387 authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
388 stake_threshold: u64,
389 ) -> Vec<(AuthorityIndex, String, Stake)> {
390 let mut filtered_authorities = Vec::new();
391
392 let mut stake = 0;
393 for &(authority_idx, _score) in authorities {
394 stake += context.committee.stake(authority_idx);
395
396 if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
401 break;
402 }
403
404 let authority = context.committee.authority(authority_idx);
405 filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
406 }
407
408 filtered_authorities
409 }
410}
411
412impl Debug for LeaderSwapTable {
413 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
414 f.write_str(&format!(
415 "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
416 self.reputation_scores.commit_range,
417 self.good_nodes
418 .iter()
419 .map(|(idx, _hostname, _stake)| idx.to_owned())
420 .collect::<Vec<AuthorityIndex>>(),
421 self.good_nodes
422 .iter()
423 .map(|(_idx, _hostname, stake)| stake)
424 .sum::<Stake>(),
425 self.bad_nodes.keys().map(|idx| idx.to_owned()),
426 self.bad_nodes
427 .values()
428 .map(|(_hostname, stake)| stake)
429 .sum::<Stake>(),
430 ))
431 }
432}
433
#[cfg(test)]
mod tests {
    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};

    use super::*;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
    };

    /// Test context for a committee of four authorities.
    fn test_context() -> Arc<Context> {
        Arc::new(Context::new_for_test(4).0)
    }

    /// Like `test_context`, with the bad-nodes stake threshold set to 33.
    fn test_context_with_swap_threshold() -> Arc<Context> {
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        Arc::new(context)
    }

    /// `DagState` on top of a fresh, empty in-memory store.
    fn empty_dag_state(context: &Arc<Context>) -> Arc<RwLock<DagState>> {
        Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )))
    }

    /// Swap table built with commit index 0 from the scores `[0, 1, 2, 3]`
    /// over `commit_range`.
    fn ascending_swap_table(
        context: &Arc<Context>,
        swap_stake_threshold: u64,
        commit_range: CommitRange,
    ) -> LeaderSwapTable {
        let reputation_scores = ReputationScores::new(commit_range, vec![0, 1, 2, 3]);
        LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores)
    }

    #[tokio::test]
    async fn test_elect_leader() {
        let leader_schedule = LeaderSchedule::new(test_context(), LeaderSwapTable::default());

        // (round, expected leader) with an empty swap table.
        for (round, expected) in [(0, 0), (1, 1), (5, 1)] {
            assert_eq!(
                leader_schedule.elect_leader(round, 0),
                AuthorityIndex::new_for_test(expected)
            );
        }

        // Different offsets of the same round must give different leaders.
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_elect_leader_stake_based() {
        let leader_schedule = LeaderSchedule::new(test_context(), LeaderSwapTable::default());

        // (round, expected leader) for offset 0.
        for (round, expected) in [(0, 1), (1, 1), (5, 3)] {
            assert_eq!(
                leader_schedule.elect_leader_stake_based(round, 0),
                AuthorityIndex::new_for_test(expected)
            );
        }

        // Different offsets of the same round must give different leaders.
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store() {
        telemetry_subscribers::init_for_testing();
        let context = test_context_with_swap_threshold();
        let store = Arc::new(MemStore::new());

        // Build rounds 1 through 11 and gather every subdag, commit and block.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=11).build();

        let (subdags, expected_commits): (Vec<_>, Vec<_>) = dag_builder
            .get_sub_dag_and_commits(1..=11)
            .into_iter()
            .unzip();
        let blocks_to_write: Vec<_> = subdags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter().cloned())
            .collect();

        // Commit info is stored only for the tenth commit, covering 1..=10.
        let commit_info = CommitInfo {
            reputation_scores: ReputationScores::new((1..=10).into(), vec![4, 1, 1, 3]),
            committed_rounds: vec![9, 9, 10, 9],
        };
        let commit_ref = expected_commits[9].reference();

        store
            .write(
                WriteBatch::default()
                    .commit_info(vec![(commit_ref, commit_info)])
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Recovered state: one subdag (commit 11) is left to score.
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(1, dag_state.read().scoring_subdags_count());
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // The swap table reflects the stored scores.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(2)),
            "{:?}",
            leader_swap_table.bad_nodes
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commits() {
        telemetry_subscribers::init_for_testing();
        let context = test_context_with_swap_threshold();
        let dag_state = empty_dag_state(&context);

        // Nothing was committed, so nothing is recovered.
        assert_eq!(
            vec![0, 0, 0, 0],
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(0, dag_state.read().scoring_subdags_count());

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // An empty store yields an empty swap table.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commit_info() {
        telemetry_subscribers::init_for_testing();
        let context = test_context_with_swap_threshold();
        let store = Arc::new(MemStore::new());

        // Build rounds 1 and 2 and gather every subdag, commit and block.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=2).build();

        let (expected_scored_subdags, expected_commits): (Vec<_>, Vec<_>) = dag_builder
            .get_sub_dag_and_commits(1..=2)
            .into_iter()
            .unzip();
        let blocks_to_write: Vec<_> = expected_scored_subdags
            .iter()
            .flat_map(|sub_dag| sub_dag.blocks.iter().cloned())
            .collect();

        // Blocks and commits are stored, but no commit info.
        store
            .write(
                WriteBatch::default()
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Both commits are recovered as subdags that still need scoring.
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(
            expected_scored_subdags.len(),
            dag_state.read().scoring_subdags_count()
        );
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Without commit info the swap table stays empty.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_commits_until_leader_schedule_update() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
        let dag_state = empty_dag_state(&context);

        // One scoring subdag uses up one of the 300 commits of a schedule.
        let unscored_subdags = vec![CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            vec![],
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
            true,
        )];
        dag_state.write().add_scoring_subdags(unscored_subdags);

        assert_eq!(
            leader_schedule.commits_until_leader_schedule_update(dag_state.clone()),
            299
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_update_leader_schedule() {
        telemetry_subscribers::init_for_testing();
        let context = test_context_with_swap_threshold();
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let dag_state = empty_dag_state(&context);

        const MAX_ROUND: u32 = 4;
        const NUM_AUTHORITIES: u32 = 4;

        // One genesis (round 0) block per authority.
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|(authority_index, _authority)| {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(0, authority_index.value() as u32).build(),
                );
                (block.reference(), block)
            })
            .unzip();
        let mut blocks = genesis;

        // Every block references all blocks of the previous round. The block
        // of authority 0 in round 3 is referenced by later blocks but left out
        // of the committed blocks. The first block of the last round becomes
        // the leader.
        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=MAX_ROUND {
            let mut round_references = vec![];
            for author in 0..NUM_AUTHORITIES {
                let timestamp_ms =
                    round as BlockTimestampMs * 1000 + (author + round) as u64;
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(timestamp_ms)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                round_references.push(block.reference());

                if round == 3 && author == 0 {
                    tracing::info!("Skipping {block} in committed subdags blocks");
                    continue;
                }

                blocks.push(block.clone());

                if round == MAX_ROUND {
                    leader = Some(block);
                    break;
                }
            }
            ancestors = round_references;
        }

        let leader_ref = leader.unwrap().reference();

        let last_commit = TrustedCommit::new_for_test(
            1,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            leader_ref,
            blocks.iter().map(VerifiedBlock::reference).collect::<Vec<_>>(),
        );

        let unscored_subdags = vec![CommittedSubDag::new(
            leader_ref,
            blocks,
            context.clock.timestamp_utc_ms(),
            last_commit.reference(),
            true,
        )];

        {
            let mut state = dag_state.write();
            state.set_last_commit(last_commit);
            state.add_scoring_subdags(unscored_subdags);
        }

        // Before the update nothing is swapped.
        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(0)
        );

        leader_schedule.update_leader_schedule_v2(&dag_state);

        // After the update authority 0 is a bad node and gets swapped for 2.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(2)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(2)
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();

        let leader_swap_table = ascending_swap_table(&context, 33, (0..=10).into());

        // Highest score is the single good node, lowest the single bad node.
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(3)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table_swap() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();

        let leader_swap_table = ascending_swap_table(&context, 33, (0..=10).into());

        // A bad leader is replaced by the good node.
        assert_eq!(
            leader_swap_table.swap(AuthorityIndex::new_for_test(0), 1, 0),
            Some(AuthorityIndex::new_for_test(3))
        );

        // A leader that is not a bad node is left alone.
        assert_eq!(
            leader_swap_table.swap(AuthorityIndex::new_for_test(1), 1, 0),
            None
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table_retrieve_first_nodes() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();

        let authorities = [(0, 1), (1, 2), (2, 3), (3, 4)]
            .map(|(index, score)| (AuthorityIndex::new_for_test(index), score));

        let filtered_authorities =
            LeaderSwapTable::retrieve_first_nodes(context.clone(), authorities.iter(), 50);

        // With a threshold of 50 only the first two authorities are kept.
        assert_eq!(filtered_authorities.len(), 2);
        for index in [0, 1] {
            let authority_idx = AuthorityIndex::new_for_test(index);
            let authority = context.committee.authority(authority_idx);
            assert!(filtered_authorities.contains(&(
                authority_idx,
                authority.hostname.clone(),
                authority.stake
            )));
        }
    }

    #[tokio::test]
    #[should_panic(
        expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
    )]
    async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();

        // 34 is just above the accepted range.
        ascending_swap_table(&context, 34, (0..=10).into());
    }

    #[tokio::test]
    async fn test_update_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();

        let first_table = ascending_swap_table(&context, 33, (1..=10).into());
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Any range is accepted on top of the default table.
        leader_schedule.update_leader_swap_table(first_table);

        // The directly following range of equal size is accepted as well.
        let second_table = ascending_swap_table(&context, 33, (11..=20).into());
        leader_schedule.update_leader_swap_table(second_table);
    }

    #[tokio::test]
    #[should_panic(
        expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
    )]
    async fn test_update_bad_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = test_context();

        let first_table = ascending_swap_table(&context, 33, (1..=10).into());
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Any range is accepted on top of the default table.
        leader_schedule.update_leader_swap_table(first_table);

        // The directly following range of equal size is accepted as well.
        let second_table = ascending_swap_table(&context, 33, (11..=20).into());
        leader_schedule.update_leader_swap_table(second_table);

        // A following range of a different size must be rejected.
        let third_table = ascending_swap_table(&context, 33, (21..=25).into());
        leader_schedule.update_leader_swap_table(third_table);
    }
}