use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use consensus_config::AuthorityIndex;
use consensus_types::block::BlockRef;
use serde::{Deserialize, Serialize};
use crate::{
block::BlockAPI,
commit::{CommitRange, CommittedSubDag},
context::Context,
stake_aggregator::{QuorumThreshold, StakeAggregator},
};
/// Per-authority reputation scores together with the range of commits they
/// were computed over.
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct ReputationScores {
/// One score per authority; position `i` holds the score of the authority
/// whose committee index is `i`.
pub(crate) scores_per_authority: Vec<u64>,
/// The commits that these scores cover.
pub(crate) commit_range: CommitRange,
}
impl ReputationScores {
    /// Builds a score set covering `commit_range`.
    ///
    /// `scores_per_authority` is stored as given. The other methods of this
    /// type treat position `i` as the score of the authority with committee
    /// index `i`.
    pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
        Self {
            scores_per_authority,
            commit_range,
        }
    }

    /// Returns the largest score, or `0` when no scores are stored.
    pub(crate) fn highest_score(&self) -> u64 {
        self.scores_per_authority.iter().copied().max().unwrap_or(0)
    }

    /// Pairs every stored score with its [`AuthorityIndex`].
    ///
    /// Despite the name, the result is NOT sorted by score: entries are
    /// returned in ascending authority-index order, exactly as stored.
    ///
    /// # Panics
    ///
    /// Panics if a position in `scores_per_authority` is rejected by
    /// `context.committee.to_authority_index`.
    pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
        self.scores_per_authority
            .iter()
            .enumerate()
            .map(|(index, score)| {
                (
                    context
                        .committee
                        .to_authority_index(index)
                        .expect("Should be a valid AuthorityIndex"),
                    *score,
                )
            })
            .collect()
    }

    /// Publishes every score to the `reputation_scores` gauge, labelled with
    /// the authority's hostname.
    ///
    /// Authorities whose hostname is empty are skipped.
    ///
    /// # Panics
    ///
    /// Panics if a position in `scores_per_authority` is rejected by
    /// `context.committee.to_authority_index`.
    pub(crate) fn update_metrics(&self, context: Arc<Context>) {
        for (index, score) in self.scores_per_authority.iter().enumerate() {
            let authority_index = context
                .committee
                .to_authority_index(index)
                .expect("Should be a valid AuthorityIndex");
            let authority = context.committee.authority(authority_index);
            if !authority.hostname.is_empty() {
                // The gauge stores an `i64`. A bare `as i64` would wrap scores
                // above `i64::MAX` into negative values, so clamp first; the
                // cast is then lossless.
                let gauge_value = (*score).min(i64::MAX as u64) as i64;
                context
                    .metrics
                    .node_metrics
                    .reputation_scores
                    .with_label_values(&[&authority.hostname])
                    .set(gauge_value);
            }
        }
    }
}
/// Accumulates committed sub-dags and the votes observed in them, so that
/// per-authority reputation scores can be derived later.
pub(crate) struct ScoringSubdag {
/// Shared context; supplies the committee and the metrics used here.
pub(crate) context: Arc<Context>,
/// Commit indices of the sub-dags added so far; `None` until the first
/// sub-dag is added (and again after `clear`).
pub(crate) commit_range: Option<CommitRange>,
/// References of the leaders of every sub-dag added so far.
pub(crate) leaders: HashSet<BlockRef>,
/// Keyed by the reference of a vote block, i.e. a block that has a known
/// leader from the immediately preceding round among its ancestors. The
/// aggregator collects the authors of next-round blocks that list that vote
/// as an ancestor.
pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
}
impl ScoringSubdag {
    /// Creates an empty scoring sub-dag: no commit range, no leaders, no votes.
    pub(crate) fn new(context: Arc<Context>) -> Self {
        let leaders = HashSet::new();
        let votes = BTreeMap::new();
        Self {
            context,
            commit_range: None,
            leaders,
            votes,
        }
    }

    /// Folds the given committed sub-dags into the scoring state.
    ///
    /// For every sub-dag, in order:
    /// 1. the tracked commit range is started at, or extended to, the
    ///    sub-dag's commit index;
    /// 2. the sub-dag's leader is remembered;
    /// 3. each block is inspected, looking only at ancestors from the round
    ///    immediately before the block's own round:
    ///    - if such an ancestor is a remembered leader, the block is recorded
    ///      as a vote with a fresh stake aggregator;
    ///    - if such an ancestor is itself a recorded vote, the block's author
    ///      is added to that vote's stake aggregator.
    ///
    /// # Panics
    ///
    /// Panics if a block would be recorded as a vote more than once.
    pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
            .start_timer();

        for subdag in committed_subdags {
            // Start the range on the first commit, otherwise grow it.
            let commit_index = subdag.commit_ref.index;
            if let Some(tracked_range) = self.commit_range.as_mut() {
                tracked_range.extend_to(commit_index);
            } else {
                self.commit_range = Some(CommitRange::new(commit_index..=commit_index));
            }

            tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
            self.leaders.insert(subdag.leader);

            for block in subdag.blocks {
                let voter = block.author();
                let previous_round = block.round().saturating_sub(1);

                // Only ancestors from the directly preceding round are relevant.
                let direct_ancestors = block
                    .ancestors()
                    .iter()
                    .filter(|candidate| candidate.round == previous_round);

                for ancestor in direct_ancestors {
                    // The ancestor is a known leader: this block votes for it.
                    if self.leaders.contains(ancestor) {
                        tracing::trace!(
                            "Found a vote {} for leader {ancestor} from authority {}",
                            block.reference(),
                            voter
                        );
                        let already_recorded =
                            self.votes.insert(block.reference(), StakeAggregator::new());
                        assert!(already_recorded.is_none(), "Vote {block} already exists. Duplicate vote found for leader {ancestor}");
                    }

                    // The ancestor is a known vote: this block distributes it.
                    if let Some(stake) = self.votes.get_mut(ancestor) {
                        tracing::trace!(
                            "Found a distributed vote {ancestor} from authority {}",
                            ancestor.author
                        );
                        stake.add(voter, &self.context.committee);
                    }
                }
            }
        }
    }

    /// Produces the reputation scores for the commit range accumulated so far.
    ///
    /// # Panics
    ///
    /// Panics if no sub-dag has been added, i.e. the commit range is unset.
    pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
        // Scores are computed before the range is checked, so the scoring
        // timer is recorded even when the range turns out to be missing.
        let scores_per_authority = self.distributed_votes_scores();
        let covered_range = self
            .commit_range
            .clone()
            .expect("CommitRange should be set if calculate_scores is called.");
        ReputationScores::new(covered_range, scores_per_authority)
    }

    /// Sums, per vote author, the stake reported by each vote's aggregator.
    ///
    /// The returned vector has one slot per committee member, indexed by the
    /// authority's index value.
    fn distributed_votes_scores(&self) -> Vec<u64> {
        let _s = self
            .context
            .metrics
            .node_metrics
            .scope_processing_time
            .with_label_values(&["ScoringSubdag::score_distributed_votes"])
            .start_timer();

        let mut scores_per_authority = vec![0_u64; self.context.committee.size()];
        for (vote, stake_agg) in &self.votes {
            let authority = vote.author;
            let stake = stake_agg.stake();
            tracing::trace!(
                "[{}] scores +{stake} reputation for {authority}!",
                self.context.own_index,
            );
            scores_per_authority[authority.value()] += stake;
        }
        scores_per_authority
    }

    /// Size of the tracked commit range, or `0` when nothing has been added.
    pub(crate) fn scored_subdags_count(&self) -> usize {
        self.commit_range
            .as_ref()
            .map_or(0, |tracked_range| tracked_range.size())
    }

    /// Returns `true` when there are no leaders, no votes and no commit range.
    pub(crate) fn is_empty(&self) -> bool {
        let nothing_tracked = self.leaders.is_empty() && self.votes.is_empty();
        nothing_tracked && self.commit_range.is_none()
    }

    /// Resets the state to what `new` produces, keeping the same context.
    pub(crate) fn clear(&mut self) {
        self.commit_range = None;
        self.votes.clear();
        self.leaders.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_dag_builder::DagBuilder;

    /// `authorities_by_score` must return (authority, score) pairs in
    /// authority-index order, leaving the scores untouched.
    #[tokio::test]
    async fn test_reputation_scores_authorities_by_score() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);

        let expected: Vec<(AuthorityIndex, u64)> = vec![(0, 4), (1, 1), (2, 1), (3, 3)]
            .into_iter()
            .map(|(index, score)| (AuthorityIndex::new_for_test(index), score))
            .collect();

        assert_eq!(scores.authorities_by_score(context), expected);
    }

    /// `update_metrics` must publish each authority's score under its hostname.
    #[tokio::test]
    async fn test_reputation_scores_update_metrics() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        scores.update_metrics(context.clone());

        let metrics = context.metrics.node_metrics.reputation_scores.clone();
        let expected_per_host = [
            ("test_host_0", 1),
            ("test_host_1", 2),
            ("test_host_2", 4),
            ("test_host_3", 3),
        ];
        for (hostname, expected_score) in expected_per_host {
            let gauge = metrics.get_metric_with_label_values(&[hostname]).unwrap();
            assert_eq!(gauge.get(), expected_score);
        }
    }

    /// Feeds four commits, one sub-dag at a time, and checks the resulting
    /// distributed-vote scores and the commit range they cover.
    #[tokio::test]
    async fn test_scoring_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        // Rounds 1 to 3 are fully connected.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();

        // In round 4 the blocks of authorities 1, 2 and 3 are skipped.
        let skipped_authorities: Vec<AuthorityIndex> = vec![1, 2, 3]
            .into_iter()
            .map(AuthorityIndex::new_for_test)
            .collect();
        dag_builder
            .layer(4)
            .authorities(skipped_authorities)
            .skip_block()
            .build();

        let mut scoring_subdag = ScoringSubdag::new(context.clone());
        for (committed, _) in dag_builder.get_sub_dag_and_commits(1..=4) {
            scoring_subdag.add_subdags(vec![committed]);
        }

        let ReputationScores {
            scores_per_authority,
            commit_range,
        } = scoring_subdag.calculate_distributed_vote_scores();
        assert_eq!(scores_per_authority, vec![5, 5, 5, 5]);
        assert_eq!(commit_range, (1..=4).into());
    }
}