1use std::{
5 collections::BTreeMap,
6 fmt::{Debug, Formatter},
7 sync::Arc,
8};
9
10use consensus_config::{AuthorityIndex, Stake};
11use consensus_types::block::Round;
12use parking_lot::RwLock;
13use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};
14
15use crate::{
16 CommitIndex, commit::CommitRange, context::Context, dag_state::DagState,
17 leader_scoring::ReputationScores,
18};
19
/// The `LeaderSchedule` is responsible for electing the leader(s) of a round.
/// Elections are stake-weighted and deterministic, and can be adjusted by a
/// `LeaderSwapTable` that replaces low-scoring ("bad") leaders with
/// high-scoring ("good") ones.
#[derive(Clone)]
pub(crate) struct LeaderSchedule {
    /// The current swap table, rebuilt periodically from reputation scores.
    /// Shared behind a lock because elections read it while schedule updates
    /// replace it.
    pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
    context: Arc<Context>,
    /// Number of commits between swap-table rebuilds; defaults to
    /// `CONSENSUS_COMMITS_PER_SCHEDULE`, overridable in tests.
    num_commits_per_schedule: u64,
}
29
impl LeaderSchedule {
    /// Number of consensus commits between leader-schedule (swap table)
    /// updates. Deliberately small under msim so simulation tests exercise
    /// schedule updates frequently.
    #[cfg(not(msim))]
    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
    #[cfg(msim)]
    const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;

    pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
        Self {
            context,
            num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
            leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
        }
    }

    /// Test-only builder to override the commits-per-schedule cadence.
    #[cfg(test)]
    pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
        self.num_commits_per_schedule = num_commits_per_schedule;
        self
    }

    /// Recovers the schedule from storage: if commit info was persisted, a
    /// swap table is rebuilt from the recovered reputation scores; otherwise
    /// the default (empty, no-swap) table is used.
    pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
        let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
            LeaderSwapTable::default(),
            |(last_commit_ref, last_commit_info)| {
                LeaderSwapTable::new(
                    context.clone(),
                    last_commit_ref.index,
                    last_commit_info.reputation_scores,
                )
            },
        );

        tracing::info!(
            "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
            dag_state.read().scoring_subdags_count(),
        );

        Self::new(context, leader_swap_table)
    }

    /// Returns how many more commits must be scored before the next leader
    /// schedule update is due.
    ///
    /// Panics if the scored subdag count already exceeds
    /// `num_commits_per_schedule` — that would mean an update was missed.
    pub(crate) fn commits_until_leader_schedule_update(
        &self,
        dag_state: Arc<RwLock<DagState>>,
    ) -> usize {
        let subdag_count = dag_state.read().scoring_subdags_count() as u64;

        assert!(
            subdag_count <= self.num_commits_per_schedule,
            "Committed subdags count exceeds the number of commits per schedule"
        );
        // checked_sub cannot fail here thanks to the assert above.
        self.num_commits_per_schedule
            .checked_sub(subdag_count)
            .unwrap() as usize
    }

    /// Rebuilds the leader swap table from the currently scored subdags:
    /// computes reputation scores, clears the scoring state, persists the new
    /// commit info, then atomically installs the new swap table.
    pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["LeaderSchedule::update_leader_schedule"])
            .start_timer();

        // Compute scores under a read lock only.
        let (reputation_scores, last_commit_index) = {
            let dag_state = dag_state.read();
            let reputation_scores = dag_state.calculate_scoring_subdag_scores();

            let last_commit_index = dag_state.scoring_subdag_commit_range();

            (reputation_scores, last_commit_index)
        };

        // Then mutate under a short write lock: reset scoring state and
        // persist the commit info for recovery.
        {
            let mut dag_state = dag_state.write();
            dag_state.clear_scoring_subdag();
            dag_state.add_commit_info(reputation_scores.clone());
        }

        self.update_leader_swap_table(LeaderSwapTable::new(
            self.context.clone(),
            last_commit_index,
            reputation_scores.clone(),
        ));

        reputation_scores.update_metrics(self.context.clone());

        self.context
            .metrics
            .node_metrics
            .num_of_bad_nodes
            .set(self.leader_swap_table.read().bad_nodes.len() as i64);
    }

    /// Elects the leader for `round` at `leader_offset`, applying the swap
    /// table afterwards so a "bad" leader is replaced by a "good" one.
    pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
        cfg_if::cfg_if! {
            // In tests a simple round-robin is used for predictability;
            // production uses the stake-based election below.
            if #[cfg(test)] {
                let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
                let table = self.leader_swap_table.read();
                table.swap(leader, round, leader_offset).unwrap_or(leader)
            } else {
                let leader = self.elect_leader_stake_based(round, leader_offset);
                let table = self.leader_swap_table.read();
                table.swap(leader, round, leader_offset).unwrap_or(leader)
            }
        }
    }

    /// Stake-weighted leader election: picks the `offset`-th element of a
    /// weighted sample over the committee. The RNG is seeded from `round`
    /// only, so every node computes the same result for a given round.
    pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
        assert!((offset as usize) < self.context.committee.size());

        // Seed the RNG deterministically with the round number (last 4 bytes
        // of a zeroed 32-byte seed).
        let mut seed_bytes = [0u8; 32];
        seed_bytes[32 - 4..].copy_from_slice(&(round).to_le_bytes());
        let mut rng = StdRng::from_seed(seed_bytes);

        let choices = self
            .context
            .committee
            .authorities()
            .map(|(index, authority)| (index, authority.stake as f32))
            .collect::<Vec<_>>();

        *choices
            .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
            .expect("Weighted choice error: stake values incorrect!")
            .skip(offset as usize)
            .map(|(index, _)| index)
            .next()
            .unwrap()
    }

    /// Installs a new swap table, asserting (unless the current table is the
    /// default, e.g. right after startup) that the new table's commit range
    /// immediately follows the old one and has the same size.
    fn update_leader_swap_table(&self, table: LeaderSwapTable) {
        let read = self.leader_swap_table.read();
        let old_commit_range = &read.reputation_scores.commit_range;
        let new_commit_range = &table.reputation_scores.commit_range;

        if *old_commit_range != CommitRange::default() {
            assert!(
                old_commit_range.is_next_range(new_commit_range)
                    && old_commit_range.is_equal_size(new_commit_range),
                "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
            );
        }
        // Release the read lock before taking the write lock below.
        drop(read);

        tracing::trace!("Updating {table:?}");

        let mut write = self.leader_swap_table.write();
        *write = table;
    }
}
202
/// Table used to swap out low-scoring ("bad") leaders for high-scoring
/// ("good") ones, built from the reputation scores of a commit range.
#[derive(Default, Clone)]
pub(crate) struct LeaderSwapTable {
    /// Highest-scoring authorities (index, hostname, stake), accumulated from
    /// the top of the score ranking until the swap stake threshold is hit.
    pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,

    /// Lowest-scoring authorities, keyed by index for fast lookup when
    /// deciding whether an elected leader must be swapped.
    pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,

    /// The scores this table was derived from; its commit range is used to
    /// validate schedule updates.
    pub(crate) reputation_scores: ReputationScores,
}
223
impl LeaderSwapTable {
    /// Builds a swap table from `reputation_scores`, taking the swap stake
    /// threshold from the protocol config.
    pub(crate) fn new(
        context: Arc<Context>,
        commit_index: CommitIndex,
        reputation_scores: ReputationScores,
    ) -> Self {
        let swap_stake_threshold = context.protocol_config.bad_nodes_stake_threshold();
        Self::new_inner(
            context,
            swap_stake_threshold,
            commit_index,
            reputation_scores,
        )
    }

    /// Inner constructor taking the threshold explicitly (tests use this to
    /// inject custom thresholds).
    fn new_inner(
        context: Arc<Context>,
        // Unused when msim overrides the threshold below.
        #[allow(unused_variables)] swap_stake_threshold: u64,
        commit_index: CommitIndex,
        reputation_scores: ReputationScores,
    ) -> Self {
        // Under msim, always use the maximum allowed threshold to exercise
        // swapping aggressively in simulation.
        #[cfg(msim)]
        let swap_stake_threshold = 33;

        // The threshold is a percentage of total stake; anything above 33
        // (one third) is rejected as out of bounds.
        assert!(
            (0..=33).contains(&swap_stake_threshold),
            "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
        );

        // No scores yet (e.g. fresh start) -> empty table, no swapping.
        if reputation_scores.scores_per_authority.is_empty() {
            return Self::default();
        }

        // Seed an RNG from the commit index (last 4 bytes of a zeroed seed)
        // and shuffle before the stable descending sort: the shuffle
        // randomizes — deterministically per commit — the relative order of
        // authorities with equal scores.
        let mut seed_bytes = [0u8; 32];
        seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
        let mut rng = StdRng::from_seed(seed_bytes);
        let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
        assert_eq!(authorities_by_score.len(), context.committee.size());
        authorities_by_score.shuffle(&mut rng);
        authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));

        // Good nodes: taken from the top (highest scores) up to the stake
        // threshold.
        let good_nodes = Self::retrieve_first_nodes(
            context.clone(),
            authorities_by_score.iter(),
            swap_stake_threshold,
        )
        .into_iter()
        .collect::<Vec<(AuthorityIndex, String, Stake)>>();

        // Bad nodes: taken from the bottom (lowest scores) up to the stake
        // threshold, keyed by authority index.
        let bad_nodes = Self::retrieve_first_nodes(
            context.clone(),
            authorities_by_score.iter().rev(),
            swap_stake_threshold,
        )
        .into_iter()
        .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
        .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();

        good_nodes.iter().for_each(|(idx, hostname, stake)| {
            tracing::debug!(
                "Good node {hostname} with stake {stake} has score {} for {:?}",
                reputation_scores.scores_per_authority[idx.to_owned()],
                reputation_scores.commit_range,
            );
        });

        bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
            tracing::debug!(
                "Bad node {hostname} with stake {stake} has score {} for {:?}",
                reputation_scores.scores_per_authority[idx.to_owned()],
                reputation_scores.commit_range,
            );
        });

        tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");

        Self {
            good_nodes,
            bad_nodes,
            reputation_scores,
        }
    }

    /// If `leader` is a bad node, returns a deterministically chosen good
    /// node to lead `leader_round` instead; otherwise returns `None`.
    ///
    /// Panics if `leader_offset != 0` for a bad leader — swapping for
    /// multi-leader rounds is not implemented.
    pub(crate) fn swap(
        &self,
        leader: AuthorityIndex,
        leader_round: Round,
        leader_offset: u32,
    ) -> Option<AuthorityIndex> {
        if self.bad_nodes.contains_key(&leader) {
            assert!(
                leader_offset == 0,
                "Swap for multi-leader case not implemented yet."
            );
            // Seed from (round, offset) so every node picks the same
            // replacement for this slot.
            let mut seed_bytes = [0u8; 32];
            seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
            seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
            let mut rng = StdRng::from_seed(seed_bytes);

            let (idx, _hostname, _stake) = self
                .good_nodes
                .choose(&mut rng)
                .expect("There should be at least one good node available");

            tracing::trace!(
                "Swapping bad leader {} -> {} for round {}",
                leader,
                idx,
                leader_round
            );

            return Some(*idx);
        }
        None
    }

    /// Collects authorities from the front of `authorities` while their
    /// cumulative stake stays within `stake_threshold`% of total stake.
    /// Note the authority whose stake crosses the threshold is excluded
    /// (the loop breaks before pushing it).
    fn retrieve_first_nodes<'a>(
        context: Arc<Context>,
        authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
        stake_threshold: u64,
    ) -> Vec<(AuthorityIndex, String, Stake)> {
        let mut filtered_authorities = Vec::new();

        let mut stake = 0;
        for &(authority_idx, _score) in authorities {
            stake += context.committee.stake(authority_idx);

            if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
                break;
            }

            let authority = context.committee.authority(authority_idx);
            filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
        }

        filtered_authorities
    }
}
396
397impl Debug for LeaderSwapTable {
398 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
399 f.write_str(&format!(
400 "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
401 self.reputation_scores.commit_range,
402 self.good_nodes
403 .iter()
404 .map(|(idx, _hostname, _stake)| idx.to_owned())
405 .collect::<Vec<AuthorityIndex>>(),
406 self.good_nodes
407 .iter()
408 .map(|(_idx, _hostname, stake)| stake)
409 .sum::<Stake>(),
410 self.bad_nodes.keys().map(|idx| idx.to_owned()),
411 self.bad_nodes
412 .values()
413 .map(|(_hostname, stake)| stake)
414 .sum::<Stake>(),
415 ))
416 }
417}
418
/// Unit tests for the leader schedule and swap table. Many of these pin
/// exact values produced by the deterministic (seeded) elections, so the
/// expected indices are tied to the seeding scheme above.
#[cfg(test)]
mod tests {
    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};

    use super::*;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
    };

    // Under cfg(test) elect_leader is round-robin: (round + offset) % size.
    #[tokio::test]
    async fn test_elect_leader() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        assert_eq!(
            leader_schedule.elect_leader(0, 0),
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(
            leader_schedule.elect_leader(1, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader(5, 0),
            AuthorityIndex::new_for_test(1)
        );
        // Different offsets in the same round should elect different leaders.
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    // The stake-based election is deterministic per (round, offset); these
    // exact indices follow from the round-seeded RNG.
    #[tokio::test]
    async fn test_elect_leader_stake_based() {
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());

        assert_eq!(
            leader_schedule.elect_leader_stake_based(0, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader_stake_based(1, 0),
            AuthorityIndex::new_for_test(1)
        );
        assert_eq!(
            leader_schedule.elect_leader_stake_based(5, 0),
            AuthorityIndex::new_for_test(3)
        );
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    // Recovery path with persisted commit info: the swap table is rebuilt
    // from the stored reputation scores.
    #[tokio::test]
    async fn test_leader_schedule_from_store() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Populate fully connected test blocks for rounds 1 ~ 11.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=11).build();
        let mut subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
            for block in sub_dag.blocks.iter() {
                blocks_to_write.push(block.clone());
            }
            expected_commits.push(commit);
            subdags.push(sub_dag);
        }

        // Persist commit info as of commit 10, covering commits 1..=10.
        let commit_range = (1..=10).into();
        let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
        let committed_rounds = vec![9, 9, 10, 9];
        let commit_ref = expected_commits[9].reference();
        let commit_info = CommitInfo {
            reputation_scores,
            committed_rounds,
        };

        store
            .write(
                WriteBatch::default()
                    .commit_info(vec![(commit_ref, commit_info)])
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        // Only commit 11 (after the stored commit info) should remain to be
        // scored after recovery.
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(1, dag_state.read().scoring_subdags_count());
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Highest score (authority 0) is good; lowest is bad (authority 2
        // after the commit-seeded tie-break with authority 1).
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(0)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(2)),
            "{:?}",
            leader_swap_table.bad_nodes
        );
    }

    // With no commits at all, recovery should yield an empty swap table.
    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commits() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        let expected_last_committed_rounds = vec![0, 0, 0, 0];

        assert_eq!(
            expected_last_committed_rounds,
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(0, dag_state.read().scoring_subdags_count());

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    // Commits exist but no commit info was persisted: the default swap table
    // is used and all commits remain to be scored.
    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commit_info() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Populate fully connected test blocks for rounds 1 ~ 2.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=2).build();

        let mut expected_scored_subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
            for block in sub_dag.blocks.iter() {
                blocks_to_write.push(block.clone());
            }
            expected_commits.push(commit);
            expected_scored_subdags.push(sub_dag);
        }

        // Note: no commit info is written here, only blocks and commits.
        store
            .write(
                WriteBatch::default()
                    .blocks(blocks_to_write)
                    .commits(expected_commits),
            )
            .unwrap();

        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));

        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(
            expected_scored_subdags.len(),
            dag_state.read().scoring_subdags_count()
        );
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    // One scored subdag out of the default 300-commit schedule leaves 299.
    #[tokio::test]
    async fn test_leader_schedule_commits_until_leader_schedule_update() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let unscored_subdags = vec![CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            vec![],
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
        )];
        dag_state.write().add_scoring_subdags(unscored_subdags);

        let commits_until_leader_schedule_update =
            leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
        assert_eq!(commits_until_leader_schedule_update, 299);
    }

    // End-to-end schedule update: builds a DAG where authority 0 skips a
    // block, scores it, and verifies the bad leader gets swapped out.
    #[tokio::test]
    async fn test_leader_schedule_update_leader_schedule() {
        telemetry_subscribers::init_for_testing();
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_bad_nodes_stake_threshold_for_testing(33);
        let context = Arc::new(context);
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));

        // Populate fully connected test blocks for rounds 0 ~ 4.
        let max_round: u32 = 4;
        let num_authorities: u32 = 4;

        let mut blocks = Vec::new();
        let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|index| {
                let author_idx = index.0.value() as u32;
                let block = TestBlock::new(0, author_idx).build();
                VerifiedBlock::new_for_test(block)
            })
            .map(|block| (block.reference(), block))
            .unzip();
        blocks.extend(genesis);

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=max_round {
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let base_ts = round as BlockTimestampMs * 1000;
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                new_ancestors.push(block.reference());

                // Simulate authority 0 skipping a block at round 3 so it ends
                // up with the lowest score.
                if round == 3 && author == 0 {
                    tracing::info!("Skipping {block} in committed subdags blocks");
                    continue;
                }

                blocks.push(block.clone());

                // Only the first block of the last round is used as leader.
                if round == max_round {
                    leader = Some(block.clone());
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_block = leader.unwrap();
        let leader_ref = leader_block.reference();
        let commit_index = 1;

        let last_commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            leader_ref,
            blocks
                .iter()
                .map(|block| block.reference())
                .collect::<Vec<_>>(),
        );

        let unscored_subdags = vec![CommittedSubDag::new(
            leader_ref,
            blocks,
            context.clock.timestamp_utc_ms(),
            last_commit.reference(),
        )];

        let mut dag_state_write = dag_state.write();
        dag_state_write.set_last_commit(last_commit);
        dag_state_write.add_scoring_subdags(unscored_subdags);
        drop(dag_state_write);

        // Before the update: round-robin elects authority 0.
        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(0)
        );

        leader_schedule.update_leader_schedule_v2(&dag_state);

        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(2)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
        // After the update the bad leader (0) is swapped for the good node (2).
        assert_eq!(
            leader_schedule.elect_leader(4, 0),
            AuthorityIndex::new_for_test(2)
        );
    }

    // With scores 0..3 and a 33% threshold: top scorer is good, bottom is bad.
    #[tokio::test]
    async fn test_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);

        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(
            leader_swap_table.good_nodes[0].0,
            AuthorityIndex::new_for_test(3)
        );
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table
                .bad_nodes
                .contains_key(&AuthorityIndex::new_for_test(0))
        );
    }

    #[tokio::test]
    async fn test_leader_swap_table_swap() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        // A bad-node leader is swapped for the single good node.
        let leader = AuthorityIndex::new_for_test(0);
        let leader_round = 1;
        let leader_offset = 0;
        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
        assert_eq!(swapped_leader, Some(AuthorityIndex::new_for_test(3)));

        // A leader that is not a bad node is left unchanged.
        let leader = AuthorityIndex::new_for_test(1);
        let leader_round = 1;
        let leader_offset = 0;
        let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
        assert_eq!(swapped_leader, None);
    }

    // With equal stakes (4 x 25%), a 50% threshold admits exactly two nodes;
    // the third would cross the threshold and is excluded.
    #[tokio::test]
    async fn test_leader_swap_table_retrieve_first_nodes() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let authorities = [
            (AuthorityIndex::new_for_test(0), 1),
            (AuthorityIndex::new_for_test(1), 2),
            (AuthorityIndex::new_for_test(2), 3),
            (AuthorityIndex::new_for_test(3), 4),
        ];

        let stake_threshold = 50;
        let filtered_authorities = LeaderSwapTable::retrieve_first_nodes(
            context.clone(),
            authorities.iter(),
            stake_threshold,
        );

        // Only the first two authorities (in iteration order) fit under 50%.
        assert_eq!(filtered_authorities.len(), 2);
        let authority_0_idx = AuthorityIndex::new_for_test(0);
        let authority_0 = context.committee.authority(authority_0_idx);
        assert!(filtered_authorities.contains(&(
            authority_0_idx,
            authority_0.hostname.clone(),
            authority_0.stake
        )));
        let authority_1_idx = AuthorityIndex::new_for_test(1);
        let authority_1 = context.committee.authority(authority_1_idx);
        assert!(filtered_authorities.contains(&(
            authority_1_idx,
            authority_1.hostname.clone(),
            authority_1.stake
        )));
    }

    // Thresholds above 33 must be rejected by new_inner's assertion.
    #[tokio::test]
    #[should_panic(
        expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
    )]
    async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 34;
        let reputation_scores = ReputationScores::new(
            (0..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
    }

    // Consecutive equal-size commit ranges (1..=10 then 11..=20) are accepted.
    #[tokio::test]
    async fn test_update_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (1..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (11..=20).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
    }

    // A follow-up range of different size (21..=25 after 11..=20) must panic.
    #[tokio::test]
    #[should_panic(
        expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
    )]
    async fn test_update_bad_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        let swap_stake_threshold = 33;
        let reputation_scores = ReputationScores::new(
            (1..=10).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (11..=20).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        leader_schedule.update_leader_swap_table(leader_swap_table.clone());

        let reputation_scores = ReputationScores::new(
            (21..=25).into(),
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        );
        let leader_swap_table =
            LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);

        leader_schedule.update_leader_swap_table(leader_swap_table.clone());
    }
}
969}