consensus_core/
round_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! RoundTracker computes quorum rounds for the latest received and accepted rounds.
//! Round data is gathered from peers via the RoundProber or from newly received blocks.
//! Local accepted rounds are also updated from new blocks proposed by this authority.
//!
//! Quorum rounds provide insight into how effectively each authority's blocks are propagated
//! and accepted across the network.
10
11use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use tracing::{debug, trace};
17
18use crate::{
19    block::{BlockAPI, ExtendedBlock},
20    context::Context,
21};
22
23/// A [`QuorumRound`] is a round range [low, high]. It is computed from
24/// highest received or accepted rounds of an authority reported by all
25/// authorities.
26/// The bounds represent:
27/// - the highest round lower or equal to rounds from a quorum (low)
28/// - the lowest round higher or equal to rounds from a quorum (high)
29///
30/// [`QuorumRound`] is useful because:
31/// - [low, high] range is BFT, always between the lowest and highest rounds
32///   of honest validators, with < validity threshold of malicious stake.
33/// - It provides signals about how well blocks from an authority propagates
34///   in the network. If low bound for an authority is lower than its last
35///   proposed round, the last proposed block has not propagated to a quorum.
36///   If a new block is proposed from the authority, it will not get accepted
37///   immediately by a quorum.
38pub(crate) type QuorumRound = (Round, Round);
39
40pub(crate) struct RoundTracker {
41    context: Arc<Context>,
42
43    /// -------- Peer round tracking --------
44    /// Highest accepted round per authority from received blocks (included/excluded ancestors)
45    block_accepted_rounds: Vec<Vec<Round>>,
46    /// Highest accepted round per authority from round prober
47    probed_accepted_rounds: Vec<Vec<Round>>,
48    /// Highest received round per authority from round prober
49    probed_received_rounds: Vec<Vec<Round>>,
50
51    /// -------- Own round tracking --------
52    /// Highest received round per authority tracked locally.
53    /// Updated after blocks are received and verified.
54    local_highest_received_rounds: Vec<Round>,
55}
56
57impl RoundTracker {
58    pub(crate) fn new(context: Arc<Context>, initial_received_rounds: Vec<Round>) -> Self {
59        let size = context.committee.size();
60        let local_highest_received_rounds = if initial_received_rounds.is_empty() {
61            vec![0; size]
62        } else {
63            assert_eq!(
64                initial_received_rounds.len(),
65                size,
66                "initial_received_rounds must be empty or have the same size as the committee"
67            );
68            initial_received_rounds
69        };
70        Self {
71            context,
72            block_accepted_rounds: vec![vec![0; size]; size],
73            probed_accepted_rounds: vec![vec![0; size]; size],
74            probed_received_rounds: vec![vec![0; size]; size],
75            local_highest_received_rounds,
76        }
77    }
78
79    /// Update accepted rounds based on a new block created locally or received from the network
80    /// and its excluded ancestors.
81    /// Assumes the block and the excluded ancestors have been verified for validity.
82    /// Also updates the highest received round for the block's author.
83    pub(crate) fn update_from_verified_block(&mut self, extended_block: &ExtendedBlock) {
84        let block = &extended_block.block;
85        let excluded_ancestors = &extended_block.excluded_ancestors;
86        let author = block.author();
87
88        // Update highest received round for this author
89        self.local_highest_received_rounds[author] =
90            self.local_highest_received_rounds[author].max(block.round());
91
92        // Update author accepted round from block round
93        self.block_accepted_rounds[author][author] =
94            self.block_accepted_rounds[author][author].max(block.round());
95
96        // Update accepted rounds from included ancestors
97        for ancestor in block.ancestors() {
98            self.block_accepted_rounds[author][ancestor.author] =
99                self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
100        }
101
102        // Update accepted rounds from excluded ancestors
103        for excluded_ancestor in excluded_ancestors {
104            self.block_accepted_rounds[author][excluded_ancestor.author] = self
105                .block_accepted_rounds[author][excluded_ancestor.author]
106                .max(excluded_ancestor.round);
107        }
108    }
109
110    /// Update accepted & received rounds based on probing results
111    pub(crate) fn update_from_probe(
112        &mut self,
113        accepted_rounds: Vec<Vec<Round>>,
114        received_rounds: Vec<Vec<Round>>,
115    ) {
116        self.probed_accepted_rounds = accepted_rounds;
117        self.probed_received_rounds = received_rounds;
118    }
119
120    /// Returns the highest received round per authority tracked locally.
121    pub(crate) fn local_highest_received_rounds(&self) -> Vec<Round> {
122        self.local_highest_received_rounds.clone()
123    }
124
125    // Returns the propagation delay of own blocks.
126    pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
127        let own_index = self.context.own_index;
128        let node_metrics = &self.context.metrics.node_metrics;
129        let received_quorum_rounds = self.compute_received_quorum_rounds();
130        let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
131        for ((low, high), (_, authority)) in received_quorum_rounds
132            .iter()
133            .zip(self.context.committee.authorities())
134        {
135            node_metrics
136                .round_tracker_received_quorum_round_gaps
137                .with_label_values(&[&authority.hostname])
138                .set((high - low) as i64);
139            node_metrics
140                .round_tracker_low_received_quorum_round
141                .with_label_values(&[&authority.hostname])
142                .set(*low as i64);
143            // The gap can be negative if this validator is lagging behind the network.
144            node_metrics
145                .round_tracker_current_received_round_gaps
146                .with_label_values(&[&authority.hostname])
147                .set(last_proposed_round as i64 - *low as i64);
148        }
149
150        for ((low, high), (_, authority)) in accepted_quorum_rounds
151            .iter()
152            .zip(self.context.committee.authorities())
153        {
154            node_metrics
155                .round_tracker_accepted_quorum_round_gaps
156                .with_label_values(&[&authority.hostname])
157                .set((high - low) as i64);
158            node_metrics
159                .round_tracker_low_accepted_quorum_round
160                .with_label_values(&[&authority.hostname])
161                .set(*low as i64);
162            // The gap can be negative if this validator is lagging behind the network.
163            node_metrics
164                .round_tracker_current_accepted_round_gaps
165                .with_label_values(&[&authority.hostname])
166                .set(last_proposed_round as i64 - *low as i64);
167        }
168
169        // TODO: consider using own quorum round gap to control proposing in addition to
170        // propagation delay. For now they seem to be about the same.
171
172        // It is possible more blocks arrive at a quorum of peers before the get_latest_rounds
173        // requests arrive.
174        // Using the lower bound to increase sensitivity about block propagation issues
175        // that can reduce round rate.
176        // Because of the nature of TCP and block streaming, propagation delay is expected to be
177        // 0 in most cases, even when the actual latency of broadcasting blocks is high.
178        // We will use the min propagation delay from either accepted or received rounds.
179        // As stated above new blocks can arrive after the rounds have been probed, so its
180        // likely accepted rounds from new blocks will provide us with the more accurate
181        // propagation delay which is important because we now calculate the propagation
182        // delay more frequently then before.
183        let propagation_delay = last_proposed_round
184            .saturating_sub(received_quorum_rounds[own_index].0)
185            .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
186
187        node_metrics
188            .round_tracker_propagation_delays
189            .observe(propagation_delay as f64);
190        node_metrics
191            .round_tracker_last_propagation_delay
192            .set(propagation_delay as i64);
193
194        debug!(
195            "Computed propagation delay of {propagation_delay} based on last proposed \
196                round ({last_proposed_round})."
197        );
198
199        propagation_delay
200    }
201
202    pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
203        let highest_accepted_rounds = self
204            .probed_accepted_rounds
205            .iter()
206            .zip(self.block_accepted_rounds.iter())
207            .map(|(probed_rounds, block_rounds)| {
208                probed_rounds
209                    .iter()
210                    .zip(block_rounds.iter())
211                    .map(|(probed_round, block_round)| *probed_round.max(block_round))
212                    .collect::<Vec<Round>>()
213            })
214            .collect::<Vec<Vec<Round>>>();
215        let accepted_quorum_rounds = self
216            .context
217            .committee
218            .authorities()
219            .map(|(peer, _)| {
220                compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
221            })
222            .collect::<Vec<_>>();
223
224        trace!(
225            "Computed accepted quorum round per authority: {}",
226            self.context
227                .committee
228                .authorities()
229                .zip(accepted_quorum_rounds.iter())
230                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
231                .join(", ")
232        );
233
234        accepted_quorum_rounds
235    }
236
237    fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
238        let received_quorum_rounds = self
239            .context
240            .committee
241            .authorities()
242            .map(|(peer, _)| {
243                compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
244            })
245            .collect::<Vec<_>>();
246
247        trace!(
248            "Computed received quorum round per authority: {}",
249            self.context
250                .committee
251                .authorities()
252                .zip(received_quorum_rounds.iter())
253                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
254                .join(", ")
255        );
256
257        received_quorum_rounds
258    }
259}
260
261/// For the peer specified with target_index, compute and return its [`QuorumRound`].
262fn compute_quorum_round(
263    committee: &Committee,
264    target_index: AuthorityIndex,
265    highest_rounds: &[Vec<Round>],
266) -> QuorumRound {
267    let mut rounds_with_stake = highest_rounds
268        .iter()
269        .zip(committee.authorities())
270        .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
271        .collect::<Vec<_>>();
272    rounds_with_stake.sort();
273
274    // Forward iteration and stopping at validity threshold would produce the same result currently,
275    // with fault tolerance of f/3f+1 votes. But it is not semantically correct, and will provide an
276    // incorrect value when fault tolerance and validity threshold are different.
277    let mut total_stake = 0;
278    let mut low = 0;
279    for (round, stake) in rounds_with_stake.iter().rev() {
280        total_stake += stake;
281        if total_stake >= committee.quorum_threshold() {
282            low = *round;
283            break;
284        }
285    }
286
287    let mut total_stake = 0;
288    let mut high = 0;
289    for (round, stake) in rounds_with_stake.iter() {
290        total_stake += stake;
291        if total_stake >= committee.quorum_threshold() {
292            high = *round;
293            break;
294        }
295    }
296
297    (low, high)
298}
299
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef};

    use crate::{
        TestBlock, VerifiedBlock,
        block::ExtendedBlock,
        context::Context,
        round_tracker::{RoundTracker, compute_quorum_round},
    };

    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        // Observe latest rounds from peers.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(0),
            &highest_received_rounds,
        );
        assert_eq!(round, (3, 5));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(1),
            &highest_received_rounds,
        );
        assert_eq!(round, (2, 4));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(2),
            &highest_received_rounds,
        );
        assert_eq!(round, (5, 7));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(3),
            &highest_received_rounds,
        );
        assert_eq!(round, (4, 6));
    }

    #[tokio::test]
    async fn test_compute_received_quorum_round() {
        telemetry_subscribers::init_for_testing();
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        // Observe latest rounds from peers.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        let expected_received_quorum_rounds = vec![(3, 5), (2, 4), (5, 7), (4, 6)];

        round_tracker.update_from_probe(vec![], highest_received_rounds);

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();

        assert_eq!(expected_received_quorum_rounds, received_quorum_rounds);
    }

    #[tokio::test]
    async fn test_compute_accepted_quorum_round() {
        const NUM_AUTHORITIES: usize = 4;
        let (context, _) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let own_index = context.own_index.value() as u32;
        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        // Observe latest rounds from peers.
        let highest_accepted_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        round_tracker.update_from_probe(highest_accepted_rounds, vec![]);

        // Simulate accepting a block from authority 2.
        let test_block = TestBlock::new(7, 2)
            .set_ancestors(vec![BlockRef::new(
                6,
                AuthorityIndex::new_for_test(3),
                BlockDigest::MIN,
            )])
            .build();
        let block = VerifiedBlock::new_for_test(test_block);
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![BlockRef::new(
                8,
                AuthorityIndex::new_for_test(1),
                BlockDigest::MIN,
            )],
        });

        // Simulate proposing a new block.
        // Note: these are not valid rounds, but they test that the max value is always
        // considered in calculations.
        let test_block = TestBlock::new(11, own_index)
            .set_ancestors(vec![
                BlockRef::new(7, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
                BlockRef::new(6, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
            ])
            .build();
        let block = VerifiedBlock::new_for_test(test_block);
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![BlockRef::new(
                8,
                AuthorityIndex::new_for_test(1),
                BlockDigest::MIN,
            )],
        });

        // Compute quorum rounds based on highest accepted rounds (max from prober
        // or from blocks):
        // 11, 11, 12, 13
        //  5,  2,  7,  4
        //  0,  8,  7,  6
        //  3,  4,  5,  6

        let expected_accepted_quorum_rounds = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        assert_eq!(expected_accepted_quorum_rounds, accepted_quorum_rounds);
    }

    #[tokio::test]
    async fn test_quorum_round_manager() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);

        let highest_received_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![109, 121, 131, 0, 151, 161, 171],
            vec![101, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![100, 102, 133, 0, 155, 106, 177],
            vec![105, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let highest_accepted_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![0, 121, 131, 0, 151, 161, 171],
            vec![1, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![0, 102, 133, 0, 155, 106, 177],
            vec![1, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);

        // Create test blocks for each authority with incrementing rounds starting at 110.
        for authority in 0..NUM_AUTHORITIES {
            let round = 110 + (authority as u32 * 10);
            let block =
                VerifiedBlock::new_for_test(TestBlock::new(round, authority as u32).build());
            round_tracker.update_from_verified_block(&ExtendedBlock {
                block,
                excluded_ancestors: vec![],
            });
        }

        // Compute quorum rounds and propagation delay based on last proposed round = 110,
        // and highest received rounds:
        // 110, 120, 130, 140, 150, 160, 170,
        // 109, 121, 131, 0,   151, 161, 171,
        // 101, 0,   103, 104, 105, 166, 107,
        // 0,   0,   0,   0,   0,   0,   0,
        // 100, 102, 133, 0,   155, 106, 177,
        // 105, 115, 103, 0,   125, 126, 127,
        // 0,   0,   0,   0,   0,   0,   0,

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();
        assert_eq!(
            received_quorum_rounds,
            vec![
                (100, 105),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );

        // Compute quorum rounds based on highest accepted rounds (max from prober
        // or from blocks):
        // 110, 120, 130, 140, 150, 160, 170,
        //   0, 121, 131,   0, 151, 161, 171,
        //   1,   0, 130, 104, 105, 166, 107,
        //   0,   0,   0, 140,   0,   0,   0,
        //   0, 102, 133,   0, 155, 106, 177,
        //   1, 115, 103,   0, 125, 160, 127,
        //   0,   0,   0,   0,   0,   0, 170,

        assert_eq!(
            accepted_quorum_rounds,
            vec![
                (0, 1),
                (0, 115),
                (103, 130),
                (0, 104),
                (105, 150),
                (106, 160),
                (127, 170)
            ]
        );

        let propagation_delay = round_tracker.calculate_propagation_delay(110);

        // 110 - 100 = 10
        assert_eq!(propagation_delay, 10);
    }

    #[tokio::test]
    async fn test_local_highest_received_rounds() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let mut round_tracker = RoundTracker::new(context.clone(), vec![1, 2, 3, 4]);

        // Seeded initial rounds are reported as-is.
        assert_eq!(
            round_tracker.local_highest_received_rounds(),
            vec![1, 2, 3, 4]
        );

        // A higher round from authority 2 advances only that authority's entry.
        let block = VerifiedBlock::new_for_test(TestBlock::new(9, 2).build());
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![],
        });
        assert_eq!(
            round_tracker.local_highest_received_rounds(),
            vec![1, 2, 9, 4]
        );

        // A lower round from the same authority never moves the entry backwards.
        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 2).build());
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![],
        });
        assert_eq!(
            round_tracker.local_highest_received_rounds(),
            vec![1, 2, 9, 4]
        );
    }

    #[tokio::test]
    #[should_panic(
        expected = "initial_received_rounds must be empty or have the same size as the committee"
    )]
    async fn test_new_rejects_mismatched_initial_received_rounds() {
        let (context, _) = Context::new_for_test(4);
        let _round_tracker = RoundTracker::new(Arc::new(context), vec![1, 2]);
    }
}
540}