1use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use tracing::{debug, trace};
17
18use crate::{
19 block::{BlockAPI, ExtendedBlock},
20 context::Context,
21};
22
/// A `(low, high)` pair of rounds describing one authority, as produced by
/// `compute_quorum_round` in this file.
///
/// - `low` (`.0`): the highest round `r` for which peers holding at least a quorum of
///   stake report a round `>= r` for the authority.
/// - `high` (`.1`): the lowest round `r` for which peers holding at least a quorum of
///   stake report a round `<= r` for the authority.
///
/// Either value is `0` when the reported rounds never accumulate a quorum of stake.
pub(crate) type QuorumRound = (Round, Round);
39
/// Keeps the highest rounds known per peer and derives quorum rounds and the propagation
/// delay of own blocks from them.
///
/// Every matrix below is indexed as `[reporting authority][target authority]` and starts
/// out as a `committee.size()` x `committee.size()` grid of zeros (see `new`).
pub(crate) struct PeerRoundTracker {
    /// Shared node context; this type reads the committee, the own authority index and
    /// the node metrics from it.
    context: Arc<Context>,
    /// Highest rounds observed through accepted blocks: row = block author, column = the
    /// author itself or an (included or excluded) ancestor's author.
    block_accepted_rounds: Vec<Vec<Round>>,
    /// Accepted rounds as last supplied to `update_from_probe` (replaced wholesale).
    probed_accepted_rounds: Vec<Vec<Round>>,
    /// Received rounds as last supplied to `update_from_probe` (replaced wholesale).
    probed_received_rounds: Vec<Vec<Round>>,
}
49
50impl PeerRoundTracker {
51 pub(crate) fn new(context: Arc<Context>) -> Self {
52 let size = context.committee.size();
53 Self {
54 context,
55 block_accepted_rounds: vec![vec![0; size]; size],
56 probed_accepted_rounds: vec![vec![0; size]; size],
57 probed_received_rounds: vec![vec![0; size]; size],
58 }
59 }
60
61 pub(crate) fn update_from_accepted_block(&mut self, extended_block: &ExtendedBlock) {
64 let block = &extended_block.block;
65 let excluded_ancestors = &extended_block.excluded_ancestors;
66 let author = block.author();
67
68 self.block_accepted_rounds[author][author] =
70 self.block_accepted_rounds[author][author].max(block.round());
71
72 for ancestor in block.ancestors() {
74 self.block_accepted_rounds[author][ancestor.author] =
75 self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
76 }
77
78 for excluded_ancestor in excluded_ancestors {
80 self.block_accepted_rounds[author][excluded_ancestor.author] = self
81 .block_accepted_rounds[author][excluded_ancestor.author]
82 .max(excluded_ancestor.round);
83 }
84 }
85
86 pub(crate) fn update_from_probe(
88 &mut self,
89 accepted_rounds: Vec<Vec<Round>>,
90 received_rounds: Vec<Vec<Round>>,
91 ) {
92 self.probed_accepted_rounds = accepted_rounds;
93 self.probed_received_rounds = received_rounds;
94 }
95
96 pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
98 let own_index = self.context.own_index;
99 let node_metrics = &self.context.metrics.node_metrics;
100 let received_quorum_rounds = self.compute_received_quorum_rounds();
101 let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
102 for ((low, high), (_, authority)) in received_quorum_rounds
103 .iter()
104 .zip(self.context.committee.authorities())
105 {
106 node_metrics
107 .round_tracker_received_quorum_round_gaps
108 .with_label_values(&[&authority.hostname])
109 .set((high - low) as i64);
110 node_metrics
111 .round_tracker_low_received_quorum_round
112 .with_label_values(&[&authority.hostname])
113 .set(*low as i64);
114 node_metrics
116 .round_tracker_current_received_round_gaps
117 .with_label_values(&[&authority.hostname])
118 .set(last_proposed_round as i64 - *low as i64);
119 }
120
121 for ((low, high), (_, authority)) in accepted_quorum_rounds
122 .iter()
123 .zip(self.context.committee.authorities())
124 {
125 node_metrics
126 .round_tracker_accepted_quorum_round_gaps
127 .with_label_values(&[&authority.hostname])
128 .set((high - low) as i64);
129 node_metrics
130 .round_tracker_low_accepted_quorum_round
131 .with_label_values(&[&authority.hostname])
132 .set(*low as i64);
133 node_metrics
135 .round_tracker_current_accepted_round_gaps
136 .with_label_values(&[&authority.hostname])
137 .set(last_proposed_round as i64 - *low as i64);
138 }
139
140 let propagation_delay = last_proposed_round
155 .saturating_sub(received_quorum_rounds[own_index].0)
156 .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
157
158 node_metrics
159 .round_tracker_propagation_delays
160 .observe(propagation_delay as f64);
161 node_metrics
162 .round_tracker_last_propagation_delay
163 .set(propagation_delay as i64);
164
165 debug!(
166 "Computed propagation delay of {propagation_delay} based on last proposed \
167 round ({last_proposed_round})."
168 );
169
170 propagation_delay
171 }
172
173 pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
174 let highest_accepted_rounds = self
175 .probed_accepted_rounds
176 .iter()
177 .zip(self.block_accepted_rounds.iter())
178 .map(|(probed_rounds, block_rounds)| {
179 probed_rounds
180 .iter()
181 .zip(block_rounds.iter())
182 .map(|(probed_round, block_round)| *probed_round.max(block_round))
183 .collect::<Vec<Round>>()
184 })
185 .collect::<Vec<Vec<Round>>>();
186 let accepted_quorum_rounds = self
187 .context
188 .committee
189 .authorities()
190 .map(|(peer, _)| {
191 compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
192 })
193 .collect::<Vec<_>>();
194
195 trace!(
196 "Computed accepted quorum round per authority: {}",
197 self.context
198 .committee
199 .authorities()
200 .zip(accepted_quorum_rounds.iter())
201 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
202 .join(", ")
203 );
204
205 accepted_quorum_rounds
206 }
207
208 fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
209 let received_quorum_rounds = self
210 .context
211 .committee
212 .authorities()
213 .map(|(peer, _)| {
214 compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
215 })
216 .collect::<Vec<_>>();
217
218 trace!(
219 "Computed received quorum round per authority: {}",
220 self.context
221 .committee
222 .authorities()
223 .zip(received_quorum_rounds.iter())
224 .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
225 .join(", ")
226 );
227
228 received_quorum_rounds
229 }
230}
231
232fn compute_quorum_round(
234 committee: &Committee,
235 target_index: AuthorityIndex,
236 highest_rounds: &[Vec<Round>],
237) -> QuorumRound {
238 let mut rounds_with_stake = highest_rounds
239 .iter()
240 .zip(committee.authorities())
241 .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
242 .collect::<Vec<_>>();
243 rounds_with_stake.sort();
244
245 let mut total_stake = 0;
249 let mut low = 0;
250 for (round, stake) in rounds_with_stake.iter().rev() {
251 total_stake += stake;
252 if total_stake >= committee.quorum_threshold() {
253 low = *round;
254 break;
255 }
256 }
257
258 let mut total_stake = 0;
259 let mut high = 0;
260 for (round, stake) in rounds_with_stake.iter() {
261 total_stake += stake;
262 if total_stake >= committee.quorum_threshold() {
263 high = *round;
264 break;
265 }
266 }
267
268 (low, high)
269}
270
271#[cfg(test)]
272mod test {
273 use std::sync::Arc;
274
275 use consensus_config::AuthorityIndex;
276 use consensus_types::block::{BlockDigest, BlockRef};
277
278 use crate::{
279 TestBlock, VerifiedBlock,
280 block::ExtendedBlock,
281 context::Context,
282 round_tracker::{PeerRoundTracker, compute_quorum_round},
283 };
284
285 #[tokio::test]
286 async fn test_compute_quorum_round() {
287 let (context, _) = Context::new_for_test(4);
288
289 let highest_received_rounds = vec![
291 vec![10, 11, 12, 13],
292 vec![5, 2, 7, 4],
293 vec![0, 0, 0, 0],
294 vec![3, 4, 5, 6],
295 ];
296
297 let round = compute_quorum_round(
298 &context.committee,
299 AuthorityIndex::new_for_test(0),
300 &highest_received_rounds,
301 );
302 assert_eq!(round, (3, 5));
303
304 let round = compute_quorum_round(
305 &context.committee,
306 AuthorityIndex::new_for_test(1),
307 &highest_received_rounds,
308 );
309 assert_eq!(round, (2, 4));
310
311 let round = compute_quorum_round(
312 &context.committee,
313 AuthorityIndex::new_for_test(2),
314 &highest_received_rounds,
315 );
316 assert_eq!(round, (5, 7));
317
318 let round = compute_quorum_round(
319 &context.committee,
320 AuthorityIndex::new_for_test(3),
321 &highest_received_rounds,
322 );
323 assert_eq!(round, (4, 6));
324 }
325
326 #[tokio::test]
327 async fn test_compute_received_quorum_round() {
328 telemetry_subscribers::init_for_testing();
329 let (context, _) = Context::new_for_test(4);
330 let context = Arc::new(context);
331 let mut round_tracker = PeerRoundTracker::new(context);
332
333 let highest_received_rounds = vec![
335 vec![10, 11, 12, 13],
336 vec![5, 2, 7, 4],
337 vec![0, 0, 0, 0],
338 vec![3, 4, 5, 6],
339 ];
340
341 let expected_received_quorum_rounds = vec![(3, 5), (2, 4), (5, 7), (4, 6)];
342
343 round_tracker.update_from_probe(vec![], highest_received_rounds);
344
345 let received_quourum_rounds = round_tracker.compute_received_quorum_rounds();
346
347 assert_eq!(expected_received_quorum_rounds, received_quourum_rounds);
348 }
349
350 #[tokio::test]
351 async fn test_compute_accepted_quorum_round() {
352 const NUM_AUTHORITIES: usize = 4;
353 let (context, _) = Context::new_for_test(NUM_AUTHORITIES);
354 let context = Arc::new(context);
355 let own_index = context.own_index.value() as u32;
356 let mut round_tracker = PeerRoundTracker::new(context);
357
358 let highest_accepted_rounds = vec![
360 vec![10, 11, 12, 13],
361 vec![5, 2, 7, 4],
362 vec![0, 0, 0, 0],
363 vec![3, 4, 5, 6],
364 ];
365
366 round_tracker.update_from_probe(highest_accepted_rounds, vec![]);
367
368 let test_block = TestBlock::new(7, 2)
370 .set_ancestors(vec![BlockRef::new(
371 6,
372 AuthorityIndex::new_for_test(3),
373 BlockDigest::MIN,
374 )])
375 .build();
376 let block = VerifiedBlock::new_for_test(test_block);
377 round_tracker.update_from_accepted_block(&ExtendedBlock {
378 block,
379 excluded_ancestors: vec![BlockRef::new(
380 8,
381 AuthorityIndex::new_for_test(1),
382 BlockDigest::MIN,
383 )],
384 });
385
386 let test_block = TestBlock::new(11, own_index)
390 .set_ancestors(vec![
391 BlockRef::new(7, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
392 BlockRef::new(6, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
393 ])
394 .build();
395 let block = VerifiedBlock::new_for_test(test_block);
396 round_tracker.update_from_accepted_block(&ExtendedBlock {
397 block,
398 excluded_ancestors: vec![BlockRef::new(
399 8,
400 AuthorityIndex::new_for_test(1),
401 BlockDigest::MIN,
402 )],
403 });
404
405 let expected_accepted_quorum_rounds = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
413 let accepted_quourum_rounds = round_tracker.compute_accepted_quorum_rounds();
414
415 assert_eq!(expected_accepted_quorum_rounds, accepted_quourum_rounds);
416 }
417
418 #[tokio::test]
419 async fn test_quorum_round_manager() {
420 const NUM_AUTHORITIES: usize = 7;
421 let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
422
423 let highest_received_rounds = vec![
424 vec![110, 120, 130, 140, 150, 160, 170],
425 vec![109, 121, 131, 0, 151, 161, 171],
426 vec![101, 0, 103, 104, 105, 166, 107],
427 vec![0, 0, 0, 0, 0, 0, 0],
428 vec![100, 102, 133, 0, 155, 106, 177],
429 vec![105, 115, 103, 0, 125, 126, 127],
430 vec![0, 0, 0, 0, 0, 0, 0],
431 ];
432
433 let highest_accepted_rounds = vec![
434 vec![110, 120, 130, 140, 150, 160, 170],
435 vec![0, 121, 131, 0, 151, 161, 171],
436 vec![1, 0, 103, 104, 105, 166, 107],
437 vec![0, 0, 0, 0, 0, 0, 0],
438 vec![0, 102, 133, 0, 155, 106, 177],
439 vec![1, 115, 103, 0, 125, 126, 127],
440 vec![0, 0, 0, 0, 0, 0, 0],
441 ];
442
443 let mut round_tracker = PeerRoundTracker::new(context.clone());
444
445 round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);
446
447 for authority in 0..NUM_AUTHORITIES {
449 let round = 110 + (authority as u32 * 10);
450 let block =
451 VerifiedBlock::new_for_test(TestBlock::new(round, authority as u32).build());
452 round_tracker.update_from_accepted_block(&ExtendedBlock {
453 block,
454 excluded_ancestors: vec![],
455 });
456 }
457
458 let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
469 let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();
470 assert_eq!(
471 received_quorum_rounds,
472 vec![
473 (100, 105),
474 (0, 115),
475 (103, 130),
476 (0, 0),
477 (105, 150),
478 (106, 160),
479 (107, 170)
480 ]
481 );
482
483 assert_eq!(
494 accepted_quorum_rounds,
495 vec![
496 (0, 1),
497 (0, 115),
498 (103, 130),
499 (0, 104),
500 (105, 150),
501 (106, 160),
502 (127, 170)
503 ]
504 );
505
506 let propagation_delay = round_tracker.calculate_propagation_delay(110);
507
508 assert_eq!(propagation_delay, 10);
510 }
511}