use std::{
collections::BTreeMap,
fmt::{Debug, Formatter},
sync::Arc,
};
use consensus_config::{AuthorityIndex, Stake};
use parking_lot::RwLock;
use rand::{prelude::SliceRandom, rngs::StdRng, SeedableRng};
use crate::{
commit::CommitRange, context::Context, dag_state::DagState, leader_scoring::ReputationScores,
CommitIndex, Round,
};
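/// `LeaderSchedule` produces the leader for each round and periodically refreshes the
/// schedule from the reputation scores accumulated in `DagState`, swapping low-scoring
/// leaders for high-scoring ones via the `LeaderSwapTable`.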
#[derive(Clone)]
pub(crate) struct LeaderSchedule {
pub leader_swap_table: Arc<RwLock<LeaderSwapTable>>,
context: Arc<Context>,
num_commits_per_schedule: u64,
}
impl LeaderSchedule {
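// Number of consensus commits between leader schedule updates; kept small under msim so
// simulation tests exercise schedule updates frequently.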
#[cfg(not(msim))]
const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 300;
#[cfg(msim)]
const CONSENSUS_COMMITS_PER_SCHEDULE: u64 = 10;
pub(crate) fn new(context: Arc<Context>, leader_swap_table: LeaderSwapTable) -> Self {
Self {
context,
num_commits_per_schedule: Self::CONSENSUS_COMMITS_PER_SCHEDULE,
leader_swap_table: Arc::new(RwLock::new(leader_swap_table)),
}
}
#[cfg(test)]
pub(crate) fn with_num_commits_per_schedule(mut self, num_commits_per_schedule: u64) -> Self {
self.num_commits_per_schedule = num_commits_per_schedule;
self
}
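/// Recovers the `LeaderSchedule` from storage: if commit info was persisted for the last
/// commit, a `LeaderSwapTable` is rebuilt from its reputation scores; otherwise the
/// default (empty) table is used.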
pub(crate) fn from_store(context: Arc<Context>, dag_state: Arc<RwLock<DagState>>) -> Self {
let leader_swap_table = dag_state.read().recover_last_commit_info().map_or(
LeaderSwapTable::default(),
|(last_commit_ref, last_commit_info)| {
LeaderSwapTable::new(
context.clone(),
last_commit_ref.index,
last_commit_info.reputation_scores,
)
},
);
tracing::info!(
"LeaderSchedule recovered using {leader_swap_table:?}. There are {} committed subdags scored in DagState.",
dag_state.read().scoring_subdags_count(),
);
Self::new(context, leader_swap_table)
}
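/// Returns how many more commits need to be scored before the next leader schedule update.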
pub(crate) fn commits_until_leader_schedule_update(
&self,
dag_state: Arc<RwLock<DagState>>,
) -> usize {
let subdag_count = dag_state.read().scoring_subdags_count() as u64;
assert!(
subdag_count <= self.num_commits_per_schedule,
"Committed subdags count exceeds the number of commits per schedule"
);
self.num_commits_per_schedule
.checked_sub(subdag_count)
.unwrap() as usize
}
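/// Returns true when there are no scored subdags pending in `DagState`, i.e. the schedule
/// has just been updated (or the node has just started) and scoring begins fresh.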
pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
dag_state.read().is_scoring_subdag_empty()
}
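/// Computes reputation scores from the scored subdags in `DagState`, persists them as
/// commit info, and installs a new `LeaderSwapTable` built from those scores.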
pub(crate) fn update_leader_schedule_v2(&self, dag_state: &RwLock<DagState>) {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["LeaderSchedule::update_leader_schedule"])
.start_timer();
let (reputation_scores, last_commit_index) = {
let dag_state = dag_state.read();
let reputation_scores = dag_state.calculate_scoring_subdag_scores();
let last_commit_index = dag_state.scoring_subdag_commit_range();
(reputation_scores, last_commit_index)
};
{
let mut dag_state = dag_state.write();
dag_state.clear_scoring_subdag();
dag_state.add_commit_info(reputation_scores.clone());
}
self.update_leader_swap_table(LeaderSwapTable::new(
self.context.clone(),
last_commit_index,
reputation_scores.clone(),
));
reputation_scores.update_metrics(self.context.clone());
self.context
.metrics
.node_metrics
.num_of_bad_nodes
.set(self.leader_swap_table.read().bad_nodes.len() as i64);
}
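/// Elects the leader for `round` at `leader_offset`, then applies the swap table. Under
/// `cfg(test)` leaders rotate round-robin so tests stay deterministic; otherwise the
/// stake-based election is used.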
pub(crate) fn elect_leader(&self, round: Round, leader_offset: u32) -> AuthorityIndex {
cfg_if::cfg_if! {
if #[cfg(test)] {
let leader = AuthorityIndex::new_for_test((round + leader_offset) % self.context.committee.size() as u32);
let table = self.leader_swap_table.read();
table.swap(leader, round, leader_offset).unwrap_or(leader)
} else {
let leader = self.elect_leader_stake_based(round, leader_offset);
let table = self.leader_swap_table.read();
table.swap(leader, round, leader_offset).unwrap_or(leader)
}
}
}
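/// Deterministic, stake-weighted leader election: authorities with more stake are elected
/// proportionally more often across rounds.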
pub(crate) fn elect_leader_stake_based(&self, round: Round, offset: u32) -> AuthorityIndex {
assert!((offset as usize) < self.context.committee.size());
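// Seed the RNG with the round number so every authority elects the same leader.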
let mut seed_bytes = [0u8; 32];
seed_bytes[28..32].copy_from_slice(&round.to_le_bytes());
let mut rng = StdRng::from_seed(seed_bytes);
let choices = self
.context
.committee
.authorities()
.map(|(index, authority)| (index, authority.stake as f32))
.collect::<Vec<_>>();
let leader_index = *choices
.choose_multiple_weighted(&mut rng, self.context.committee.size(), |item| item.1)
.expect("Weighted choice error: stake values incorrect!")
.skip(offset as usize)
.map(|(index, _)| index)
.next()
.unwrap();
leader_index
}
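/// Atomically replaces the swap table, asserting that the new table's commit range
/// immediately follows the old one and spans the same number of commits.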
fn update_leader_swap_table(&self, table: LeaderSwapTable) {
let read = self.leader_swap_table.read();
let old_commit_range = &read.reputation_scores.commit_range;
let new_commit_range = &table.reputation_scores.commit_range;
if *old_commit_range != CommitRange::default() {
assert!(
old_commit_range.is_next_range(new_commit_range) && old_commit_range.is_equal_size(new_commit_range),
"The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable {old_commit_range:?} vs new LeaderSwapTable {new_commit_range:?}",
);
}
drop(read);
tracing::trace!("Updating {table:?}");
let mut write = self.leader_swap_table.write();
*write = table;
}
}
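/// Tracks the authorities to treat as "bad" leaders (to be swapped out) and the "good"
/// authorities that replace them, together with the reputation scores the table was
/// derived from.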
#[derive(Default, Clone)]
pub(crate) struct LeaderSwapTable {
pub(crate) good_nodes: Vec<(AuthorityIndex, String, Stake)>,
pub(crate) bad_nodes: BTreeMap<AuthorityIndex, (String, Stake)>,
pub(crate) reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
pub(crate) reputation_scores: ReputationScores,
}
impl LeaderSwapTable {
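/// Builds a swap table from the provided reputation scores using the protocol-configured
/// bad-nodes stake threshold (a percentage of total stake, at most 33).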
pub(crate) fn new(
context: Arc<Context>,
commit_index: CommitIndex,
reputation_scores: ReputationScores,
) -> Self {
let swap_stake_threshold = context
.protocol_config
.consensus_bad_nodes_stake_threshold();
Self::new_inner(
context,
swap_stake_threshold,
commit_index,
reputation_scores,
)
}
fn new_inner(
context: Arc<Context>,
#[allow(unused_variables)] swap_stake_threshold: u64,
commit_index: CommitIndex,
reputation_scores: ReputationScores,
) -> Self {
#[cfg(msim)]
let swap_stake_threshold = 33;
assert!(
(0..=33).contains(&swap_stake_threshold),
"The swap_stake_threshold ({swap_stake_threshold}) should be in range [0 - 33], out of bounds parameter detected"
);
if reputation_scores.scores_per_authority.is_empty() {
return Self::default();
}
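// Seed a deterministic RNG with the commit index so every authority derives the same table.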
let mut seed_bytes = [0u8; 32];
seed_bytes[28..32].copy_from_slice(&commit_index.to_le_bytes());
let mut rng = StdRng::from_seed(seed_bytes);
let mut authorities_by_score = reputation_scores.authorities_by_score(context.clone());
assert_eq!(authorities_by_score.len(), context.committee.size());
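// Shuffle before the (stable) descending sort so equal scores are tie-broken
// deterministically by the commit-seeded RNG rather than by authority index.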
authorities_by_score.shuffle(&mut rng);
authorities_by_score.sort_by(|a1, a2| a2.1.cmp(&a1.1));
let good_nodes = Self::retrieve_first_nodes(
context.clone(),
authorities_by_score.iter(),
swap_stake_threshold,
)
.into_iter()
.collect::<Vec<(AuthorityIndex, String, Stake)>>();
let bad_nodes = Self::retrieve_first_nodes(
context.clone(),
authorities_by_score.iter().rev(),
swap_stake_threshold,
)
.into_iter()
.map(|(idx, hostname, stake)| (idx, (hostname, stake)))
.collect::<BTreeMap<AuthorityIndex, (String, Stake)>>();
good_nodes.iter().for_each(|(idx, hostname, stake)| {
tracing::debug!(
"Good node {hostname} with stake {stake} has score {} for {:?}",
reputation_scores.scores_per_authority[idx.to_owned()],
reputation_scores.commit_range,
);
});
bad_nodes.iter().for_each(|(idx, (hostname, stake))| {
tracing::debug!(
"Bad node {hostname} with stake {stake} has score {} for {:?}",
reputation_scores.scores_per_authority[idx.to_owned()],
reputation_scores.commit_range,
);
});
tracing::info!("Scores used for new LeaderSwapTable: {reputation_scores:?}");
Self {
good_nodes,
bad_nodes,
reputation_scores_desc: authorities_by_score,
reputation_scores,
}
}
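/// If `leader` is a bad node, returns a good node to lead `leader_round` instead; the
/// replacement is chosen deterministically from the round so all authorities agree.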
pub(crate) fn swap(
&self,
leader: AuthorityIndex,
leader_round: Round,
leader_offset: u32,
) -> Option<AuthorityIndex> {
if self.bad_nodes.contains_key(&leader) {
assert!(
leader_offset == 0,
"Swap for multi-leader case not implemented yet."
);
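// Seed with the round and offset so every authority picks the same replacement.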
let mut seed_bytes = [0u8; 32];
seed_bytes[24..28].copy_from_slice(&leader_round.to_le_bytes());
seed_bytes[28..32].copy_from_slice(&leader_offset.to_le_bytes());
let mut rng = StdRng::from_seed(seed_bytes);
let (idx, _hostname, _stake) = self
.good_nodes
.choose(&mut rng)
.expect("There should be at least one good node available");
tracing::trace!(
"Swapping bad leader {} -> {} for round {}",
leader,
idx,
leader_round
);
return Some(*idx);
}
None
}
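/// Collects authorities in iteration order while their cumulative stake stays within
/// `stake_threshold` percent of total stake; the first authority that would push the
/// total over the threshold is excluded, along with the rest.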
fn retrieve_first_nodes<'a>(
context: Arc<Context>,
authorities: impl Iterator<Item = &'a (AuthorityIndex, u64)>,
stake_threshold: u64,
) -> Vec<(AuthorityIndex, String, Stake)> {
let mut filtered_authorities = Vec::new();
let mut stake = 0;
for &(authority_idx, _score) in authorities {
stake += context.committee.stake(authority_idx);
if stake > (stake_threshold * context.committee.total_stake()) / 100 as Stake {
break;
}
let authority = context.committee.authority(authority_idx);
filtered_authorities.push((authority_idx, authority.hostname.clone(), authority.stake));
}
filtered_authorities
}
}
impl Debug for LeaderSwapTable {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!(
"LeaderSwapTable for {:?}, good_nodes: {:?} with stake: {}, bad_nodes: {:?} with stake: {}",
self.reputation_scores.commit_range,
self.good_nodes
.iter()
.map(|(idx, _hostname, _stake)| idx.to_owned())
.collect::<Vec<AuthorityIndex>>(),
self.good_nodes
.iter()
.map(|(_idx, _hostname, stake)| stake)
.sum::<Stake>(),
self.bad_nodes
.keys()
.map(|idx| idx.to_owned())
.collect::<Vec<AuthorityIndex>>(),
self.bad_nodes
.values()
.map(|(_hostname, stake)| stake)
.sum::<Stake>(),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
storage::{mem_store::MemStore, Store, WriteBatch},
test_dag_builder::DagBuilder,
};
#[tokio::test]
async fn test_elect_leader() {
let context = Arc::new(Context::new_for_test(4).0);
let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
assert_eq!(
leader_schedule.elect_leader(0, 0),
AuthorityIndex::new_for_test(0)
);
assert_eq!(
leader_schedule.elect_leader(1, 0),
AuthorityIndex::new_for_test(1)
);
assert_eq!(
leader_schedule.elect_leader(5, 0),
AuthorityIndex::new_for_test(1)
);
assert_ne!(
leader_schedule.elect_leader_stake_based(1, 1),
leader_schedule.elect_leader_stake_based(1, 2)
);
}
#[tokio::test]
async fn test_elect_leader_stake_based() {
let context = Arc::new(Context::new_for_test(4).0);
let leader_schedule = LeaderSchedule::new(context, LeaderSwapTable::default());
assert_eq!(
leader_schedule.elect_leader_stake_based(0, 0),
AuthorityIndex::new_for_test(1)
);
assert_eq!(
leader_schedule.elect_leader_stake_based(1, 0),
AuthorityIndex::new_for_test(1)
);
assert_eq!(
leader_schedule.elect_leader_stake_based(5, 0),
AuthorityIndex::new_for_test(3)
);
assert_ne!(
leader_schedule.elect_leader_stake_based(1, 1),
leader_schedule.elect_leader_stake_based(1, 2)
);
}
#[tokio::test]
async fn test_leader_schedule_from_store() {
telemetry_subscribers::init_for_testing();
let mut context = Context::new_for_test(4).0;
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=11).build();
let mut subdags = vec![];
let mut expected_commits = vec![];
let mut blocks_to_write = vec![];
for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) {
for block in sub_dag.blocks.iter() {
blocks_to_write.push(block.clone());
}
expected_commits.push(commit);
subdags.push(sub_dag);
}
let commit_range = (1..=10).into();
let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
let committed_rounds = vec![9, 9, 10, 9];
let commit_ref = expected_commits[9].reference();
let commit_info = CommitInfo {
reputation_scores,
committed_rounds,
};
store
.write(
WriteBatch::default()
.commit_info(vec![(commit_ref, commit_info)])
.blocks(blocks_to_write)
.commits(expected_commits),
)
.unwrap();
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
assert_eq!(
dag_builder.last_committed_rounds.clone(),
dag_state.read().last_committed_rounds()
);
assert_eq!(1, dag_state.read().scoring_subdags_count());
let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
let expected_scores = ReputationScores::new((11..=11).into(), vec![0, 0, 0, 0]);
assert_eq!(recovered_scores, expected_scores);
let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
let leader_swap_table = leader_schedule.leader_swap_table.read();
assert_eq!(leader_swap_table.good_nodes.len(), 1);
assert_eq!(
leader_swap_table.good_nodes[0].0,
AuthorityIndex::new_for_test(0)
);
assert_eq!(leader_swap_table.bad_nodes.len(), 1);
assert!(
leader_swap_table
.bad_nodes
.contains_key(&AuthorityIndex::new_for_test(2)),
"{:?}",
leader_swap_table.bad_nodes
);
}
#[tokio::test]
async fn test_leader_schedule_from_store_no_commits() {
telemetry_subscribers::init_for_testing();
let mut context = Context::new_for_test(4).0;
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
let expected_last_committed_rounds = vec![0, 0, 0, 0];
assert_eq!(
expected_last_committed_rounds,
dag_state.read().last_committed_rounds()
);
assert_eq!(0, dag_state.read().scoring_subdags_count());
let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
let leader_swap_table = leader_schedule.leader_swap_table.read();
assert_eq!(leader_swap_table.good_nodes.len(), 0);
assert_eq!(leader_swap_table.bad_nodes.len(), 0);
}
#[tokio::test]
async fn test_leader_schedule_from_store_no_commit_info() {
telemetry_subscribers::init_for_testing();
let mut context = Context::new_for_test(4).0;
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let context = Arc::new(context);
let store = Arc::new(MemStore::new());
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=2).build();
let mut expected_scored_subdags = vec![];
let mut expected_commits = vec![];
let mut blocks_to_write = vec![];
for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) {
for block in sub_dag.blocks.iter() {
blocks_to_write.push(block.clone());
}
expected_commits.push(commit);
expected_scored_subdags.push(sub_dag);
}
store
.write(
WriteBatch::default()
.blocks(blocks_to_write)
.commits(expected_commits),
)
.unwrap();
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
assert_eq!(
dag_builder.last_committed_rounds.clone(),
dag_state.read().last_committed_rounds()
);
assert_eq!(
expected_scored_subdags.len(),
dag_state.read().scoring_subdags_count()
);
let recovered_scores = dag_state.read().calculate_scoring_subdag_scores();
let expected_scores = ReputationScores::new((1..=2).into(), vec![0, 0, 0, 0]);
assert_eq!(recovered_scores, expected_scores);
let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
let leader_swap_table = leader_schedule.leader_swap_table.read();
assert_eq!(leader_swap_table.good_nodes.len(), 0);
assert_eq!(leader_swap_table.bad_nodes.len(), 0);
}
#[tokio::test]
async fn test_leader_schedule_commits_until_leader_schedule_update() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
Arc::new(MemStore::new()),
)));
let unscored_subdags = vec![CommittedSubDag::new(
BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
vec![],
vec![],
context.clock.timestamp_utc_ms(),
CommitRef::new(1, CommitDigest::MIN),
vec![],
)];
dag_state.write().add_scoring_subdags(unscored_subdags);
let commits_until_leader_schedule_update =
leader_schedule.commits_until_leader_schedule_update(dag_state.clone());
assert_eq!(commits_until_leader_schedule_update, 299);
}
#[tokio::test]
async fn test_leader_schedule_update_leader_schedule() {
telemetry_subscribers::init_for_testing();
let mut context = Context::new_for_test(4).0;
context
.protocol_config
.set_consensus_bad_nodes_stake_threshold_for_testing(33);
let context = Arc::new(context);
let leader_schedule = Arc::new(LeaderSchedule::new(
context.clone(),
LeaderSwapTable::default(),
));
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
Arc::new(MemStore::new()),
)));
let max_round: u32 = 4;
let num_authorities: u32 = 4;
let mut blocks = Vec::new();
let (genesis_references, genesis): (Vec<_>, Vec<_>) = context
.committee
.authorities()
.map(|(authority_index, _authority)| {
let author_idx = authority_index.value() as u32;
let block = TestBlock::new(0, author_idx).build();
VerifiedBlock::new_for_test(block)
})
.map(|block| (block.reference(), block))
.unzip();
blocks.extend(genesis);
let mut ancestors = genesis_references;
let mut leader = None;
for round in 1..=max_round {
let mut new_ancestors = vec![];
for author in 0..num_authorities {
let base_ts = round as BlockTimestampMs * 1000;
let block = VerifiedBlock::new_for_test(
TestBlock::new(round, author)
.set_timestamp_ms(base_ts + (author + round) as u64)
.set_ancestors(ancestors.clone())
.build(),
);
new_ancestors.push(block.reference());
if round == 3 && author == 0 {
tracing::info!("Skipping {block} in committed subdags blocks");
continue;
}
blocks.push(block.clone());
if round == max_round {
leader = Some(block.clone());
break;
}
}
ancestors = new_ancestors;
}
let leader_block = leader.unwrap();
let leader_ref = leader_block.reference();
let commit_index = 1;
let rejected_transactions = vec![vec![]; blocks.len()];
let last_commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
leader_ref,
blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
);
let unscored_subdags = vec![CommittedSubDag::new(
leader_ref,
blocks,
rejected_transactions,
context.clock.timestamp_utc_ms(),
last_commit.reference(),
vec![],
)];
let mut dag_state_write = dag_state.write();
dag_state_write.set_last_commit(last_commit);
dag_state_write.add_scoring_subdags(unscored_subdags);
drop(dag_state_write);
assert_eq!(
leader_schedule.elect_leader(4, 0),
AuthorityIndex::new_for_test(0)
);
leader_schedule.update_leader_schedule_v2(&dag_state);
let leader_swap_table = leader_schedule.leader_swap_table.read();
assert_eq!(leader_swap_table.good_nodes.len(), 1);
assert_eq!(
leader_swap_table.good_nodes[0].0,
AuthorityIndex::new_for_test(2)
);
assert_eq!(leader_swap_table.bad_nodes.len(), 1);
assert!(leader_swap_table
.bad_nodes
.contains_key(&AuthorityIndex::new_for_test(0)));
assert_eq!(
leader_schedule.elect_leader(4, 0),
AuthorityIndex::new_for_test(2)
);
}
#[tokio::test]
async fn test_leader_swap_table() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let swap_stake_threshold = 33;
let reputation_scores = ReputationScores::new(
(0..=10).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
assert_eq!(leader_swap_table.good_nodes.len(), 1);
assert_eq!(
leader_swap_table.good_nodes[0].0,
AuthorityIndex::new_for_test(3)
);
assert_eq!(leader_swap_table.bad_nodes.len(), 1);
assert!(leader_swap_table
.bad_nodes
.contains_key(&AuthorityIndex::new_for_test(0)));
}
#[tokio::test]
async fn test_leader_swap_table_swap() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let swap_stake_threshold = 33;
let reputation_scores = ReputationScores::new(
(0..=10).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
let leader = AuthorityIndex::new_for_test(0);
let leader_round = 1;
let leader_offset = 0;
let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
assert_eq!(swapped_leader, Some(AuthorityIndex::new_for_test(3)));
let leader = AuthorityIndex::new_for_test(1);
let leader_round = 1;
let leader_offset = 0;
let swapped_leader = leader_swap_table.swap(leader, leader_round, leader_offset);
assert_eq!(swapped_leader, None);
}
#[tokio::test]
async fn test_leader_swap_table_retrieve_first_nodes() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let authorities = [
(AuthorityIndex::new_for_test(0), 1),
(AuthorityIndex::new_for_test(1), 2),
(AuthorityIndex::new_for_test(2), 3),
(AuthorityIndex::new_for_test(3), 4),
];
let stake_threshold = 50;
let filtered_authorities = LeaderSwapTable::retrieve_first_nodes(
context.clone(),
authorities.iter(),
stake_threshold,
);
assert_eq!(filtered_authorities.len(), 2);
let authority_0_idx = AuthorityIndex::new_for_test(0);
let authority_0 = context.committee.authority(authority_0_idx);
assert!(filtered_authorities.contains(&(
authority_0_idx,
authority_0.hostname.clone(),
authority_0.stake
)));
let authority_1_idx = AuthorityIndex::new_for_test(1);
let authority_1 = context.committee.authority(authority_1_idx);
assert!(filtered_authorities.contains(&(
authority_1_idx,
authority_1.hostname.clone(),
authority_1.stake
)));
}
#[tokio::test]
#[should_panic(
expected = "The swap_stake_threshold (34) should be in range [0 - 33], out of bounds parameter detected"
)]
async fn test_leader_swap_table_swap_stake_threshold_out_of_bounds() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let swap_stake_threshold = 34;
let reputation_scores = ReputationScores::new(
(0..=10).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
LeaderSwapTable::new_inner(context, swap_stake_threshold, 0, reputation_scores);
}
#[tokio::test]
async fn test_update_leader_swap_table() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let swap_stake_threshold = 33;
let reputation_scores = ReputationScores::new(
(1..=10).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
leader_schedule.update_leader_swap_table(leader_swap_table.clone());
let reputation_scores = ReputationScores::new(
(11..=20).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
leader_schedule.update_leader_swap_table(leader_swap_table.clone());
}
#[tokio::test]
#[should_panic(
expected = "The new LeaderSwapTable has an invalid CommitRange. Old LeaderSwapTable CommitRange(11..=20) vs new LeaderSwapTable CommitRange(21..=25)"
)]
async fn test_update_bad_leader_swap_table() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
let swap_stake_threshold = 33;
let reputation_scores = ReputationScores::new(
(1..=10).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
leader_schedule.update_leader_swap_table(leader_swap_table.clone());
let reputation_scores = ReputationScores::new(
(11..=20).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
leader_schedule.update_leader_swap_table(leader_swap_table.clone());
let reputation_scores = ReputationScores::new(
(21..=25).into(),
(0..4).map(|i| i as u64).collect::<Vec<_>>(),
);
let leader_swap_table =
LeaderSwapTable::new_inner(context.clone(), swap_stake_threshold, 0, reputation_scores);
leader_schedule.update_leader_swap_table(leader_swap_table.clone());
}
}