consensus_core/
commit_vote_monitor.rs1use std::sync::Arc;
5
6use mysten_common::ZipDebugEqIteratorExt;
7use parking_lot::Mutex;
8
9use crate::{
10 CommitIndex,
11 block::{BlockAPI as _, VerifiedBlock},
12 commit::GENESIS_COMMIT_INDEX,
13 context::Context,
14};
15
16pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
19
20pub(crate) fn is_commit_lagging(
23 context: &Context,
24 local_commit_index: CommitIndex,
25 quorum_commit_index: CommitIndex,
26) -> bool {
27 local_commit_index + context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
28 < quorum_commit_index
29}
30
31pub(crate) struct CommitVoteMonitor {
33 context: Arc<Context>,
34 highest_voted_commits: Mutex<Vec<CommitIndex>>,
36}
37
38impl CommitVoteMonitor {
39 pub(crate) fn new(context: Arc<Context>) -> Self {
40 let highest_voted_commits = Mutex::new(vec![0; context.committee.size()]);
41 Self {
42 context,
43 highest_voted_commits,
44 }
45 }
46
47 pub(crate) fn observe_block(&self, block: &VerifiedBlock) {
49 let mut highest_voted_commits = self.highest_voted_commits.lock();
50 for vote in block.commit_votes() {
51 if vote.index > highest_voted_commits[block.author()] {
52 highest_voted_commits[block.author()] = vote.index;
53 }
54 }
55 }
56
57 pub(crate) fn quorum_commit_index(&self) -> CommitIndex {
62 let highest_voted_commits = self.highest_voted_commits.lock();
63 let mut highest_voted_commits = highest_voted_commits
64 .iter()
65 .zip_debug_eq(self.context.committee.authorities())
66 .map(|(commit_index, (_, a))| (*commit_index, a.stake))
67 .collect::<Vec<_>>();
68 highest_voted_commits.sort_by(|a, b| a.cmp(b).reverse());
70 let mut total_stake = 0;
71 for (commit_index, stake) in highest_voted_commits {
72 total_stake += stake;
73 if total_stake >= self.context.committee.quorum_threshold() {
74 return commit_index;
75 }
76 }
77 GENESIS_COMMIT_INDEX
78 }
79}
80
81#[cfg(test)]
82mod test {
83 use std::sync::Arc;
84
85 use super::CommitVoteMonitor;
86 use crate::{
87 block::{TestBlock, VerifiedBlock},
88 commit::{CommitDigest, CommitRef},
89 context::Context,
90 };
91
92 #[tokio::test]
93 async fn test_commit_vote_monitor() {
94 let context = Arc::new(Context::new_for_test(4).0);
95 let monitor = CommitVoteMonitor::new(context.clone());
96
97 let blocks = (0..4)
99 .map(|i| {
100 VerifiedBlock::new_for_test(
101 TestBlock::new(10, i)
102 .set_commit_votes(vec![CommitRef::new(5 + i, CommitDigest::MIN)])
103 .build(),
104 )
105 })
106 .collect::<Vec<_>>();
107 for b in blocks {
108 monitor.observe_block(&b);
109 }
110
111 assert_eq!(monitor.quorum_commit_index(), 6);
113
114 let blocks = (0..2)
116 .map(|i| {
117 VerifiedBlock::new_for_test(
118 TestBlock::new(11, i)
119 .set_commit_votes(vec![
120 CommitRef::new(6 + i, CommitDigest::MIN),
121 CommitRef::new(7 + i, CommitDigest::MIN),
122 ])
123 .build(),
124 )
125 })
126 .collect::<Vec<_>>();
127 for b in blocks {
128 monitor.observe_block(&b);
129 }
130
131 assert_eq!(monitor.quorum_commit_index(), 7);
133 }
134}