consensus_core/
commit_vote_monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use mysten_common::ZipDebugEqIteratorExt;
7use parking_lot::Mutex;
8
9use crate::{
10    CommitIndex,
11    block::{BlockAPI as _, VerifiedBlock},
12    commit::GENESIS_COMMIT_INDEX,
13    context::Context,
14};
15
16// Is used to calculate the threshold for blocking blocks or triggering sync
17// when the local commit index is lagging too far from the quorum commit index.
18pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
19
20/// When a node is lagging behind peers in commits, we expect commit sync to be
21/// triggered to catch up.
22pub(crate) fn is_commit_lagging(
23    context: &Context,
24    local_commit_index: CommitIndex,
25    quorum_commit_index: CommitIndex,
26) -> bool {
27    local_commit_index + context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
28        < quorum_commit_index
29}
30
31/// Monitors the progress of consensus commits across the network.
32pub(crate) struct CommitVoteMonitor {
33    context: Arc<Context>,
34    // Highest commit index voted by each authority.
35    highest_voted_commits: Mutex<Vec<CommitIndex>>,
36}
37
38impl CommitVoteMonitor {
39    pub(crate) fn new(context: Arc<Context>) -> Self {
40        let highest_voted_commits = Mutex::new(vec![0; context.committee.size()]);
41        Self {
42            context,
43            highest_voted_commits,
44        }
45    }
46
47    /// Keeps track of the highest commit voted by each authority.
48    pub(crate) fn observe_block(&self, block: &VerifiedBlock) {
49        let mut highest_voted_commits = self.highest_voted_commits.lock();
50        for vote in block.commit_votes() {
51            if vote.index > highest_voted_commits[block.author()] {
52                highest_voted_commits[block.author()] = vote.index;
53            }
54        }
55    }
56
57    // Finds the highest commit index certified by a quorum.
58    // When an authority votes for commit index S, it is also voting for all commit indices 1 <= i < S.
59    // So the quorum commit index is the smallest index S such that the sum of stakes of authorities
60    // voting for commit indices >= S passes the quorum threshold.
61    pub(crate) fn quorum_commit_index(&self) -> CommitIndex {
62        let highest_voted_commits = self.highest_voted_commits.lock();
63        let mut highest_voted_commits = highest_voted_commits
64            .iter()
65            .zip_debug_eq(self.context.committee.authorities())
66            .map(|(commit_index, (_, a))| (*commit_index, a.stake))
67            .collect::<Vec<_>>();
68        // Sort by commit index then stake, in descending order.
69        highest_voted_commits.sort_by(|a, b| a.cmp(b).reverse());
70        let mut total_stake = 0;
71        for (commit_index, stake) in highest_voted_commits {
72            total_stake += stake;
73            if total_stake >= self.context.committee.quorum_threshold() {
74                return commit_index;
75            }
76        }
77        GENESIS_COMMIT_INDEX
78    }
79}
80
81#[cfg(test)]
82mod test {
83    use std::sync::Arc;
84
85    use super::CommitVoteMonitor;
86    use crate::{
87        block::{TestBlock, VerifiedBlock},
88        commit::{CommitDigest, CommitRef},
89        context::Context,
90    };
91
92    #[tokio::test]
93    async fn test_commit_vote_monitor() {
94        let context = Arc::new(Context::new_for_test(4).0);
95        let monitor = CommitVoteMonitor::new(context.clone());
96
97        // Observe commit votes for indices 5, 6, 7, 8 from blocks.
98        let blocks = (0..4)
99            .map(|i| {
100                VerifiedBlock::new_for_test(
101                    TestBlock::new(10, i)
102                        .set_commit_votes(vec![CommitRef::new(5 + i, CommitDigest::MIN)])
103                        .build(),
104                )
105            })
106            .collect::<Vec<_>>();
107        for b in blocks {
108            monitor.observe_block(&b);
109        }
110
111        // CommitIndex 6 is the highest index supported by a quorum.
112        assert_eq!(monitor.quorum_commit_index(), 6);
113
114        // Observe new blocks with new votes from authority 0 and 1.
115        let blocks = (0..2)
116            .map(|i| {
117                VerifiedBlock::new_for_test(
118                    TestBlock::new(11, i)
119                        .set_commit_votes(vec![
120                            CommitRef::new(6 + i, CommitDigest::MIN),
121                            CommitRef::new(7 + i, CommitDigest::MIN),
122                        ])
123                        .build(),
124                )
125            })
126            .collect::<Vec<_>>();
127        for b in blocks {
128            monitor.observe_block(&b);
129        }
130
131        // Highest commit index per authority should be 7, 8, 7, 8 now.
132        assert_eq!(monitor.quorum_commit_index(), 7);
133    }
134}