1use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use tracing::{debug, trace};
17
18use crate::{
19 block::{BlockAPI, ExtendedBlock},
20 context::Context,
21};
22
23pub(crate) type QuorumRound = (Round, Round);
39
40pub(crate) struct RoundTracker {
41 context: Arc<Context>,
42
43 block_accepted_rounds: Vec<Vec<Round>>,
46 probed_accepted_rounds: Vec<Vec<Round>>,
48 probed_received_rounds: Vec<Vec<Round>>,
50
51 local_highest_received_rounds: Vec<Round>,
55}
56
57impl RoundTracker {
58 pub(crate) fn new(context: Arc<Context>, initial_received_rounds: Vec<Round>) -> Self {
59 let size = context.committee.size();
60 let local_highest_received_rounds = if initial_received_rounds.is_empty() {
61 vec![0; size]
62 } else {
63 assert_eq!(
64 initial_received_rounds.len(),
65 size,
66 "initial_received_rounds must be empty or have the same size as the committee"
67 );
68 initial_received_rounds
69 };
70 Self {
71 context,
72 block_accepted_rounds: vec![vec![0; size]; size],
73 probed_accepted_rounds: vec![vec![0; size]; size],
74 probed_received_rounds: vec![vec![0; size]; size],
75 local_highest_received_rounds,
76 }
77 }
78
79 pub(crate) fn update_from_verified_block(&mut self, extended_block: &ExtendedBlock) {
84 let block = &extended_block.block;
85 let excluded_ancestors = &extended_block.excluded_ancestors;
86 let author = block.author();
87
88 self.local_highest_received_rounds[author] =
90 self.local_highest_received_rounds[author].max(block.round());
91
92 self.block_accepted_rounds[author][author] =
94 self.block_accepted_rounds[author][author].max(block.round());
95
96 for ancestor in block.ancestors() {
98 self.block_accepted_rounds[author][ancestor.author] =
99 self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
100 }
101
102 for excluded_ancestor in excluded_ancestors {
104 self.block_accepted_rounds[author][excluded_ancestor.author] = self
105 .block_accepted_rounds[author][excluded_ancestor.author]
106 .max(excluded_ancestor.round);
107 }
108 }
109
110 pub(crate) fn update_from_probe(
112 &mut self,
113 accepted_rounds: Vec<Vec<Round>>,
114 received_rounds: Vec<Vec<Round>>,
115 ) {
116 self.probed_accepted_rounds = accepted_rounds;
117 self.probed_received_rounds = received_rounds;
118 }
119
120 pub(crate) fn local_highest_received_rounds(&self) -> Vec<Round> {
122 self.local_highest_received_rounds.clone()
123 }
124
125 pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
127 let own_index = self.context.own_index;
128 let node_metrics = &self.context.metrics.node_metrics;
129 let received_quorum_rounds = self.compute_received_quorum_rounds();
130 let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
131 for ((low, high), (_, authority)) in received_quorum_rounds
132 .iter()
133 .zip(self.context.committee.authorities())
134 {
135 node_metrics
136 .round_tracker_received_quorum_round_gaps
137 .with_label_values(&[&authority.hostname])
138 .set((high - low) as i64);
139 node_metrics
140 .round_tracker_low_received_quorum_round
141 .with_label_values(&[&authority.hostname])
142 .set(*low as i64);
143 node_metrics
145 .round_tracker_current_received_round_gaps
146 .with_label_values(&[&authority.hostname])
147 .set(last_proposed_round as i64 - *low as i64);
148 }
149
150 for ((low, high), (_, authority)) in accepted_quorum_rounds
151 .iter()
152 .zip(self.context.committee.authorities())
153 {
154 node_metrics
155 .round_tracker_accepted_quorum_round_gaps
156 .with_label_values(&[&authority.hostname])
157 .set((high - low) as i64);
158 node_metrics
159 .round_tracker_low_accepted_quorum_round
160 .with_label_values(&[&authority.hostname])
161 .set(*low as i64);
162 node_metrics
164 .round_tracker_current_accepted_round_gaps
165 .with_label_values(&[&authority.hostname])
166 .set(last_proposed_round as i64 - *low as i64);
167 }
168
169 let propagation_delay = last_proposed_round
184 .saturating_sub(received_quorum_rounds[own_index].0)
185 .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
186
187 node_metrics
188 .round_tracker_propagation_delays
189 .observe(propagation_delay as f64);
190 node_metrics
191 .round_tracker_last_propagation_delay
192 .set(propagation_delay as i64);
193
194 debug!(
195 "Computed propagation delay of {propagation_delay} based on last proposed \
196 round ({last_proposed_round})."
197 );
198
199 propagation_delay
200 }
201
202 pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
203 let highest_accepted_rounds = self
204 .probed_accepted_rounds
205 .iter()
206 .zip(self.block_accepted_rounds.iter())
207 .map(|(probed_rounds, block_rounds)| {
208 probed_rounds
209 .iter()
210 .zip(block_rounds.iter())
211 .map(|(probed_round, block_round)| *probed_round.max(block_round))
212 .collect::<Vec<Round>>()
213 })
214 .collect::<Vec<Vec<Round>>>();
215 let accepted_quorum_rounds = self
216 .context
217 .committee
218 .authorities()
219 .map(|(peer, _)| {
220 compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
221 })
222 .collect::<Vec<_>>();
223
224 trace!(
225 "Computed accepted quorum round per authority: {}",
226 self.context
227 .committee
228 .authorities()
229 .zip(accepted_quorum_rounds.iter())
230 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
231 .join(", ")
232 );
233
234 accepted_quorum_rounds
235 }
236
237 fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
238 let received_quorum_rounds = self
239 .context
240 .committee
241 .authorities()
242 .map(|(peer, _)| {
243 compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
244 })
245 .collect::<Vec<_>>();
246
247 trace!(
248 "Computed received quorum round per authority: {}",
249 self.context
250 .committee
251 .authorities()
252 .zip(received_quorum_rounds.iter())
253 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
254 .join(", ")
255 );
256
257 received_quorum_rounds
258 }
259}
260
261fn compute_quorum_round(
263 committee: &Committee,
264 target_index: AuthorityIndex,
265 highest_rounds: &[Vec<Round>],
266) -> QuorumRound {
267 let mut rounds_with_stake = highest_rounds
268 .iter()
269 .zip(committee.authorities())
270 .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
271 .collect::<Vec<_>>();
272 rounds_with_stake.sort();
273
274 let mut total_stake = 0;
278 let mut low = 0;
279 for (round, stake) in rounds_with_stake.iter().rev() {
280 total_stake += stake;
281 if total_stake >= committee.quorum_threshold() {
282 low = *round;
283 break;
284 }
285 }
286
287 let mut total_stake = 0;
288 let mut high = 0;
289 for (round, stake) in rounds_with_stake.iter() {
290 total_stake += stake;
291 if total_stake >= committee.quorum_threshold() {
292 high = *round;
293 break;
294 }
295 }
296
297 (low, high)
298}
299
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef};

    use crate::{
        TestBlock, VerifiedBlock,
        block::ExtendedBlock,
        context::Context,
        round_tracker::{RoundTracker, compute_quorum_round},
    };

    /// `compute_quorum_round` yields the expected `(low, high)` for every authority of a
    /// 4-member test committee.
    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        // One row per reporting authority; column j is the round reported for authority j.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        for (target, expected) in [(0, (3, 5)), (1, (2, 4)), (2, (5, 7)), (3, (4, 6))] {
            let quorum_round = compute_quorum_round(
                &context.committee,
                AuthorityIndex::new_for_test(target),
                &highest_received_rounds,
            );
            assert_eq!(quorum_round, expected);
        }
    }

    /// Received quorum rounds are derived from the probed received rounds alone.
    #[tokio::test]
    async fn test_compute_received_quorum_round() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let mut round_tracker = RoundTracker::new(context, vec![]);

        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        // Only the received half of the probe is supplied.
        round_tracker.update_from_probe(vec![], highest_received_rounds);

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();

        assert_eq!(
            received_quorum_rounds,
            vec![(3, 5), (2, 4), (5, 7), (4, 6)]
        );
    }

    /// Accepted quorum rounds combine probed accepted rounds with rounds learned from
    /// verified blocks, including their excluded ancestors.
    #[tokio::test]
    async fn test_compute_accepted_quorum_round() {
        const NUM_AUTHORITIES: usize = 4;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
        let own_index = context.own_index.value() as u32;
        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        let block_ref = |round, author| {
            BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::MIN)
        };

        let highest_accepted_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        round_tracker.update_from_probe(highest_accepted_rounds, vec![]);

        // Round-7 block authored by authority 2, with one ancestor and one excluded ancestor.
        let peer_block = VerifiedBlock::new_for_test(
            TestBlock::new(7, 2)
                .set_ancestors(vec![block_ref(6, 3)])
                .build(),
        );
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block: peer_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        // Round-11 block authored by this node.
        let own_block = VerifiedBlock::new_for_test(
            TestBlock::new(11, own_index)
                .set_ancestors(vec![block_ref(7, 2), block_ref(6, 3)])
                .build(),
        );
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block: own_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        assert_eq!(
            accepted_quorum_rounds,
            vec![(3, 5), (4, 8), (7, 7), (6, 6)]
        );
    }

    /// End-to-end check on a 7-member committee: probe data plus one verified block per
    /// authority, then quorum rounds and the resulting propagation delay.
    #[tokio::test]
    async fn test_quorum_round_manager() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);

        let highest_received_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![109, 121, 131, 0, 151, 161, 171],
            vec![101, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![100, 102, 133, 0, 155, 106, 177],
            vec![105, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let highest_accepted_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![0, 121, 131, 0, 151, 161, 171],
            vec![1, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![0, 102, 133, 0, 155, 106, 177],
            vec![1, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);

        // Authority i contributes a block at round 110 + 10 * i with no ancestors.
        for authority in 0..NUM_AUTHORITIES as u32 {
            let round = 110 + authority * 10;
            let block = VerifiedBlock::new_for_test(TestBlock::new(round, authority).build());
            round_tracker.update_from_verified_block(&ExtendedBlock {
                block,
                excluded_ancestors: vec![],
            });
        }

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        assert_eq!(
            received_quorum_rounds,
            vec![
                (100, 105),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );

        assert_eq!(
            accepted_quorum_rounds,
            vec![
                (0, 1),
                (0, 115),
                (103, 130),
                (0, 104),
                (105, 150),
                (106, 160),
                (127, 170)
            ]
        );

        // Own authority's low bounds are 100 (received) and 0 (accepted), giving delays of
        // 10 and 110 for a last proposed round of 110; the smaller one wins.
        let propagation_delay = round_tracker.calculate_propagation_delay(110);

        assert_eq!(propagation_delay, 10);
    }
}