1use std::{cmp::Ordering, sync::Arc};
5
6use consensus_types::block::{BlockRef, Round};
7use tokio::time::Instant;
8use tracing::{debug, info};
9
10use crate::{
11 context::Context,
12 stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
/// Tracks the round this node is currently at, advancing it as block
/// references are fed in through `add_block`.
pub(crate) struct ThresholdClock {
    /// Shared node context; supplies the committee and the metrics used when
    /// processing blocks.
    context: Arc<Context>,
    /// Accumulates the authors of the blocks seen for the current `round` and
    /// reports when their combined stake reaches the threshold.
    aggregator: StakeAggregator<QuorumThreshold>,
    /// The round the clock is currently at.
    round: Round,
    /// Instant at which the clock was created or last moved to a new round.
    quorum_ts: Instant,
}
22
23impl ThresholdClock {
24 pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
25 info!("Recovered ThresholdClock at round {}", round);
26 Self {
27 context,
28 aggregator: StakeAggregator::new(),
29 round,
30 quorum_ts: Instant::now(),
31 }
32 }
33
34 pub(crate) fn add_block(&mut self, block: BlockRef) -> bool {
37 match block.round.cmp(&self.round) {
38 Ordering::Less => false,
40 Ordering::Equal => {
41 let now = Instant::now();
42 if self.aggregator.add(block.author, &self.context.committee) {
43 self.aggregator.clear();
44 self.round = block.round + 1;
46 self.quorum_ts = now;
48 debug!(
49 "ThresholdClock advanced to round {} with block {} completing quorum",
50 self.round, block
51 );
52 return true;
53 }
54 let hostname = &self.context.committee.authority(block.author).hostname;
56 self.context
57 .metrics
58 .node_metrics
59 .block_receive_delay
60 .with_label_values(&[hostname])
61 .inc_by(now.duration_since(self.quorum_ts).as_millis() as u64);
62 false
63 }
64 Ordering::Greater => {
66 self.aggregator.clear();
67 if self.aggregator.add(block.author, &self.context.committee) {
68 self.round = block.round + 1;
70 } else {
71 self.round = block.round;
73 };
74 self.quorum_ts = Instant::now();
75 debug!(
76 "ThresholdClock advanced to round {} with block {} catching up round",
77 self.round, block
78 );
79 true
80 }
81 }
82 }
83
84 #[cfg(test)]
87 fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
88 let previous_round = self.round;
89 for block_ref in blocks {
90 self.add_block(block_ref);
91 }
92 (self.round > previous_round).then_some(self.round)
93 }
94
95 pub(crate) fn get_round(&self) -> Round {
96 self.round
97 }
98
99 pub(crate) fn get_quorum_ts(&self) -> Instant {
100 self.quorum_ts
101 }
102}
103
#[cfg(test)]
mod tests {
    use consensus_config::AuthorityIndex;
    use consensus_types::block::BlockDigest;

    use super::*;

    /// Adds blocks one at a time and checks the clock's round after each one.
    #[tokio::test]
    async fn test_threshold_clock_add_block() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // (block round, block author, clock round expected after adding the block)
        let steps = vec![
            (0, 0, 0),
            (0, 1, 0),
            (0, 2, 1),
            (1, 0, 1),
            (1, 3, 1),
            (2, 1, 2),
            (1, 1, 2),
            (5, 2, 5),
        ];

        for (round, author, expected_round) in steps {
            clock.add_block(BlockRef::new(
                round,
                AuthorityIndex::new_for_test(author),
                BlockDigest::default(),
            ));
            assert_eq!(clock.get_round(), expected_round);
        }
    }

    /// Adds the same sequence of blocks in one batch and checks the final round.
    #[tokio::test]
    async fn test_threshold_clock_add_blocks() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // (block round, block author)
        let block_refs: Vec<BlockRef> = vec![
            (0, 0),
            (0, 1),
            (0, 2),
            (1, 0),
            (1, 3),
            (2, 1),
            (1, 1),
            (5, 2),
        ]
        .into_iter()
        .map(|(round, author)| {
            BlockRef::new(
                round,
                AuthorityIndex::new_for_test(author),
                BlockDigest::default(),
            )
        })
        .collect();

        let result = clock.add_blocks(block_refs);
        assert_eq!(Some(5), result);
    }

    /// With a single-authority committee every block at or above the current
    /// round advances the clock by itself.
    #[tokio::test]
    async fn test_threshold_clock_add_block_min_committee() {
        let context = Arc::new(Context::new_for_test(1).0);
        let mut clock = ThresholdClock::new(10, context);

        // All blocks in this test come from the only authority in the committee.
        let block_at = |round| {
            BlockRef::new(
                round,
                AuthorityIndex::new_for_test(0),
                BlockDigest::default(),
            )
        };

        // A block from an earlier round leaves the clock untouched.
        assert!(!clock.add_block(block_at(9)));
        assert_eq!(clock.get_round(), 10);

        // A block at the current round moves the clock to the next round.
        assert!(clock.add_block(block_at(10)));
        assert_eq!(clock.get_round(), 11);

        // A block from a far-ahead round moves the clock just past that round.
        clock.add_block(block_at(20));
        assert_eq!(clock.get_round(), 21);
    }
}