1use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use mysten_common::ZipDebugEqIteratorExt;
17use tracing::{debug, trace};
18
19use crate::{
20 block::{BlockAPI, ExtendedBlock},
21 context::Context,
22};
23
24pub(crate) type QuorumRound = (Round, Round);
40
41pub(crate) struct RoundTracker {
42 context: Arc<Context>,
43
44 block_accepted_rounds: Vec<Vec<Round>>,
47 probed_accepted_rounds: Vec<Vec<Round>>,
49 probed_received_rounds: Vec<Vec<Round>>,
51
52 local_highest_received_rounds: Vec<Round>,
56}
57
58impl RoundTracker {
59 pub(crate) fn new(context: Arc<Context>, initial_received_rounds: Vec<Round>) -> Self {
60 let size = context.committee.size();
61 let local_highest_received_rounds = if initial_received_rounds.is_empty() {
62 vec![0; size]
63 } else {
64 assert_eq!(
65 initial_received_rounds.len(),
66 size,
67 "initial_received_rounds must be empty or have the same size as the committee"
68 );
69 initial_received_rounds
70 };
71 Self {
72 context,
73 block_accepted_rounds: vec![vec![0; size]; size],
74 probed_accepted_rounds: vec![vec![0; size]; size],
75 probed_received_rounds: vec![vec![0; size]; size],
76 local_highest_received_rounds,
77 }
78 }
79
80 pub(crate) fn update_from_verified_block(&mut self, extended_block: &ExtendedBlock) {
85 let block = &extended_block.block;
86 let excluded_ancestors = &extended_block.excluded_ancestors;
87 let author = block.author();
88
89 self.local_highest_received_rounds[author] =
91 self.local_highest_received_rounds[author].max(block.round());
92
93 self.block_accepted_rounds[author][author] =
95 self.block_accepted_rounds[author][author].max(block.round());
96
97 for ancestor in block.ancestors() {
99 self.block_accepted_rounds[author][ancestor.author] =
100 self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
101 }
102
103 for excluded_ancestor in excluded_ancestors {
105 self.block_accepted_rounds[author][excluded_ancestor.author] = self
106 .block_accepted_rounds[author][excluded_ancestor.author]
107 .max(excluded_ancestor.round);
108 }
109 }
110
111 pub(crate) fn update_from_probe(
113 &mut self,
114 accepted_rounds: Vec<Vec<Round>>,
115 received_rounds: Vec<Vec<Round>>,
116 ) {
117 self.probed_accepted_rounds = accepted_rounds;
118 self.probed_received_rounds = received_rounds;
119 }
120
121 pub(crate) fn local_highest_received_rounds(&self) -> Vec<Round> {
123 self.local_highest_received_rounds.clone()
124 }
125
126 pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
128 let own_index = self.context.own_index;
129 let node_metrics = &self.context.metrics.node_metrics;
130 let received_quorum_rounds = self.compute_received_quorum_rounds();
131 let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
132 for ((low, high), (_, authority)) in received_quorum_rounds
133 .iter()
134 .zip_debug_eq(self.context.committee.authorities())
135 {
136 node_metrics
137 .round_tracker_received_quorum_round_gaps
138 .with_label_values(&[&authority.hostname])
139 .set((high - low) as i64);
140 node_metrics
141 .round_tracker_low_received_quorum_round
142 .with_label_values(&[&authority.hostname])
143 .set(*low as i64);
144 node_metrics
146 .round_tracker_current_received_round_gaps
147 .with_label_values(&[&authority.hostname])
148 .set(last_proposed_round as i64 - *low as i64);
149 }
150
151 for ((low, high), (_, authority)) in accepted_quorum_rounds
152 .iter()
153 .zip_debug_eq(self.context.committee.authorities())
154 {
155 node_metrics
156 .round_tracker_accepted_quorum_round_gaps
157 .with_label_values(&[&authority.hostname])
158 .set((high - low) as i64);
159 node_metrics
160 .round_tracker_low_accepted_quorum_round
161 .with_label_values(&[&authority.hostname])
162 .set(*low as i64);
163 node_metrics
165 .round_tracker_current_accepted_round_gaps
166 .with_label_values(&[&authority.hostname])
167 .set(last_proposed_round as i64 - *low as i64);
168 }
169
170 let propagation_delay = last_proposed_round
185 .saturating_sub(received_quorum_rounds[own_index].0)
186 .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
187
188 node_metrics
189 .round_tracker_propagation_delays
190 .observe(propagation_delay as f64);
191 node_metrics
192 .round_tracker_last_propagation_delay
193 .set(propagation_delay as i64);
194
195 debug!(
196 "Computed propagation delay of {propagation_delay} based on last proposed \
197 round ({last_proposed_round})."
198 );
199
200 propagation_delay
201 }
202
203 pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
204 let highest_accepted_rounds = self
205 .probed_accepted_rounds
206 .iter()
207 .zip_debug_eq(self.block_accepted_rounds.iter())
208 .map(|(probed_rounds, block_rounds)| {
209 probed_rounds
210 .iter()
211 .zip_debug_eq(block_rounds.iter())
212 .map(|(probed_round, block_round)| *probed_round.max(block_round))
213 .collect::<Vec<Round>>()
214 })
215 .collect::<Vec<Vec<Round>>>();
216 let accepted_quorum_rounds = self
217 .context
218 .committee
219 .authorities()
220 .map(|(peer, _)| {
221 compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
222 })
223 .collect::<Vec<_>>();
224
225 trace!(
226 "Computed accepted quorum round per authority: {}",
227 self.context
228 .committee
229 .authorities()
230 .zip_debug_eq(accepted_quorum_rounds.iter())
231 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
232 .join(", ")
233 );
234
235 accepted_quorum_rounds
236 }
237
238 fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
239 let received_quorum_rounds = self
240 .context
241 .committee
242 .authorities()
243 .map(|(peer, _)| {
244 compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
245 })
246 .collect::<Vec<_>>();
247
248 trace!(
249 "Computed received quorum round per authority: {}",
250 self.context
251 .committee
252 .authorities()
253 .zip_debug_eq(received_quorum_rounds.iter())
254 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
255 .join(", ")
256 );
257
258 received_quorum_rounds
259 }
260}
261
262fn compute_quorum_round(
264 committee: &Committee,
265 target_index: AuthorityIndex,
266 highest_rounds: &[Vec<Round>],
267) -> QuorumRound {
268 let mut rounds_with_stake = highest_rounds
269 .iter()
270 .zip_debug_eq(committee.authorities())
271 .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
272 .collect::<Vec<_>>();
273 rounds_with_stake.sort();
274
275 let mut total_stake = 0;
279 let mut low = 0;
280 for (round, stake) in rounds_with_stake.iter().rev() {
281 total_stake += stake;
282 if total_stake >= committee.quorum_threshold() {
283 low = *round;
284 break;
285 }
286 }
287
288 let mut total_stake = 0;
289 let mut high = 0;
290 for (round, stake) in rounds_with_stake.iter() {
291 total_stake += stake;
292 if total_stake >= committee.quorum_threshold() {
293 high = *round;
294 break;
295 }
296 }
297
298 (low, high)
299}
300
301#[cfg(test)]
302mod test {
303 use std::sync::Arc;
304
305 use consensus_config::AuthorityIndex;
306 use consensus_types::block::{BlockDigest, BlockRef};
307
308 use crate::{
309 TestBlock, VerifiedBlock,
310 block::ExtendedBlock,
311 context::Context,
312 round_tracker::{RoundTracker, compute_quorum_round},
313 };
314
315 #[tokio::test]
316 async fn test_compute_quorum_round() {
317 let (context, _) = Context::new_for_test(4);
318
319 let highest_received_rounds = vec![
321 vec![10, 11, 12, 13],
322 vec![5, 2, 7, 4],
323 vec![0, 0, 0, 0],
324 vec![3, 4, 5, 6],
325 ];
326
327 let round = compute_quorum_round(
328 &context.committee,
329 AuthorityIndex::new_for_test(0),
330 &highest_received_rounds,
331 );
332 assert_eq!(round, (3, 5));
333
334 let round = compute_quorum_round(
335 &context.committee,
336 AuthorityIndex::new_for_test(1),
337 &highest_received_rounds,
338 );
339 assert_eq!(round, (2, 4));
340
341 let round = compute_quorum_round(
342 &context.committee,
343 AuthorityIndex::new_for_test(2),
344 &highest_received_rounds,
345 );
346 assert_eq!(round, (5, 7));
347
348 let round = compute_quorum_round(
349 &context.committee,
350 AuthorityIndex::new_for_test(3),
351 &highest_received_rounds,
352 );
353 assert_eq!(round, (4, 6));
354 }
355
356 #[tokio::test]
357 async fn test_compute_received_quorum_round() {
358 telemetry_subscribers::init_for_testing();
359 let (context, _) = Context::new_for_test(4);
360 let context = Arc::new(context);
361 let mut round_tracker = RoundTracker::new(context.clone(), vec![]);
362
363 let highest_received_rounds = vec![
365 vec![10, 11, 12, 13],
366 vec![5, 2, 7, 4],
367 vec![0, 0, 0, 0],
368 vec![3, 4, 5, 6],
369 ];
370
371 let expected_received_quorum_rounds = vec![(3, 5), (2, 4), (5, 7), (4, 6)];
372
373 round_tracker.update_from_probe(vec![], highest_received_rounds);
374
375 let received_quourum_rounds = round_tracker.compute_received_quorum_rounds();
376
377 assert_eq!(expected_received_quorum_rounds, received_quourum_rounds);
378 }
379
380 #[tokio::test]
381 async fn test_compute_accepted_quorum_round() {
382 const NUM_AUTHORITIES: usize = 4;
383 let (context, _) = Context::new_for_test(NUM_AUTHORITIES);
384 let context = Arc::new(context);
385 let own_index = context.own_index.value() as u32;
386 let mut round_tracker = RoundTracker::new(context.clone(), vec![]);
387
388 let highest_accepted_rounds = vec![
390 vec![10, 11, 12, 13],
391 vec![5, 2, 7, 4],
392 vec![0, 0, 0, 0],
393 vec![3, 4, 5, 6],
394 ];
395
396 round_tracker.update_from_probe(highest_accepted_rounds, vec![]);
397
398 let test_block = TestBlock::new(7, 2)
400 .set_ancestors(vec![BlockRef::new(
401 6,
402 AuthorityIndex::new_for_test(3),
403 BlockDigest::MIN,
404 )])
405 .build();
406 let block = VerifiedBlock::new_for_test(test_block);
407 round_tracker.update_from_verified_block(&ExtendedBlock {
408 block,
409 excluded_ancestors: vec![BlockRef::new(
410 8,
411 AuthorityIndex::new_for_test(1),
412 BlockDigest::MIN,
413 )],
414 });
415
416 let test_block = TestBlock::new(11, own_index)
420 .set_ancestors(vec![
421 BlockRef::new(7, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
422 BlockRef::new(6, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
423 ])
424 .build();
425 let block = VerifiedBlock::new_for_test(test_block);
426 round_tracker.update_from_verified_block(&ExtendedBlock {
427 block,
428 excluded_ancestors: vec![BlockRef::new(
429 8,
430 AuthorityIndex::new_for_test(1),
431 BlockDigest::MIN,
432 )],
433 });
434
435 let expected_accepted_quorum_rounds = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
443 let accepted_quourum_rounds = round_tracker.compute_accepted_quorum_rounds();
444
445 assert_eq!(expected_accepted_quorum_rounds, accepted_quourum_rounds);
446 }
447
448 #[tokio::test]
449 async fn test_quorum_round_manager() {
450 const NUM_AUTHORITIES: usize = 7;
451 let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
452
453 let highest_received_rounds = vec![
454 vec![110, 120, 130, 140, 150, 160, 170],
455 vec![109, 121, 131, 0, 151, 161, 171],
456 vec![101, 0, 103, 104, 105, 166, 107],
457 vec![0, 0, 0, 0, 0, 0, 0],
458 vec![100, 102, 133, 0, 155, 106, 177],
459 vec![105, 115, 103, 0, 125, 126, 127],
460 vec![0, 0, 0, 0, 0, 0, 0],
461 ];
462
463 let highest_accepted_rounds = vec![
464 vec![110, 120, 130, 140, 150, 160, 170],
465 vec![0, 121, 131, 0, 151, 161, 171],
466 vec![1, 0, 103, 104, 105, 166, 107],
467 vec![0, 0, 0, 0, 0, 0, 0],
468 vec![0, 102, 133, 0, 155, 106, 177],
469 vec![1, 115, 103, 0, 125, 126, 127],
470 vec![0, 0, 0, 0, 0, 0, 0],
471 ];
472
473 let mut round_tracker = RoundTracker::new(context.clone(), vec![]);
474
475 round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);
476
477 for authority in 0..NUM_AUTHORITIES {
479 let round = 110 + (authority as u32 * 10);
480 let block =
481 VerifiedBlock::new_for_test(TestBlock::new(round, authority as u32).build());
482 round_tracker.update_from_verified_block(&ExtendedBlock {
483 block,
484 excluded_ancestors: vec![],
485 });
486 }
487
488 let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
499 let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();
500 assert_eq!(
501 received_quorum_rounds,
502 vec![
503 (100, 105),
504 (0, 115),
505 (103, 130),
506 (0, 0),
507 (105, 150),
508 (106, 160),
509 (107, 170)
510 ]
511 );
512
513 assert_eq!(
524 accepted_quorum_rounds,
525 vec![
526 (0, 1),
527 (0, 115),
528 (103, 130),
529 (0, 104),
530 (105, 150),
531 (106, 160),
532 (127, 170)
533 ]
534 );
535
536 let propagation_delay = round_tracker.calculate_propagation_delay(110);
537
538 assert_eq!(propagation_delay, 10);
540 }
541}