1use std::{
5 collections::BTreeMap,
6 fmt::{Debug, Formatter},
7 sync::Arc,
8};
9
10use consensus_config::{AuthorityIndex, Stake};
11use consensus_types::block::Round;
12use parking_lot::RwLock;
13use rand::{SeedableRng, prelude::SliceRandom, rngs::StdRng};
14
15use crate::{
16 CommitIndex, commit::CommitRange, context::Context, dag_state::DagState,
17 leader_scoring::ReputationScores,
18};
19
20#[derive(Clone)]
24pub(crate) struct LeaderSchedule {
25 pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
26 context: Arc<Context>,
27 num_commits_per_schedule: u64,
28}
29
30impl LeaderSchedule {
31 #[cfg(not(msim))]
35 const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
36 #[cfg(msim)]
37 const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;
38
39 pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
40 Self {
41 context,
42 num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
43 leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
44 }
45 }
46
47 #[cfg(test)]
48 pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
49 self.num_commits_per_schedule = num_commits_per_schedule;
50 self
51 }
52
53 pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
56 let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
57 LeaderSwapTable::default(),
58 |(last_commit_ref, last_commit_info)| {
59 LeaderSwapTable::new(
60 context.clone(),
61 last_commit_ref.index,
62 last_commit_info.reputation_scores,
63 )
64 },
65 );
66
67 tracing::info!(
68 "LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
69 dag_state.read().scoring_subdags_count(),
70 );
71
72 Self::new(context, leader_swap_table)
74 }
75
76 pub(crate) fn commits_until_leader_schedule_update(
77 &self,
78 dag_state: Arc<RwLock<DagState>>,
79 ) -> usize {
80 let subdag_count = dag_state.read().scoring_subdags_count() as u64;
81
82 assert!(
83 subdag_count <= self.num_commits_per_schedule,
84 "Committed subdags count exceeds the number of commits per schedule"
85 );
86 self.num_commits_per_schedule
87 .checked_sub(subdag_count)
88 .unwrap() as usize
89 }
90
91 pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
96 dag_state.read().is_scoring_subdag_empty()
97 }
98
99 pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
100 let _s = self
101 .context
102 .metrics
103 .node_metrics
104 .scope_processing_time
105 .with_label_values(&["LeaderSchedule::update_leader_schedule"])
106 .start_timer();
107
108 let (reputation_scores, last_commit_index) = {
109 let dag_state = dag_state.read();
110 let reputation_scores = dag_state.calculate_scoring_subdag_scores();
111
112 let last_commit_index = dag_state.scoring_subdag_commit_range();
113
114 (reputation_scores, last_commit_index)
115 };
116
117 {
118 let mut dag_state = dag_state.write();
119 dag_state.clear_scoring_subdag();
121 dag_state.add_commit_info(reputation_scores.clone());
123 }
124
125 self.update_leader_swap_table(LeaderSwapTable::new(
126 self.context.clone(),
127 last_commit_index,
128 reputation_scores.clone(),
129 ));
130
131 reputation_scores.update_metrics(self.context.clone());
132
133 self.context
134 .metrics
135 .node_metrics
136 .num_of_bad_nodes
137 .set(self.leader_swap_table.read().bad_nodes.len() as i64);
138 }
139
140 pub(crate) fn elect_leader(&self, round: u32, leader_offset: u32) -> AuthorityIndex {
141 cfg_if::cfg_if! {
142 if #[cfg(test)] {
145 let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
146 let table = self.leader_swap_table.read();
147 table.swap(leader, round, leader_offset).unwrap_or(leader)
148 } else {
149 let leader = self.elect_leader_stake_based(round, leader_offset);
150 let table = self.leader_swap_table.read();
151 table.swap(leader, round, leader_offset).unwrap_or(leader)
152 }
153 }
154 }
155
156 pub(crate) fn elect_leader_stake_based(&self, round: u32, offset: u32) -> AuthorityIndex {
157 assert!((offset as usize) < self.context.committee.size());
158
159 let mut seed_bytes = [0u8; 32];
164 seed_bytes[32 - 4..].copy_from_slice(&(round).to_le_bytes());
165 let mut rng = StdRng::from_seed(seed_bytes);
166
167 let choices = self
168 .context
169 .committee
170 .authorities()
171 .map(|(index, authority)| (index, authority.stake as f32))
172 .collect::<Vec<_>>();
173
174 *choices
175 .choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
176 .expect("Weighted choice error: stake values incorrect!")
177 .skip(offset as usize)
178 .map(|(index, _)| index)
179 .next()
180 .unwrap()
181 }
182
183 fn update_leader_swap_table(&self, table: LeaderSwapTable) {
187 let read = self.leader_swap_table.read();
188 let old_commit_range = &read.reputation_scores.commit_range;
189 let new_commit_range = &table.reputation_scores.commit_range;
190
191 if *old_commit_range != CommitRange::default() {
196 assert!(
197 old_commit_range.is_next_range(new_commit_range)
198 && old_commit_range.is_equal_size(new_commit_range),
199 "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
200 );
201 }
202 drop(read);
203
204 tracing::trace!("Updating {table:?}");
205
206 let mut write = self.leader_swap_table.write();
207 *write = table;
208 }
209}
210
211#[derive(Default, Clone)]
212pub(crate) struct LeaderSwapTable {
213 pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,
218
219 pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,
225
226 pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
229
230 pub(crate) reputation_scores: ReputationScores,
234}
235
236impl LeaderSwapTable {
237 pub(crate) fn new(
242 context: Arc<Context>,
243 commit_index: CommitIndex,
244 reputation_scores: ReputationScores,
245 ) -> Self {
246 let swap_stake_threshold = context
247 .protocol_config
248 .consensus_bad_nodes_stake_threshold();
249 Self::new_inner(
250 context,
251 swap_stake_threshold,
252 commit_index,
253 reputation_scores,
254 )
255 }
256
257 fn new_inner(
258 context: Arc<Context>,
259 #[allow(unused_variables)] swap_stake_threshold: u64,
262 commit_index: CommitIndex,
263 reputation_scores: ReputationScores,
264 ) -> Self {
265 #[cfg(msim)]
266 let swap_stake_threshold = 33;
267
268 assert!(
269 (0..=33).contains(&swap_stake_threshold),
270 "The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
271 );
272
273 if reputation_scores.scores_per_authority.is_empty() {
275 return Self::default();
276 }
277
278 let mut seed_bytes = [0u8; 32];
281 seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
282 let mut rng = StdRng::from_seed(seed_bytes);
283 let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
284 assert_eq!(authorities_by_score.len(), context.committee.size());
285 authorities_by_score.shuffle(&mut rng);
286 authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));
288
289 let good_nodes = Self::retrieve_first_nodes(
291 context.clone(),
292 authorities_by_score.iter(),
293 swap_stake_threshold,
294 )
295 .into_iter()
296 .collect::<Vec<(AuthorityIndex, String, Stake)>>();
297
298 let bad_nodes = Self::retrieve_first_nodes(
302 context.clone(),
303 authorities_by_score.iter().rev(),
304 swap_stake_threshold,
305 )
306 .into_iter()
307 .map(|(idx, hostname, stake)| (idx, (hostname, stake)))
308 .collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();
309
310 good_nodes.iter().for_each(|(idx, hostname, stake)| {
311 tracing::debug!(
312 "Good node {hostname} with stake {stake} has score {} for {:?}",
313 reputation_scores.scores_per_authority[idx.to_owned()],
314 reputation_scores.commit_range,
315 );
316 });
317
318 bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
319 tracing::debug!(
320 "Bad node {hostname} with stake {stake} has score {} for {:?}",
321 reputation_scores.scores_per_authority[idx.to_owned()],
322 reputation_scores.commit_range,
323 );
324 });
325
326 tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");
327
328 Self {
329 good_nodes,
330 bad_nodes,
331 reputation_scores_desc: authorities_by_score,
332 reputation_scores,
333 }
334 }
335
336 pub(crate) fn swap(
347 &self,
348 leader: AuthorityIndex,
349 leader_round: Round,
350 leader_offset: u32,
351 ) -> Option<AuthorityIndex> {
352 if self.bad_nodes.contains_key(&leader) {
353 assert!(
355 leader_offset == 0,
356 "Swap for multi-leader case not implemented yet."
357 );
358 let mut seed_bytes = [0u8; 32];
359 seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
360 seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
361 let mut rng = StdRng::from_seed(seed_bytes);
362
363 let (idx, _hostname, _stake) = self
364 .good_nodes
365 .choose(&mut rng)
366 .expect("There should be at least one good node available");
367
368 tracing::trace!(
369 "Swapping bad leader {} -> {} for round {}",
370 leader,
371 idx,
372 leader_round
373 );
374
375 return Some(*idx);
376 }
377 None
378 }
379
380 fn retrieve_first_nodes<'a>(
386 context: Arc<Context>,
387 authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
388 stake_threshold: u64,
389 ) -> Vec<(AuthorityIndex, String, Stake)> {
390 let mut filtered_authorities = Vec::new();
391
392 let mut stake = 0;
393 for &(authority_idx, _score) in authorities {
394 stake += context.committee.stake(authority_idx);
395
396 if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
401 break;
402 }
403
404 let authority = context.committee.authority(authority_idx);
405 filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
406 }
407
408 filtered_authorities
409 }
410}
411
412impl Debug for LeaderSwapTable {
413 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
414 f.write_str(&format!(
415 "LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
416 self.reputation_scores.commit_range,
417 self.good_nodes
418 .iter()
419 .map(|(idx, _hostname, _stake)| idx.to_owned())
420 .collect::<Vec<AuthorityIndex>>(),
421 self.good_nodes
422 .iter()
423 .map(|(_idx, _hostname, stake)| stake)
424 .sum::<Stake>(),
425 self.bad_nodes.keys().map(|idx| idx.to_owned()),
426 self.bad_nodes
427 .values()
428 .map(|(_hostname, stake)| stake)
429 .sum::<Stake>(),
430 ))
431 }
432}
433
#[cfg(test)]
mod tests {
    use consensus_types::block::{BlockDigest, BlockRef, BlockTimestampMs};

    use super::*;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
        storage::{Store, WriteBatch, mem_store::MemStore},
        test_dag_builder::DagBuilder,
    };

    /// Four-authority test context with the protocol config left untouched.
    fn default_context() -> Arc<Context> {
        Arc::new(Context::new_for_test(4).0)
    }

    /// Four-authority test context with the bad nodes stake threshold set to 33.
    fn context_with_bad_nodes_threshold() -> Arc<Context> {
        let mut context = Context::new_for_test(4).0;
        context
            .protocol_config
            .set_consensus_bad_nodes_stake_threshold_for_testing(33);
        Arc::new(context)
    }

    /// Shorthand for a test authority index.
    fn authority(index: u32) -> AuthorityIndex {
        AuthorityIndex::new_for_test(index)
    }

    /// Reputation scores `[0, 1, 2, 3]` for the given commit range.
    fn ascending_scores(commit_range: CommitRange) -> ReputationScores {
        ReputationScores::new(
            commit_range,
            (0..4).map(|i| i as u64).collect::<Vec<_>>(),
        )
    }

    /// Swap table built with commit index 0 from ascending scores over `commit_range`.
    fn swap_table(
        context: &Arc<Context>,
        swap_stake_threshold: u64,
        commit_range: CommitRange,
    ) -> LeaderSwapTable {
        LeaderSwapTable::new_inner(
            context.clone(),
            swap_stake_threshold,
            0,
            ascending_scores(commit_range),
        )
    }

    /// Shared `DagState` backed by `store`.
    fn dag_state_with(context: &Arc<Context>, store: Arc<MemStore>) -> Arc<RwLock<DagState>> {
        Arc::new(RwLock::new(DagState::new(context.clone(), store)))
    }

    #[tokio::test]
    async fn test_elect_leader() {
        let leader_schedule = LeaderSchedule::new(default_context(), LeaderSwapTable::default());

        for (round, expected_leader) in [(0, 0), (1, 1), (5, 1)] {
            assert_eq!(
                leader_schedule.elect_leader(round, 0),
                authority(expected_leader)
            );
        }

        // Different offsets within the same round must yield different leaders.
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_elect_leader_stake_based() {
        let leader_schedule = LeaderSchedule::new(default_context(), LeaderSwapTable::default());

        for (round, expected_leader) in [(0, 1), (1, 1), (5, 3)] {
            assert_eq!(
                leader_schedule.elect_leader_stake_based(round, 0),
                authority(expected_leader)
            );
        }

        // Different offsets within the same round must yield different leaders.
        assert_ne!(
            leader_schedule.elect_leader_stake_based(1, 1),
            leader_schedule.elect_leader_stake_based(1, 2)
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store() {
        telemetry_subscribers::init_for_testing();
        let context = context_with_bad_nodes_threshold();
        let store = Arc::new(MemStore::new());

        // Build 11 rounds and gather the resulting subdags, commits and blocks.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=11).build();
        let mut subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
            blocks_to_write.extend(sub_dag.blocks.iter().cloned());
            expected_commits.push(commit);
            subdags.push(sub_dag);
        }

        // Commit info is stored for the 10th commit, covering commits 1..=10.
        let commit_ref = expected_commits[9].reference();
        let commit_info = CommitInfo {
            reputation_scores: ReputationScores::new((1..=10).into(), vec![4, 1, 1, 3]),
            committed_rounds: vec![9, 9, 10, 9],
        };

        let batch = WriteBatch::default()
            .commit_info(vec![(commit_ref, commit_info)])
            .blocks(blocks_to_write)
            .commits(expected_commits);
        store.write(batch).unwrap();

        let dag_state = dag_state_with(&context, store);

        // Check the state recovered from the store.
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(1, dag_state.read().scoring_subdags_count());
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Check the swap table derived from the stored commit info.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(leader_swap_table.good_nodes[0].0, authority(0));
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(
            leader_swap_table.bad_nodes.contains_key(&authority(2)),
            "{:?}",
            leader_swap_table.bad_nodes
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commits() {
        telemetry_subscribers::init_for_testing();
        let context = context_with_bad_nodes_threshold();
        let dag_state = dag_state_with(&context, Arc::new(MemStore::new()));

        // Nothing was committed, so nothing is recovered.
        let expected_last_committed_rounds = vec![0, 0, 0, 0];
        assert_eq!(
            expected_last_committed_rounds,
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(0, dag_state.read().scoring_subdags_count());

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // The swap table stays empty.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_from_store_no_commit_info() {
        telemetry_subscribers::init_for_testing();
        let context = context_with_bad_nodes_threshold();
        let store = Arc::new(MemStore::new());

        // Build 2 rounds and store blocks and commits, but no commit info.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=2).build();

        let mut expected_scored_subdags = vec![];
        let mut expected_commits = vec![];
        let mut blocks_to_write = vec![];

        for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
            blocks_to_write.extend(sub_dag.blocks.iter().cloned());
            expected_commits.push(commit);
            expected_scored_subdags.push(sub_dag);
        }

        let batch = WriteBatch::default()
            .blocks(blocks_to_write)
            .commits(expected_commits);
        store.write(batch).unwrap();

        let dag_state = dag_state_with(&context, store);

        // Check the state recovered from the store.
        assert_eq!(
            dag_builder.last_committed_rounds.clone(),
            dag_state.read().last_committed_rounds()
        );
        assert_eq!(
            expected_scored_subdags.len(),
            dag_state.read().scoring_subdags_count()
        );
        let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
        let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
        assert_eq!(recovered_scores, expected_scores);

        let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());

        // Without commit info the swap table stays empty.
        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 0);
        assert_eq!(leader_swap_table.bad_nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_leader_schedule_commits_until_leader_schedule_update() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
        let dag_state = dag_state_with(&context, Arc::new(MemStore::new()));

        // A single scoring subdag leaves 299 of the 300 commits outstanding.
        let unscored_subdags = vec![CommittedSubDag::new(
            BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
            vec![],
            context.clock.timestamp_utc_ms(),
            CommitRef::new(1, CommitDigest::MIN),
        )];
        dag_state.write().add_scoring_subdags(unscored_subdags);

        assert_eq!(
            leader_schedule.commits_until_leader_schedule_update(dag_state.clone()),
            299
        );
    }

    #[tokio::test]
    async fn test_leader_schedule_update_leader_schedule() {
        telemetry_subscribers::init_for_testing();
        let context = context_with_bad_nodes_threshold();
        let leader_schedule = Arc::new(LeaderSchedule::new(
            context.clone(),
            LeaderSwapTable::default(),
        ));
        let dag_state = dag_state_with(&context, Arc::new(MemStore::new()));

        let max_round: u32 = 4;
        let num_authorities: u32 = 4;

        // Genesis: one round-0 block per authority.
        let (genesis_references, mut blocks): (Vec<_>, Vec<_>) = context
            .committee
            .authorities()
            .map(|(authority_index, _authority)| {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(0, authority_index.value() as u32).build(),
                );
                (block.reference(), block)
            })
            .unzip();

        let mut ancestors = genesis_references;
        let mut leader = None;
        for round in 1..=max_round {
            let base_ts = round as BlockTimestampMs * 1000;
            let mut new_ancestors = vec![];
            for author in 0..num_authorities {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, author)
                        .set_timestamp_ms(base_ts + (author + round) as u64)
                        .set_ancestors(ancestors.clone())
                        .build(),
                );
                new_ancestors.push(block.reference());

                // Authority 0's round 3 block is referenced but left out of the subdag.
                if round == 3 && author == 0 {
                    tracing::info!("Skipping {block} in committed subdags blocks");
                    continue;
                }

                blocks.push(block.clone());

                // The first block of the last round is the leader; stop there.
                if round == max_round {
                    leader = Some(block);
                    break;
                }
            }
            ancestors = new_ancestors;
        }

        let leader_ref = leader.unwrap().reference();
        let commit_index = 1;

        let last_commit = TrustedCommit::new_for_test(
            commit_index,
            CommitDigest::MIN,
            context.clock.timestamp_utc_ms(),
            leader_ref,
            blocks
                .iter()
                .map(|block| block.reference())
                .collect::<Vec<_>>(),
        );

        let unscored_subdags = vec![CommittedSubDag::new(
            leader_ref,
            blocks,
            context.clock.timestamp_utc_ms(),
            last_commit.reference(),
        )];

        // Scoped so the write lock is released before the schedule is used.
        {
            let mut dag_state_write = dag_state.write();
            dag_state_write.set_last_commit(last_commit);
            dag_state_write.add_scoring_subdags(unscored_subdags);
        }

        // Before the update, round 4 is led by authority 0.
        assert_eq!(leader_schedule.elect_leader(4, 0), authority(0));

        leader_schedule.update_leader_schedule_v2(&dag_state);

        let leader_swap_table = leader_schedule.leader_swap_table.read();
        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(leader_swap_table.good_nodes[0].0, authority(2));
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(leader_swap_table.bad_nodes.contains_key(&authority(0)));

        // After the update, authority 0 is swapped for authority 2.
        assert_eq!(leader_schedule.elect_leader(4, 0), authority(2));
    }

    #[tokio::test]
    async fn test_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();

        let leader_swap_table = swap_table(&context, 33, (0..=10).into());

        assert_eq!(leader_swap_table.good_nodes.len(), 1);
        assert_eq!(leader_swap_table.good_nodes[0].0, authority(3));
        assert_eq!(leader_swap_table.bad_nodes.len(), 1);
        assert!(leader_swap_table.bad_nodes.contains_key(&authority(0)));
    }

    #[tokio::test]
    async fn test_leader_swap_table_swap() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();

        let leader_swap_table = swap_table(&context, 33, (0..=10).into());

        // Authority 0 is a bad node and gets swapped for authority 3.
        let swapped_leader = leader_swap_table.swap(authority(0), 1, 0);
        assert_eq!(swapped_leader, Some(authority(3)));

        // Authority 1 is not a bad node, so there is no swap.
        let swapped_leader = leader_swap_table.swap(authority(1), 1, 0);
        assert_eq!(swapped_leader, None);
    }

    #[tokio::test]
    async fn test_leader_swap_table_retrieve_first_nodes() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();

        // Authorities 0..4 paired with scores 1..=4.
        let authorities = (0..4)
            .map(|index| (authority(index), u64::from(index) + 1))
            .collect::<Vec<(AuthorityIndex, u64)>>();

        let filtered_authorities =
            LeaderSwapTable::retrieve_first_nodes(context.clone(), authorities.iter(), 50);

        // A 50% threshold keeps exactly the first two authorities.
        assert_eq!(filtered_authorities.len(), 2);
        for index in [0, 1] {
            let authority_idx = authority(index);
            let expected = context.committee.authority(authority_idx);
            assert!(filtered_authorities.contains(&(
                authority_idx,
                expected.hostname.clone(),
                expected.stake
            )));
        }
    }

    #[tokio::test]
    #[should_panic(
        expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
    )]
    async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();

        swap_table(&context, 34, (0..=10).into());
    }

    #[tokio::test]
    async fn test_update_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // Two consecutive, equally sized commit ranges are both accepted.
        for commit_range in [1..=10, 11..=20] {
            leader_schedule.update_leader_swap_table(swap_table(&context, 33, commit_range.into()));
        }
    }

    #[tokio::test]
    #[should_panic(
        expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
    )]
    async fn test_update_bad_leader_swap_table() {
        telemetry_subscribers::init_for_testing();
        let context = default_context();
        let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

        // The third range follows the second but is shorter, which must panic.
        for commit_range in [1..=10, 11..=20, 21..=25] {
            leader_schedule.update_leader_swap_table(swap_table(&context, 33, commit_range.into()));
        }
    }
}