1use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use tracing::{debug, trace};
17
18use crate::{
19 block::{BlockAPI, ExtendedBlock},
20 context::Context,
21};
22
/// A `(low, high)` pair of rounds, as returned by `compute_quorum_round`.
///
/// With the per-authority reports for a target sorted by round, `low` is the round
/// at which stake accumulated from the highest report downward first reaches the
/// committee's quorum threshold, and `high` is the same when accumulating from the
/// lowest report upward. Either value is 0 when the threshold is never reached.
pub(crate) type QuorumRound = (Round, Round);
39
/// Tracks, for every pair of authorities, the highest round of one authority that
/// the other is known to have accepted or received, and derives quorum rounds and
/// the propagation delay from that data.
///
/// All three tables are indexed as `table[reporter][target]` and start out as
/// committee-size × committee-size matrices filled with 0.
pub(crate) struct PeerRoundTracker {
    /// Shared context; read for the committee, the own authority index and metrics.
    context: Arc<Context>,
    /// Highest rounds learned from verified blocks: a block's author is the
    /// reporter, and the targets are the author itself (block round), its ancestors
    /// and its excluded ancestors. Entries only ever increase.
    block_accepted_rounds: Vec<Vec<Round>>,
    /// Accepted rounds from the latest probe; replaced wholesale by
    /// `update_from_probe`.
    probed_accepted_rounds: Vec<Vec<Round>>,
    /// Received rounds from the latest probe; replaced wholesale by
    /// `update_from_probe`.
    probed_received_rounds: Vec<Vec<Round>>,
}
49
50impl PeerRoundTracker {
51 pub(crate) fn new(context: Arc<Context>) -> Self {
52 let size = context.committee.size();
53 Self {
54 context,
55 block_accepted_rounds: vec![vec![0; size]; size],
56 probed_accepted_rounds: vec![vec![0; size]; size],
57 probed_received_rounds: vec![vec![0; size]; size],
58 }
59 }
60
61 pub(crate) fn update_from_verified_block(&mut self, extended_block: &ExtendedBlock) {
65 let block = &extended_block.block;
66 let excluded_ancestors = &extended_block.excluded_ancestors;
67 let author = block.author();
68
69 self.block_accepted_rounds[author][author] =
71 self.block_accepted_rounds[author][author].max(block.round());
72
73 for ancestor in block.ancestors() {
75 self.block_accepted_rounds[author][ancestor.author] =
76 self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
77 }
78
79 for excluded_ancestor in excluded_ancestors {
81 self.block_accepted_rounds[author][excluded_ancestor.author] = self
82 .block_accepted_rounds[author][excluded_ancestor.author]
83 .max(excluded_ancestor.round);
84 }
85 }
86
87 pub(crate) fn update_from_probe(
89 &mut self,
90 accepted_rounds: Vec<Vec<Round>>,
91 received_rounds: Vec<Vec<Round>>,
92 ) {
93 self.probed_accepted_rounds = accepted_rounds;
94 self.probed_received_rounds = received_rounds;
95 }
96
97 pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
99 let own_index = self.context.own_index;
100 let node_metrics = &self.context.metrics.node_metrics;
101 let received_quorum_rounds = self.compute_received_quorum_rounds();
102 let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
103 for ((low, high), (_, authority)) in received_quorum_rounds
104 .iter()
105 .zip(self.context.committee.authorities())
106 {
107 node_metrics
108 .round_tracker_received_quorum_round_gaps
109 .with_label_values(&[&authority.hostname])
110 .set((high - low) as i64);
111 node_metrics
112 .round_tracker_low_received_quorum_round
113 .with_label_values(&[&authority.hostname])
114 .set(*low as i64);
115 node_metrics
117 .round_tracker_current_received_round_gaps
118 .with_label_values(&[&authority.hostname])
119 .set(last_proposed_round as i64 - *low as i64);
120 }
121
122 for ((low, high), (_, authority)) in accepted_quorum_rounds
123 .iter()
124 .zip(self.context.committee.authorities())
125 {
126 node_metrics
127 .round_tracker_accepted_quorum_round_gaps
128 .with_label_values(&[&authority.hostname])
129 .set((high - low) as i64);
130 node_metrics
131 .round_tracker_low_accepted_quorum_round
132 .with_label_values(&[&authority.hostname])
133 .set(*low as i64);
134 node_metrics
136 .round_tracker_current_accepted_round_gaps
137 .with_label_values(&[&authority.hostname])
138 .set(last_proposed_round as i64 - *low as i64);
139 }
140
141 let propagation_delay = last_proposed_round
156 .saturating_sub(received_quorum_rounds[own_index].0)
157 .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
158
159 node_metrics
160 .round_tracker_propagation_delays
161 .observe(propagation_delay as f64);
162 node_metrics
163 .round_tracker_last_propagation_delay
164 .set(propagation_delay as i64);
165
166 debug!(
167 "Computed propagation delay of {propagation_delay} based on last proposed \
168 round ({last_proposed_round})."
169 );
170
171 propagation_delay
172 }
173
174 pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
175 let highest_accepted_rounds = self
176 .probed_accepted_rounds
177 .iter()
178 .zip(self.block_accepted_rounds.iter())
179 .map(|(probed_rounds, block_rounds)| {
180 probed_rounds
181 .iter()
182 .zip(block_rounds.iter())
183 .map(|(probed_round, block_round)| *probed_round.max(block_round))
184 .collect::<Vec<Round>>()
185 })
186 .collect::<Vec<Vec<Round>>>();
187 let accepted_quorum_rounds = self
188 .context
189 .committee
190 .authorities()
191 .map(|(peer, _)| {
192 compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
193 })
194 .collect::<Vec<_>>();
195
196 trace!(
197 "Computed accepted quorum round per authority: {}",
198 self.context
199 .committee
200 .authorities()
201 .zip(accepted_quorum_rounds.iter())
202 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
203 .join(", ")
204 );
205
206 accepted_quorum_rounds
207 }
208
209 fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
210 let received_quorum_rounds = self
211 .context
212 .committee
213 .authorities()
214 .map(|(peer, _)| {
215 compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
216 })
217 .collect::<Vec<_>>();
218
219 trace!(
220 "Computed received quorum round per authority: {}",
221 self.context
222 .committee
223 .authorities()
224 .zip(received_quorum_rounds.iter())
225 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
226 .join(", ")
227 );
228
229 received_quorum_rounds
230 }
231}
232
233fn compute_quorum_round(
235 committee: &Committee,
236 target_index: AuthorityIndex,
237 highest_rounds: &[Vec<Round>],
238) -> QuorumRound {
239 let mut rounds_with_stake = highest_rounds
240 .iter()
241 .zip(committee.authorities())
242 .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
243 .collect::<Vec<_>>();
244 rounds_with_stake.sort();
245
246 let mut total_stake = 0;
250 let mut low = 0;
251 for (round, stake) in rounds_with_stake.iter().rev() {
252 total_stake += stake;
253 if total_stake >= committee.quorum_threshold() {
254 low = *round;
255 break;
256 }
257 }
258
259 let mut total_stake = 0;
260 let mut high = 0;
261 for (round, stake) in rounds_with_stake.iter() {
262 total_stake += stake;
263 if total_stake >= committee.quorum_threshold() {
264 high = *round;
265 break;
266 }
267 }
268
269 (low, high)
270}
271
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef};

    use crate::{
        TestBlock, VerifiedBlock,
        block::ExtendedBlock,
        context::Context,
        round_tracker::{PeerRoundTracker, compute_quorum_round},
    };

    /// Builds a reference to the block of `author` at `round` with the minimum digest.
    fn block_ref(round: u32, author: u32) -> BlockRef {
        BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::MIN)
    }

    /// Checks the quorum round of each authority against a fixed 4x4 table.
    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        // (target authority, expected (low, high) quorum round)
        let cases = [(0, (3, 5)), (1, (2, 4)), (2, (5, 7)), (3, (4, 6))];
        for (target, expected) in cases {
            let quorum_round = compute_quorum_round(
                &context.committee,
                AuthorityIndex::new_for_test(target),
                &highest_received_rounds,
            );
            assert_eq!(quorum_round, expected);
        }
    }

    /// Received quorum rounds are derived from the probed received rounds only.
    #[tokio::test]
    async fn test_compute_received_quorum_round() {
        telemetry_subscribers::init_for_testing();
        let (context, _) = Context::new_for_test(4);
        let mut round_tracker = PeerRoundTracker::new(Arc::new(context));

        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];
        round_tracker.update_from_probe(vec![], highest_received_rounds);

        let expected = vec![(3, 5), (2, 4), (5, 7), (4, 6)];
        let actual = round_tracker.compute_received_quorum_rounds();

        assert_eq!(expected, actual);
    }

    /// Accepted quorum rounds combine probed accepted rounds with the rounds learned
    /// from verified blocks.
    #[tokio::test]
    async fn test_compute_accepted_quorum_round() {
        const NUM_AUTHORITIES: usize = 4;
        let (context, _) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let own_index = context.own_index.value() as u32;
        let mut round_tracker = PeerRoundTracker::new(context);

        let highest_accepted_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];
        round_tracker.update_from_probe(highest_accepted_rounds, vec![]);

        // A block from authority 2 at round 7.
        let peer_block = VerifiedBlock::new_for_test(
            TestBlock::new(7, 2)
                .set_ancestors(vec![block_ref(6, 3)])
                .build(),
        );
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block: peer_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        // A block from the own authority at round 11.
        let own_block = VerifiedBlock::new_for_test(
            TestBlock::new(11, own_index)
                .set_ancestors(vec![block_ref(7, 2), block_ref(6, 3)])
                .build(),
        );
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block: own_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        let expected = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
        let actual = round_tracker.compute_accepted_quorum_rounds();

        assert_eq!(expected, actual);
    }

    /// End-to-end check with 7 authorities: probe results, one verified block per
    /// authority, both quorum round tables and the resulting propagation delay.
    #[tokio::test]
    async fn test_quorum_round_manager() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);

        let highest_received_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![109, 121, 131, 0, 151, 161, 171],
            vec![101, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![100, 102, 133, 0, 155, 106, 177],
            vec![105, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let highest_accepted_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![0, 121, 131, 0, 151, 161, 171],
            vec![1, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![0, 102, 133, 0, 155, 106, 177],
            vec![1, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let mut round_tracker = PeerRoundTracker::new(context.clone());
        round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);

        // Authority `a` contributes a block at round 110 + 10 * a.
        for authority in 0..NUM_AUTHORITIES as u32 {
            let round = 110 + authority * 10;
            let block = VerifiedBlock::new_for_test(TestBlock::new(round, authority).build());
            round_tracker.update_from_verified_block(&ExtendedBlock {
                block,
                excluded_ancestors: vec![],
            });
        }

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        let expected_received = vec![
            (100, 105),
            (0, 115),
            (103, 130),
            (0, 0),
            (105, 150),
            (106, 160),
            (107, 170),
        ];
        assert_eq!(received_quorum_rounds, expected_received);

        let expected_accepted = vec![
            (0, 1),
            (0, 115),
            (103, 130),
            (0, 104),
            (105, 150),
            (106, 160),
            (127, 170),
        ];
        assert_eq!(accepted_quorum_rounds, expected_accepted);

        let propagation_delay = round_tracker.calculate_propagation_delay(110);

        assert_eq!(propagation_delay, 10);
    }
}