consensus_core/
leader_scoring.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, HashSet},
6    sync::Arc,
7};
8
9use consensus_config::AuthorityIndex;
10use consensus_types::block::BlockRef;
11use serde::{Deserialize, Serialize};
12
13use crate::{
14    block::BlockAPI,
15    commit::{CommitRange, CommittedSubDag},
16    context::Context,
17    stake_aggregator::{QuorumThreshold, StakeAggregator},
18};
19
/// Per-authority reputation scores together with the range of commits they
/// were computed from.
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct ReputationScores {
    /// Score per authority. Vec index is the `AuthorityIndex`.
    pub(crate) scores_per_authority: Vec<u64>,
    /// The range of commits these scores were calculated from.
    pub(crate) commit_range: CommitRange,
}
27
28impl ReputationScores {
29    pub(crate) fn new(commit_range: CommitRange, scores_per_authority: Vec<u64>) -> Self {
30        Self {
31            scores_per_authority,
32            commit_range,
33        }
34    }
35
36    pub(crate) fn highest_score(&self) -> u64 {
37        *self.scores_per_authority.iter().max().unwrap_or(&0)
38    }
39
40    // Returns the authorities index with score tuples.
41    pub(crate) fn authorities_by_score(&self, context: Arc<Context>) -> Vec<(AuthorityIndex, u64)> {
42        self.scores_per_authority
43            .iter()
44            .enumerate()
45            .map(|(index, score)| {
46                (
47                    context
48                        .committee
49                        .to_authority_index(index)
50                        .expect("Should be a valid AuthorityIndex"),
51                    *score,
52                )
53            })
54            .collect()
55    }
56
57    pub(crate) fn update_metrics(&self, context: Arc<Context>) {
58        for (index, score) in self.scores_per_authority.iter().enumerate() {
59            let authority_index = context
60                .committee
61                .to_authority_index(index)
62                .expect("Should be a valid AuthorityIndex");
63            let authority = context.committee.authority(authority_index);
64            if !authority.hostname.is_empty() {
65                context
66                    .metrics
67                    .node_metrics
68                    .reputation_scores
69                    .with_label_values(&[&authority.hostname])
70                    .set(*score as i64);
71            }
72        }
73    }
74}
75
/// ScoringSubdag represents the scoring votes in a collection of subdags across
/// multiple commits.
/// These subdags are "scoring" for the purposes of leader schedule change. As
/// new subdags are added, the DAG is traversed and votes for leaders are recorded
/// and scored along with stake. On a leader schedule change, finalized reputation
/// scores will be calculated based on the votes & stake collected in this struct.
pub(crate) struct ScoringSubdag {
    /// Shared context; the methods in this file read its committee, metrics
    /// and own authority index.
    pub(crate) context: Arc<Context>,
    /// Range of commit indices covered by the subdags added so far. `None`
    /// until the first subdag is added, and again after `clear`.
    pub(crate) commit_range: Option<CommitRange>,
    /// Only includes committed leaders for now.
    // TODO: Include skipped leaders as well
    pub(crate) leaders: HashSet<BlockRef>,
    /// A map of votes to the stake of strongly linked blocks that include that vote.
    /// Note: Including stake aggregator so that we can quickly check if it exceeds
    /// quorum threshold and only include those scores for certain scoring strategies.
    pub(crate) votes: BTreeMap<BlockRef, StakeAggregator<QuorumThreshold>>,
}
93
94impl ScoringSubdag {
95    pub(crate) fn new(context: Arc<Context>) -> Self {
96        Self {
97            context,
98            commit_range: None,
99            leaders: HashSet::new(),
100            votes: BTreeMap::new(),
101        }
102    }
103
104    pub(crate) fn add_subdags(&mut self, committed_subdags: Vec<CommittedSubDag>) {
105        let _s = self
106            .context
107            .metrics
108            .node_metrics
109            .scope_processing_time
110            .with_label_values(&["ScoringSubdag::add_unscored_committed_subdags"])
111            .start_timer();
112        for subdag in committed_subdags {
113            // If the commit range is not set, then set it to the range of the first
114            // committed subdag index.
115            if self.commit_range.is_none() {
116                self.commit_range = Some(CommitRange::new(
117                    subdag.commit_ref.index..=subdag.commit_ref.index,
118                ));
119            } else {
120                let commit_range = self.commit_range.as_mut().unwrap();
121                commit_range.extend_to(subdag.commit_ref.index);
122            }
123
124            // Add the committed leader to the list of leaders we will be scoring.
125            tracing::trace!("Adding new committed leader {} for scoring", subdag.leader);
126            self.leaders.insert(subdag.leader);
127
128            // Check each block in subdag. Blocks are in order so we should traverse the
129            // oldest blocks first
130            for block in subdag.blocks {
131                for ancestor in block.ancestors() {
132                    // Weak links may point to blocks with lower round numbers
133                    // than strong links.
134                    if ancestor.round != block.round().saturating_sub(1) {
135                        continue;
136                    }
137
138                    // If a blocks strong linked ancestor is in leaders, then
139                    // it's a vote for leader.
140                    if self.leaders.contains(ancestor) {
141                        // There should never be duplicate references to blocks
142                        // with strong linked ancestors to leader.
143                        tracing::trace!(
144                            "Found a vote {} for leader {ancestor} from authority {}",
145                            block.reference(),
146                            block.author()
147                        );
148                        assert!(
149                            self.votes
150                                .insert(block.reference(), StakeAggregator::new())
151                                .is_none(),
152                            "Vote {block} already exists. Duplicate vote found for leader {ancestor}"
153                        );
154                    }
155
156                    if let Some(stake) = self.votes.get_mut(ancestor) {
157                        // Vote is strongly linked to a future block, so we
158                        // consider this a distributed vote.
159                        tracing::trace!(
160                            "Found a distributed vote {ancestor} from authority {}",
161                            ancestor.author
162                        );
163                        stake.add(block.author(), &self.context.committee);
164                    }
165                }
166            }
167        }
168    }
169
170    // Iterate through votes and calculate scores for each authority based on
171    // distributed vote scoring strategy.
172    pub(crate) fn calculate_distributed_vote_scores(&self) -> ReputationScores {
173        let scores_per_authority = self.distributed_votes_scores();
174
175        // TODO: Normalize scores
176        ReputationScores::new(
177            self.commit_range
178                .clone()
179                .expect("CommitRange should be set if calculate_scores is called."),
180            scores_per_authority,
181        )
182    }
183
184    /// This scoring strategy aims to give scores based on overall vote distribution.
185    /// Instead of only giving one point for each vote that is included in 2f+1
186    /// blocks. We give a score equal to the amount of stake of all blocks that
187    /// included the vote.
188    fn distributed_votes_scores(&self) -> Vec<u64> {
189        let _s = self
190            .context
191            .metrics
192            .node_metrics
193            .scope_processing_time
194            .with_label_values(&["ScoringSubdag::score_distributed_votes"])
195            .start_timer();
196
197        let num_authorities = self.context.committee.size();
198        let mut scores_per_authority = vec![0_u64; num_authorities];
199
200        for (vote, stake_agg) in self.votes.iter() {
201            let authority = vote.author;
202            let stake = stake_agg.stake();
203            tracing::trace!(
204                "[{}] scores +{stake} reputation for {authority}!",
205                self.context.own_index,
206            );
207            scores_per_authority[authority.value()] += stake;
208        }
209        scores_per_authority
210    }
211
212    pub(crate) fn scored_subdags_count(&self) -> usize {
213        if let Some(commit_range) = &self.commit_range {
214            commit_range.size()
215        } else {
216            0
217        }
218    }
219
220    pub(crate) fn is_empty(&self) -> bool {
221        self.leaders.is_empty() && self.votes.is_empty() && self.commit_range.is_none()
222    }
223
224    pub(crate) fn clear(&mut self) {
225        self.leaders.clear();
226        self.votes.clear();
227        self.commit_range = None;
228    }
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_dag_builder::DagBuilder;

    /// `authorities_by_score` pairs every score with its authority index and
    /// keeps the storage order.
    #[tokio::test]
    async fn test_reputation_scores_authorities_by_score() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![4, 1, 1, 3]);

        let expected: Vec<(AuthorityIndex, u64)> = vec![
            (AuthorityIndex::new_for_test(0), 4),
            (AuthorityIndex::new_for_test(1), 1),
            (AuthorityIndex::new_for_test(2), 1),
            (AuthorityIndex::new_for_test(3), 3),
        ];
        assert_eq!(scores.authorities_by_score(context), expected);
    }

    /// `update_metrics` exports each score under the hostname label of its
    /// authority.
    #[tokio::test]
    async fn test_reputation_scores_update_metrics() {
        let context = Arc::new(Context::new_for_test(4).0);
        let scores = ReputationScores::new((1..=300).into(), vec![1, 2, 4, 3]);
        scores.update_metrics(context.clone());

        let gauges = context.metrics.node_metrics.reputation_scores.clone();
        let expected_per_host = [
            ("test_host_0", 1),
            ("test_host_1", 2),
            ("test_host_2", 4),
            ("test_host_3", 3),
        ];
        for (hostname, expected) in expected_per_host {
            let gauge = gauges.get_metric_with_label_values(&[hostname]).unwrap();
            assert_eq!(gauge.get(), expected);
        }
    }

    /// Feeds the subdags of a small test DAG into a `ScoringSubdag` one at a
    /// time and checks the resulting distributed vote scores and commit range.
    #[tokio::test]
    async fn test_scoring_subdag() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);

        // Fully connected layers for rounds 1 ~ 3, authorities 0 ~ 3.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder.layers(1..=3).build();

        // Round 4 is built with just the leader block: authorities 1, 2 and 3
        // are handed to `skip_block()`.
        let skipped_authorities = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
            AuthorityIndex::new_for_test(3),
        ];
        dag_builder
            .layer(4)
            .authorities(skipped_authorities)
            .skip_block()
            .build();

        let mut scoring_subdag = ScoringSubdag::new(context.clone());
        for (committed_sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) {
            scoring_subdag.add_subdags(vec![committed_sub_dag]);
        }

        let scores = scoring_subdag.calculate_distributed_vote_scores();
        assert_eq!(scores.scores_per_authority, vec![5, 5, 5, 5]);
        assert_eq!(scores.commit_range, (1..=4).into());
    }
}
317}