consensus_core/
commit_vote_monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use parking_lot::Mutex;
7
8use crate::{
9    CommitIndex,
10    block::{BlockAPI as _, VerifiedBlock},
11    commit::GENESIS_COMMIT_INDEX,
12    context::Context,
13};
14
15/// Monitors the progress of consensus commits across the network.
16pub(crate) struct CommitVoteMonitor {
17    context: Arc<Context>,
18    // Highest commit index voted by each authority.
19    highest_voted_commits: Mutex<Vec<CommitIndex>>,
20}
21
22impl CommitVoteMonitor {
23    pub(crate) fn new(context: Arc<Context>) -> Self {
24        let highest_voted_commits = Mutex::new(vec![0; context.committee.size()]);
25        Self {
26            context,
27            highest_voted_commits,
28        }
29    }
30
31    /// Keeps track of the highest commit voted by each authority.
32    pub(crate) fn observe_block(&self, block: &VerifiedBlock) {
33        let mut highest_voted_commits = self.highest_voted_commits.lock();
34        for vote in block.commit_votes() {
35            if vote.index > highest_voted_commits[block.author()] {
36                highest_voted_commits[block.author()] = vote.index;
37            }
38        }
39    }
40
41    // Finds the highest commit index certified by a quorum.
42    // When an authority votes for commit index S, it is also voting for all commit indices 1 <= i < S.
43    // So the quorum commit index is the smallest index S such that the sum of stakes of authorities
44    // voting for commit indices >= S passes the quorum threshold.
45    pub(crate) fn quorum_commit_index(&self) -> CommitIndex {
46        let highest_voted_commits = self.highest_voted_commits.lock();
47        let mut highest_voted_commits = highest_voted_commits
48            .iter()
49            .zip(self.context.committee.authorities())
50            .map(|(commit_index, (_, a))| (*commit_index, a.stake))
51            .collect::<Vec<_>>();
52        // Sort by commit index then stake, in descending order.
53        highest_voted_commits.sort_by(|a, b| a.cmp(b).reverse());
54        let mut total_stake = 0;
55        for (commit_index, stake) in highest_voted_commits {
56            total_stake += stake;
57            if total_stake >= self.context.committee.quorum_threshold() {
58                return commit_index;
59            }
60        }
61        GENESIS_COMMIT_INDEX
62    }
63}
64
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use super::CommitVoteMonitor;
    use crate::{
        block::{TestBlock, VerifiedBlock},
        commit::{CommitDigest, CommitRef},
        context::Context,
    };

    /// Feeds two rounds of commit votes into the monitor and checks the
    /// quorum commit index after each round.
    #[tokio::test]
    async fn test_commit_vote_monitor() {
        let context = Arc::new(Context::new_for_test(4).0);
        let monitor = CommitVoteMonitor::new(context.clone());

        // Round 10: authorities 0..4 vote for commit indices 5, 6, 7, 8 respectively.
        for author in 0..4 {
            let votes = vec![CommitRef::new(5 + author, CommitDigest::MIN)];
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(10, author).set_commit_votes(votes).build(),
            );
            monitor.observe_block(&block);
        }

        // Three of the four authorities voted for index 6 or higher, so 6 is
        // the highest index supported by a quorum.
        assert_eq!(monitor.quorum_commit_index(), 6);

        // Round 11: authorities 0 and 1 each cast two newer votes.
        for author in 0..2 {
            let votes = vec![
                CommitRef::new(6 + author, CommitDigest::MIN),
                CommitRef::new(7 + author, CommitDigest::MIN),
            ];
            let block = VerifiedBlock::new_for_test(
                TestBlock::new(11, author).set_commit_votes(votes).build(),
            );
            monitor.observe_block(&block);
        }

        // Highest commit index per authority is now 7, 8, 7, 8, so the quorum
        // commit index advances to 7.
        assert_eq!(monitor.quorum_commit_index(), 7);
    }
}
118}