consensus_core/leader_scoring.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use consensus_config::AuthorityIndex;
use serde::{Deserialize, Serialize};
use crate::{
block::{BlockAPI, BlockRef},
commit::{CommitRange, CommittedSubDag},
context::Context,
stake_aggregator::{QuorumThreshold, StakeAggregator},
};
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct ReputationScores {
/// Score per authority. Vec index is the `AuthorityIndex`.
pub(crate) scores_per_authority: Vec<u64>,
// The range of commits these scores were calculated from.
pub(crate) commit_range: CommitRange,
}
impl ReputationScores {
pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
Self {
scores_per_authority,
commit_range,
}
}
pub(crate) fn highest_score(&self) -> u64 {
*self.scores_per_authority.iter().max().unwrap_or(&0)
}
// Returns the authorities index with score tuples.
pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
self.scores_per_authority
.iter()
.enumerate()
.map(|(index, score)| {
(
context
.committee
.to_authority_index(index)
.expect("Should be a valid AuthorityIndex"),
*score,
)
})
.collect()
}
pub(crate) fn update_metrics(&self, context: Arc<Context>) {
for (index, score) in self.scores_per_authority.iter().enumerate() {
let authority_index = context
.committee
.to_authority_index(index)
.expect("Should be a valid AuthorityIndex");
let authority = context.committee.authority(authority_index);
if !authority.hostname.is_empty() {
context
.metrics
.node_metrics
.reputation_scores
.with_label_values(&[&authority.hostname])
.set(*score as i64);
}
}
}
}
/// ScoringSubdag represents the scoring votes in a collection of subdags across
/// multiple commits.
/// These subdags are "scoring" for the purposes of leader schedule change. As
/// new subdags are added, the DAG is traversed and votes for leaders are recorded
/// and scored along with stake. On a leader schedule change, finalized reputation
/// scores will be calculated based on the votes & stake collected in this struct.
pub(crate) struct ScoringSubdag {
pub(crate) context: Arc<Context>,
pub(crate) commit_range: Option<CommitRange>,
// Only includes committed leaders for now.
// TODO: Include skipped leaders as well
pub(crate) leaders: HashSet<BlockRef>,
// A map of votes to the stake of strongly linked blocks that include that vote
// Note: Including stake aggregator so that we can quickly check if it exceeds
// quourum threshold and only include those scores for certain scoring strategies.
pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
}
impl ScoringSubdag {
pub(crate) fn new(context: Arc<Context>) -> Self {
Self {
context,
commit_range: None,
leaders: HashSet::new(),
votes: BTreeMap::new(),
}
}
pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
.start_timer();
for subdag in committed_subdags {
// If the commit range is not set, then set it to the range of the first
// committed subdag index.
if self.commit_range.is_none() {
self.commit_range = Some(CommitRange::new(
subdag.commit_ref.index..=subdag.commit_ref.index,
));
} else {
let commit_range = self.commit_range.as_mut().unwrap();
commit_range.extend_to(subdag.commit_ref.index);
}
// Add the committed leader to the list of leaders we will be scoring.
tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
self.leaders.insert(subdag.leader);
// Check each block in subdag. Blocks are in order so we should traverse the
// oldest blocks first
for block in subdag.blocks {
for ancestor in block.ancestors() {
// Weak links may point to blocks with lower round numbers
// than strong links.
if ancestor.round != block.round().saturating_sub(1) {
continue;
}
// If a blocks strong linked ancestor is in leaders, then
// it's a vote for leader.
if self.leaders.contains(ancestor) {
// There should never be duplicate references to blocks
// with strong linked ancestors to leader.
tracing::trace!(
"Found a vote {} for leader {ancestor} from authority {}",
block.reference(),
block.author()
);
assert!(self
.votes
.insert(block.reference(), StakeAggregator::new())
.is_none(), "Vote {block} already exists. Duplicate vote found for leader {ancestor}");
}
if let Some(stake) = self.votes.get_mut(ancestor) {
// Vote is strongly linked to a future block, so we
// consider this a distributed vote.
tracing::trace!(
"Found a distributed vote {ancestor} from authority {}",
ancestor.author
);
stake.add(block.author(), &self.context.committee);
}
}
}
}
}
// Iterate through votes and calculate scores for each authority based on
// distributed vote scoring strategy.
pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
let scores_per_authority = self.distributed_votes_scores();
// TODO: Normalize scores
ReputationScores::new(
self.commit_range
.clone()
.expect("CommitRange should be set if calculate_scores is called."),
scores_per_authority,
)
}
/// This scoring strategy aims to give scores based on overall vote distribution.
/// Instead of only giving one point for each vote that is included in 2f+1
/// blocks. We give a score equal to the amount of stake of all blocks that
/// included the vote.
fn distributed_votes_scores(&self) -> Vec<u64> {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["ScoringSubdag::score_distributed_votes"])
.start_timer();
let num_authorities = self.context.committee.size();
let mut scores_per_authority = vec![0_u64; num_authorities];
for (vote, stake_agg) in self.votes.iter() {
let authority = vote.author;
let stake = stake_agg.stake();
tracing::trace!(
"[{}] scores +{stake} reputation for {authority}!",
self.context.own_index,
);
scores_per_authority[authority.value()] += stake;
}
scores_per_authority
}
pub(crate) fn scored_subdags_count(&self) -> usize {
if let Some(commit_range) = &self.commit_range {
commit_range.size()
} else {
0
}
}
pub(crate) fn is_empty(&self) -> bool {
self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
}
pub(crate) fn clear(&mut self) {
self.leaders.clear();
self.votes.clear();
self.commit_range = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_dag_builder::DagBuilder;
#[tokio::test]
async fn test_reputation_scores_authorities_by_score() {
let context = Arc::new(Context::new_for_test(4).0);
let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);
let authorities = scores.authorities_by_score(context);
assert_eq!(
authorities,
vec![
(AuthorityIndex::new_for_test(0), 4),
(AuthorityIndex::new_for_test(1), 1),
(AuthorityIndex::new_for_test(2), 1),
(AuthorityIndex::new_for_test(3), 3),
]
);
}
#[tokio::test]
async fn test_reputation_scores_update_metrics() {
let context = Arc::new(Context::new_for_test(4).0);
let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
scores.update_metrics(context.clone());
let metrics = context.metrics.node_metrics.reputation_scores.clone();
assert_eq!(
metrics
.get_metric_with_label_values(&["test_host_0"])
.unwrap()
.get(),
1
);
assert_eq!(
metrics
.get_metric_with_label_values(&["test_host_1"])
.unwrap()
.get(),
2
);
assert_eq!(
metrics
.get_metric_with_label_values(&["test_host_2"])
.unwrap()
.get(),
4
);
assert_eq!(
metrics
.get_metric_with_label_values(&["test_host_3"])
.unwrap()
.get(),
3
);
}
#[tokio::test]
async fn test_scoring_subdag() {
telemetry_subscribers::init_for_testing();
let context = Arc::new(Context::new_for_test(4).0);
// Populate fully connected test blocks for round 0 ~ 3, authorities 0 ~ 3.
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder.layers(1..=3).build();
// Build round 4 but with just the leader block
dag_builder
.layer(4)
.authorities(vec![
AuthorityIndex::new_for_test(1),
AuthorityIndex::new_for_test(2),
AuthorityIndex::new_for_test(3),
])
.skip_block()
.build();
let mut scoring_subdag = ScoringSubdag::new(context.clone());
for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
scoring_subdag.add_subdags(vec![sub_dag]);
}
let scores = scoring_subdag.calculate_distributed_vote_scores();
assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
assert_eq!(scores.commit_range, (1..=4).into());
}
}