consensus_core/
leader_scoring.rs1use std::{
5 collections::{BTreeMap, HashSet},
6 sync::Arc,
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::BlockRef;
11use serde::{Deserialize, Serialize};
12
13use crate::{
14 block::BlockAPI,
15 commit::{CommitRange, CommittedSubDag},
16 context::Context,
17 stake_aggregator::{QuorumThreshold, StakeAggregator},
18};
19
20#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
21pub(crate) struct ReputationScores {
22 pub(crate) scores_per_authority: Vec<u64>,
24 pub(crate) commit_range: CommitRange,
26}
27
28impl ReputationScores {
29 pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
30 Self {
31 scores_per_authority,
32 commit_range,
33 }
34 }
35
36 pub(crate) fn highest_score(&self) -> u64 {
37 *self.scores_per_authority.iter().max().unwrap_or(&0)
38 }
39
40 pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
42 self.scores_per_authority
43 .iter()
44 .enumerate()
45 .map(|(index, score)| {
46 (
47 context
48 .committee
49 .to_authority_index(index)
50 .expect("Should be a valid AuthorityIndex"),
51 *score,
52 )
53 })
54 .collect()
55 }
56
57 pub(crate) fn update_metrics(&self, context: Arc<Context>) {
58 for (index, score) in self.scores_per_authority.iter().enumerate() {
59 let authority_index = context
60 .committee
61 .to_authority_index(index)
62 .expect("Should be a valid AuthorityIndex");
63 let authority = context.committee.authority(authority_index);
64 if !authority.hostname.is_empty() {
65 context
66 .metrics
67 .node_metrics
68 .reputation_scores
69 .with_label_values(&[&authority.hostname])
70 .set(*score as i64);
71 }
72 }
73 }
74}
75
76pub(crate) struct ScoringSubdag {
83 pub(crate) context: Arc<Context>,
84 pub(crate) commit_range: Option<CommitRange>,
85 pub(crate) leaders: HashSet<BlockRef>,
88 pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
92}
93
94impl ScoringSubdag {
95 pub(crate) fn new(context: Arc<Context>) -> Self {
96 Self {
97 context,
98 commit_range: None,
99 leaders: HashSet::new(),
100 votes: BTreeMap::new(),
101 }
102 }
103
104 pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
105 let _s = self
106 .context
107 .metrics
108 .node_metrics
109 .scope_processing_time
110 .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
111 .start_timer();
112 for subdag in committed_subdags {
113 if self.commit_range.is_none() {
116 self.commit_range = Some(CommitRange::new(
117 subdag.commit_ref.index..=subdag.commit_ref.index,
118 ));
119 } else {
120 let commit_range = self.commit_range.as_mut().unwrap();
121 commit_range.extend_to(subdag.commit_ref.index);
122 }
123
124 tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
126 self.leaders.insert(subdag.leader);
127
128 for block in subdag.blocks {
131 for ancestor in block.ancestors() {
132 if ancestor.round != block.round().saturating_sub(1) {
135 continue;
136 }
137
138 if self.leaders.contains(ancestor) {
141 tracing::trace!(
144 "Found a vote {} for leader {ancestor} from authority {}",
145 block.reference(),
146 block.author()
147 );
148 assert!(
149 self.votes
150 .insert(block.reference(), StakeAggregator::new())
151 .is_none(),
152 "Vote {block} already exists. Duplicate vote found for leader {ancestor}"
153 );
154 }
155
156 if let Some(stake) = self.votes.get_mut(ancestor) {
157 tracing::trace!(
160 "Found a distributed vote {ancestor} from authority {}",
161 ancestor.author
162 );
163 stake.add(block.author(), &self.context.committee);
164 }
165 }
166 }
167 }
168 }
169
170 pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
173 let scores_per_authority = self.distributed_votes_scores();
174
175 ReputationScores::new(
177 self.commit_range
178 .clone()
179 .expect("CommitRange should be set if calculate_scores is called."),
180 scores_per_authority,
181 )
182 }
183
184 fn distributed_votes_scores(&self) -> Vec<u64> {
189 let _s = self
190 .context
191 .metrics
192 .node_metrics
193 .scope_processing_time
194 .with_label_values(&["ScoringSubdag::score_distributed_votes"])
195 .start_timer();
196
197 let num_authorities = self.context.committee.size();
198 let mut scores_per_authority = vec![0_u64; num_authorities];
199
200 for (vote, stake_agg) in self.votes.iter() {
201 let authority = vote.author;
202 let stake = stake_agg.stake();
203 tracing::trace!(
204 "[{}] scores +{stake} reputation for {authority}!",
205 self.context.own_index,
206 );
207 scores_per_authority[authority.value()] += stake;
208 }
209 scores_per_authority
210 }
211
212 pub(crate) fn scored_subdags_count(&self) -> usize {
213 if let Some(commit_range) = &self.commit_range {
214 commit_range.size()
215 } else {
216 0
217 }
218 }
219
220 pub(crate) fn is_empty(&self) -> bool {
221 self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
222 }
223
224 pub(crate) fn clear(&mut self) {
225 self.leaders.clear();
226 self.votes.clear();
227 self.commit_range = None;
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use crate::test_dag_builder::DagBuilder;
235
236 #[tokio::test]
237 async fn test_reputation_scores_authorities_by_score() {
238 let context = Arc::new(Context::new_for_test(4).0);
239 let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);
240 let authorities = scores.authorities_by_score(context);
241 assert_eq!(
242 authorities,
243 vec![
244 (AuthorityIndex::new_for_test(0), 4),
245 (AuthorityIndex::new_for_test(1), 1),
246 (AuthorityIndex::new_for_test(2), 1),
247 (AuthorityIndex::new_for_test(3), 3),
248 ]
249 );
250 }
251
252 #[tokio::test]
253 async fn test_reputation_scores_update_metrics() {
254 let context = Arc::new(Context::new_for_test(4).0);
255 let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
256 scores.update_metrics(context.clone());
257 let metrics = context.metrics.node_metrics.reputation_scores.clone();
258 assert_eq!(
259 metrics
260 .get_metric_with_label_values(&["test_host_0"])
261 .unwrap()
262 .get(),
263 1
264 );
265 assert_eq!(
266 metrics
267 .get_metric_with_label_values(&["test_host_1"])
268 .unwrap()
269 .get(),
270 2
271 );
272 assert_eq!(
273 metrics
274 .get_metric_with_label_values(&["test_host_2"])
275 .unwrap()
276 .get(),
277 4
278 );
279 assert_eq!(
280 metrics
281 .get_metric_with_label_values(&["test_host_3"])
282 .unwrap()
283 .get(),
284 3
285 );
286 }
287
288 #[tokio::test]
289 async fn test_scoring_subdag() {
290 telemetry_subscribers::init_for_testing();
291 let context = Arc::new(Context::new_for_test(4).0);
292
293 let mut dag_builder = DagBuilder::new(context.clone());
295 dag_builder.layers(1..=3).build();
296 dag_builder
298 .layer(4)
299 .authorities(vec![
300 AuthorityIndex::new_for_test(1),
301 AuthorityIndex::new_for_test(2),
302 AuthorityIndex::new_for_test(3),
303 ])
304 .skip_block()
305 .build();
306
307 let mut scoring_subdag = ScoringSubdag::new(context.clone());
308
309 for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
310 scoring_subdag.add_subdags(vec![sub_dag]);
311 }
312
313 let scores = scoring_subdag.calculate_distributed_vote_scores();
314 assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
315 assert_eq!(scores.commit_range, (1..=4).into());
316 }
317}