consensus_core/
round_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! RoundTracker computes quorum rounds for the latest received and accepted rounds.
//! This round data is gathered from peers via RoundProber or from newly received blocks.
//! Local accepted rounds are also updated from new blocks proposed by this authority.
//!
//! Quorum rounds provide insight into how effectively each authority's blocks are propagated
//! and accepted across the network.
10
11use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use mysten_common::ZipDebugEqIteratorExt;
17use tracing::{debug, trace};
18
19use crate::{
20    block::{BlockAPI, ExtendedBlock},
21    context::Context,
22};
23
/// A [`QuorumRound`] is a round range [low, high], stored as the tuple `(low, high)`.
/// It is computed from the highest received or accepted rounds of an authority, as
/// reported by all authorities.
/// The bounds represent:
/// - the highest round less than or equal to the rounds reported by a quorum (low)
/// - the lowest round greater than or equal to the rounds reported by a quorum (high)
///
/// [`QuorumRound`] is useful because:
/// - [low, high] range is BFT, always between the lowest and highest rounds
///   of honest validators, with < validity threshold of malicious stake.
/// - It provides signals about how well blocks from an authority propagate
///   in the network. If the low bound for an authority is lower than its last
///   proposed round, the last proposed block has not propagated to a quorum.
///   If a new block is proposed from the authority, it will not get accepted
///   immediately by a quorum.
pub(crate) type QuorumRound = (Round, Round);
40
/// Tracks the highest received and accepted rounds of every authority, both as observed
/// locally and as reported by peers, and derives [`QuorumRound`]s from them.
pub(crate) struct RoundTracker {
    context: Arc<Context>,

    // -------- Peer round tracking --------
    /// Highest accepted round per authority from received blocks (included/excluded ancestors).
    /// Indexed `[block author][accepted authority]`.
    block_accepted_rounds: Vec<Vec<Round>>,
    /// Highest accepted round per authority from round prober.
    /// Indexed `[reporting authority][target authority]`, matching how
    /// `compute_quorum_round` reads it.
    probed_accepted_rounds: Vec<Vec<Round>>,
    /// Highest received round per authority from round prober.
    /// Indexed `[reporting authority][target authority]`, matching how
    /// `compute_quorum_round` reads it.
    probed_received_rounds: Vec<Vec<Round>>,

    // -------- Own round tracking --------
    /// Highest received round per authority tracked locally.
    /// Updated after blocks are received and verified (including blocks created locally).
    local_highest_received_rounds: Vec<Round>,
}
57
58impl RoundTracker {
59    pub(crate) fn new(context: Arc<Context>, initial_received_rounds: Vec<Round>) -> Self {
60        let size = context.committee.size();
61        let local_highest_received_rounds = if initial_received_rounds.is_empty() {
62            vec![0; size]
63        } else {
64            assert_eq!(
65                initial_received_rounds.len(),
66                size,
67                "initial_received_rounds must be empty or have the same size as the committee"
68            );
69            initial_received_rounds
70        };
71        Self {
72            context,
73            block_accepted_rounds: vec![vec![0; size]; size],
74            probed_accepted_rounds: vec![vec![0; size]; size],
75            probed_received_rounds: vec![vec![0; size]; size],
76            local_highest_received_rounds,
77        }
78    }
79
80    /// Update accepted rounds based on a new block created locally or received from the network
81    /// and its excluded ancestors.
82    /// Assumes the block and the excluded ancestors have been verified for validity.
83    /// Also updates the highest received round for the block's author.
84    pub(crate) fn update_from_verified_block(&mut self, extended_block: &ExtendedBlock) {
85        let block = &extended_block.block;
86        let excluded_ancestors = &extended_block.excluded_ancestors;
87        let author = block.author();
88
89        // Update highest received round for this author
90        self.local_highest_received_rounds[author] =
91            self.local_highest_received_rounds[author].max(block.round());
92
93        // Update author accepted round from block round
94        self.block_accepted_rounds[author][author] =
95            self.block_accepted_rounds[author][author].max(block.round());
96
97        // Update accepted rounds from included ancestors
98        for ancestor in block.ancestors() {
99            self.block_accepted_rounds[author][ancestor.author] =
100                self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
101        }
102
103        // Update accepted rounds from excluded ancestors
104        for excluded_ancestor in excluded_ancestors {
105            self.block_accepted_rounds[author][excluded_ancestor.author] = self
106                .block_accepted_rounds[author][excluded_ancestor.author]
107                .max(excluded_ancestor.round);
108        }
109    }
110
111    /// Update accepted & received rounds based on probing results
112    pub(crate) fn update_from_probe(
113        &mut self,
114        accepted_rounds: Vec<Vec<Round>>,
115        received_rounds: Vec<Vec<Round>>,
116    ) {
117        self.probed_accepted_rounds = accepted_rounds;
118        self.probed_received_rounds = received_rounds;
119    }
120
121    /// Returns the highest received round per authority tracked locally.
122    pub(crate) fn local_highest_received_rounds(&self) -> Vec<Round> {
123        self.local_highest_received_rounds.clone()
124    }
125
126    // Returns the propagation delay of own blocks.
127    pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
128        let own_index = self.context.own_index;
129        let node_metrics = &self.context.metrics.node_metrics;
130        let received_quorum_rounds = self.compute_received_quorum_rounds();
131        let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
132        for ((low, high), (_, authority)) in received_quorum_rounds
133            .iter()
134            .zip_debug_eq(self.context.committee.authorities())
135        {
136            node_metrics
137                .round_tracker_received_quorum_round_gaps
138                .with_label_values(&[&authority.hostname])
139                .set((high - low) as i64);
140            node_metrics
141                .round_tracker_low_received_quorum_round
142                .with_label_values(&[&authority.hostname])
143                .set(*low as i64);
144            // The gap can be negative if this validator is lagging behind the network.
145            node_metrics
146                .round_tracker_current_received_round_gaps
147                .with_label_values(&[&authority.hostname])
148                .set(last_proposed_round as i64 - *low as i64);
149        }
150
151        for ((low, high), (_, authority)) in accepted_quorum_rounds
152            .iter()
153            .zip_debug_eq(self.context.committee.authorities())
154        {
155            node_metrics
156                .round_tracker_accepted_quorum_round_gaps
157                .with_label_values(&[&authority.hostname])
158                .set((high - low) as i64);
159            node_metrics
160                .round_tracker_low_accepted_quorum_round
161                .with_label_values(&[&authority.hostname])
162                .set(*low as i64);
163            // The gap can be negative if this validator is lagging behind the network.
164            node_metrics
165                .round_tracker_current_accepted_round_gaps
166                .with_label_values(&[&authority.hostname])
167                .set(last_proposed_round as i64 - *low as i64);
168        }
169
170        // TODO: consider using own quorum round gap to control proposing in addition to
171        // propagation delay. For now they seem to be about the same.
172
173        // It is possible more blocks arrive at a quorum of peers before the get_latest_rounds
174        // requests arrive.
175        // Using the lower bound to increase sensitivity about block propagation issues
176        // that can reduce round rate.
177        // Because of the nature of TCP and block streaming, propagation delay is expected to be
178        // 0 in most cases, even when the actual latency of broadcasting blocks is high.
179        // We will use the min propagation delay from either accepted or received rounds.
180        // As stated above new blocks can arrive after the rounds have been probed, so its
181        // likely accepted rounds from new blocks will provide us with the more accurate
182        // propagation delay which is important because we now calculate the propagation
183        // delay more frequently then before.
184        let propagation_delay = last_proposed_round
185            .saturating_sub(received_quorum_rounds[own_index].0)
186            .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
187
188        node_metrics
189            .round_tracker_propagation_delays
190            .observe(propagation_delay as f64);
191        node_metrics
192            .round_tracker_last_propagation_delay
193            .set(propagation_delay as i64);
194
195        debug!(
196            "Computed propagation delay of {propagation_delay} based on last proposed \
197                round ({last_proposed_round})."
198        );
199
200        propagation_delay
201    }
202
203    pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
204        let highest_accepted_rounds = self
205            .probed_accepted_rounds
206            .iter()
207            .zip_debug_eq(self.block_accepted_rounds.iter())
208            .map(|(probed_rounds, block_rounds)| {
209                probed_rounds
210                    .iter()
211                    .zip_debug_eq(block_rounds.iter())
212                    .map(|(probed_round, block_round)| *probed_round.max(block_round))
213                    .collect::<Vec<Round>>()
214            })
215            .collect::<Vec<Vec<Round>>>();
216        let accepted_quorum_rounds = self
217            .context
218            .committee
219            .authorities()
220            .map(|(peer, _)| {
221                compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
222            })
223            .collect::<Vec<_>>();
224
225        trace!(
226            "Computed accepted quorum round per authority: {}",
227            self.context
228                .committee
229                .authorities()
230                .zip_debug_eq(accepted_quorum_rounds.iter())
231                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
232                .join(", ")
233        );
234
235        accepted_quorum_rounds
236    }
237
238    fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
239        let received_quorum_rounds = self
240            .context
241            .committee
242            .authorities()
243            .map(|(peer, _)| {
244                compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
245            })
246            .collect::<Vec<_>>();
247
248        trace!(
249            "Computed received quorum round per authority: {}",
250            self.context
251                .committee
252                .authorities()
253                .zip_debug_eq(received_quorum_rounds.iter())
254                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
255                .join(", ")
256        );
257
258        received_quorum_rounds
259    }
260}
261
262/// For the peer specified with target_index, compute and return its [`QuorumRound`].
263fn compute_quorum_round(
264    committee: &Committee,
265    target_index: AuthorityIndex,
266    highest_rounds: &[Vec<Round>],
267) -> QuorumRound {
268    let mut rounds_with_stake = highest_rounds
269        .iter()
270        .zip_debug_eq(committee.authorities())
271        .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
272        .collect::<Vec<_>>();
273    rounds_with_stake.sort();
274
275    // Forward iteration and stopping at validity threshold would produce the same result currently,
276    // with fault tolerance of f/3f+1 votes. But it is not semantically correct, and will provide an
277    // incorrect value when fault tolerance and validity threshold are different.
278    let mut total_stake = 0;
279    let mut low = 0;
280    for (round, stake) in rounds_with_stake.iter().rev() {
281        total_stake += stake;
282        if total_stake >= committee.quorum_threshold() {
283            low = *round;
284            break;
285        }
286    }
287
288    let mut total_stake = 0;
289    let mut high = 0;
290    for (round, stake) in rounds_with_stake.iter() {
291        total_stake += stake;
292        if total_stake >= committee.quorum_threshold() {
293            high = *round;
294            break;
295        }
296    }
297
298    (low, high)
299}
300
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef};

    use crate::{
        TestBlock, VerifiedBlock,
        block::ExtendedBlock,
        context::Context,
        round_tracker::{RoundTracker, compute_quorum_round},
    };

    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        // Observe latest rounds from peers.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(0),
            &highest_received_rounds,
        );
        assert_eq!(round, (3, 5));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(1),
            &highest_received_rounds,
        );
        assert_eq!(round, (2, 4));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(2),
            &highest_received_rounds,
        );
        assert_eq!(round, (5, 7));

        let round = compute_quorum_round(
            &context.committee,
            AuthorityIndex::new_for_test(3),
            &highest_received_rounds,
        );
        assert_eq!(round, (4, 6));
    }

    #[tokio::test]
    async fn test_compute_received_quorum_round() {
        telemetry_subscribers::init_for_testing();
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        // Observe latest rounds from peers.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        let expected_received_quorum_rounds = vec![(3, 5), (2, 4), (5, 7), (4, 6)];

        round_tracker.update_from_probe(vec![], highest_received_rounds);

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();

        assert_eq!(expected_received_quorum_rounds, received_quorum_rounds);
    }

    #[tokio::test]
    async fn test_compute_accepted_quorum_round() {
        const NUM_AUTHORITIES: usize = 4;
        let (context, _) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let own_index = context.own_index.value() as u32;
        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        // Observe latest rounds from peers.
        let highest_accepted_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        round_tracker.update_from_probe(highest_accepted_rounds, vec![]);

        // Simulate accepting a block from authority 3
        let test_block = TestBlock::new(7, 2)
            .set_ancestors(vec![BlockRef::new(
                6,
                AuthorityIndex::new_for_test(3),
                BlockDigest::MIN,
            )])
            .build();
        let block = VerifiedBlock::new_for_test(test_block);
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![BlockRef::new(
                8,
                AuthorityIndex::new_for_test(1),
                BlockDigest::MIN,
            )],
        });

        // Simulate proposing a new block
        // note: not valid rounds, but tests that the max value will always be
        // considered in calculations.
        let test_block = TestBlock::new(11, own_index)
            .set_ancestors(vec![
                BlockRef::new(7, AuthorityIndex::new_for_test(2), BlockDigest::MIN),
                BlockRef::new(6, AuthorityIndex::new_for_test(3), BlockDigest::MIN),
            ])
            .build();
        let block = VerifiedBlock::new_for_test(test_block);
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![BlockRef::new(
                8,
                AuthorityIndex::new_for_test(1),
                BlockDigest::MIN,
            )],
        });

        // Compute quorum rounds based on highest accepted rounds (max from prober
        // or from blocks):
        // 11, 11, 12, 13
        //  5,  2,  7,  4
        //  0,  8,  7,  6
        //  3,  4,  5,  6

        let expected_accepted_quorum_rounds = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        assert_eq!(expected_accepted_quorum_rounds, accepted_quorum_rounds);
    }

    #[tokio::test]
    async fn test_local_highest_received_rounds() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);

        // An empty seed starts every authority at round 0.
        let round_tracker = RoundTracker::new(context.clone(), vec![]);
        assert_eq!(round_tracker.local_highest_received_rounds(), vec![0; 4]);

        // A non-empty seed is used as-is.
        let mut round_tracker = RoundTracker::new(context.clone(), vec![1, 2, 3, 4]);
        assert_eq!(round_tracker.local_highest_received_rounds(), vec![1, 2, 3, 4]);

        // A block with a higher round raises its author's entry only.
        let block = VerifiedBlock::new_for_test(TestBlock::new(7, 2).build());
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![],
        });
        assert_eq!(round_tracker.local_highest_received_rounds(), vec![1, 2, 7, 4]);

        // A block with a lower round never lowers it.
        let block = VerifiedBlock::new_for_test(TestBlock::new(5, 2).build());
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block,
            excluded_ancestors: vec![],
        });
        assert_eq!(round_tracker.local_highest_received_rounds(), vec![1, 2, 7, 4]);
    }

    #[tokio::test]
    #[should_panic(
        expected = "initial_received_rounds must be empty or have the same size as the committee"
    )]
    async fn test_new_rejects_mismatched_initial_received_rounds() {
        let (context, _) = Context::new_for_test(4);
        let context = Arc::new(context);
        let _round_tracker = RoundTracker::new(context, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_quorum_round_manager() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);

        let highest_received_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![109, 121, 131, 0, 151, 161, 171],
            vec![101, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![100, 102, 133, 0, 155, 106, 177],
            vec![105, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let highest_accepted_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![0, 121, 131, 0, 151, 161, 171],
            vec![1, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![0, 102, 133, 0, 155, 106, 177],
            vec![1, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let mut round_tracker = RoundTracker::new(context.clone(), vec![]);

        round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);

        // Create test blocks for each authority with incrementing rounds starting at 110
        for authority in 0..NUM_AUTHORITIES {
            let round = 110 + (authority as u32 * 10);
            let block =
                VerifiedBlock::new_for_test(TestBlock::new(round, authority as u32).build());
            round_tracker.update_from_verified_block(&ExtendedBlock {
                block,
                excluded_ancestors: vec![],
            });
        }

        // Compute quorum rounds and propagation delay based on last proposed round = 110,
        // and highest received rounds:
        // 110, 120, 130, 140, 150, 160, 170,
        // 109, 121, 131, 0,   151, 161, 171,
        // 101, 0,   103, 104, 105, 166, 107,
        // 0,   0,   0,   0,   0,   0,   0,
        // 100, 102, 133, 0,   155, 106, 177,
        // 105, 115, 103, 0,   125, 126, 127,
        // 0,   0,   0,   0,   0,   0,   0,

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();
        assert_eq!(
            received_quorum_rounds,
            vec![
                (100, 105),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );

        // Compute quorum rounds based on highest accepted rounds (max from prober
        // or from blocks):
        // 110, 120, 130, 140, 150, 160, 170,
        //   0, 121, 131,   0, 151, 161, 171,
        //   1,   0, 130, 104, 105, 166, 107,
        //   0,   0,   0, 140,   0,   0,   0,
        //   0, 102, 133,   0, 155, 106, 177,
        //   1, 115, 103,   0, 125, 160, 127,
        //   0,   0,   0,   0,   0,   0, 170,

        assert_eq!(
            accepted_quorum_rounds,
            vec![
                (0, 1),
                (0, 115),
                (103, 130),
                (0, 104),
                (105, 150),
                (106, 160),
                (127, 170)
            ]
        );

        let propagation_delay = round_tracker.calculate_propagation_delay(110);

        // 110 - 100 = 10
        assert_eq!(propagation_delay, 10);
    }
}