consensus_core/
round_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! RoundTracker computes quorum rounds for the latest received and accepted rounds.
5//! This round data is gathered from peers via RoundProber or via new Blocks received. Also
6//! local accepted rounds are updated from new blocks proposed from this authority.
7//!
//! Quorum rounds provide insight into how effectively each authority's blocks are propagated
//! and accepted across the network.
10
11use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use tracing::{debug, trace};
17
18use crate::{
19    block::{BlockAPI, ExtendedBlock},
20    context::Context,
21};
22
/// A [`QuorumRound`] is a round range [low, high], stored as the tuple `(low, high)`.
/// It is computed from the highest received or accepted rounds of an authority,
/// as reported by all authorities.
/// The bounds represent:
/// - the highest round lower or equal to rounds from a quorum (low)
/// - the lowest round higher or equal to rounds from a quorum (high)
///
/// [`QuorumRound`] is useful because:
/// - The [low, high] range is BFT: it is always between the lowest and highest rounds
///   of honest validators, with < validity threshold of malicious stake.
/// - It provides signals about how well blocks from an authority propagate
///   in the network. If the low bound for an authority is lower than its last
///   proposed round, the last proposed block has not propagated to a quorum.
///   If a new block is proposed from the authority, it will not get accepted
///   immediately by a quorum.
pub(crate) type QuorumRound = (Round, Round);
39
/// Keeps the highest accepted and received rounds known for every pair of authorities
/// and derives per-authority [`QuorumRound`]s from them.
///
/// Every matrix below is indexed as `[reporting authority][target authority]`: the outer
/// index is the authority whose view is recorded, the inner index is the authority whose
/// blocks the round refers to.
pub(crate) struct PeerRoundTracker {
    /// Shared context; the committee, own authority index and node metrics are read from it.
    context: Arc<Context>,
    /// Highest accepted round per authority from received blocks (included/excluded ancestors)
    block_accepted_rounds: Vec<Vec<Round>>,
    /// Highest accepted round per authority from round prober
    probed_accepted_rounds: Vec<Vec<Round>>,
    /// Highest received round per authority from round prober
    probed_received_rounds: Vec<Vec<Round>>,
}
49
50impl PeerRoundTracker {
51    pub(crate) fn new(context: Arc<Context>) -> Self {
52        let size = context.committee.size();
53        Self {
54            context,
55            block_accepted_rounds: vec![vec![0; size]; size],
56            probed_accepted_rounds: vec![vec![0; size]; size],
57            probed_received_rounds: vec![vec![0; size]; size],
58        }
59    }
60
61    /// Update accepted rounds based on a new block created locally or received from the network
62    /// and its excluded ancestors.
63    /// Assumes the block and the excluded ancestors have been verified for validity.
64    pub(crate) fn update_from_verified_block(&mut self, extended_block: &ExtendedBlock) {
65        let block = &extended_block.block;
66        let excluded_ancestors = &extended_block.excluded_ancestors;
67        let author = block.author();
68
69        // Update author accepted round from block round
70        self.block_accepted_rounds[author][author] =
71            self.block_accepted_rounds[author][author].max(block.round());
72
73        // Update accepted rounds from included ancestors
74        for ancestor in block.ancestors() {
75            self.block_accepted_rounds[author][ancestor.author] =
76                self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
77        }
78
79        // Update accepted rounds from excluded ancestors
80        for excluded_ancestor in excluded_ancestors {
81            self.block_accepted_rounds[author][excluded_ancestor.author] = self
82                .block_accepted_rounds[author][excluded_ancestor.author]
83                .max(excluded_ancestor.round);
84        }
85    }
86
87    /// Update accepted & received rounds based on probing results
88    pub(crate) fn update_from_probe(
89        &mut self,
90        accepted_rounds: Vec<Vec<Round>>,
91        received_rounds: Vec<Vec<Round>>,
92    ) {
93        self.probed_accepted_rounds = accepted_rounds;
94        self.probed_received_rounds = received_rounds;
95    }
96
97    // Returns the propagation delay of own blocks.
98    pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
99        let own_index = self.context.own_index;
100        let node_metrics = &self.context.metrics.node_metrics;
101        let received_quorum_rounds = self.compute_received_quorum_rounds();
102        let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
103        for ((low, high), (_, authority)) in received_quorum_rounds
104            .iter()
105            .zip(self.context.committee.authorities())
106        {
107            node_metrics
108                .round_tracker_received_quorum_round_gaps
109                .with_label_values(&[&authority.hostname])
110                .set((high - low) as i64);
111            node_metrics
112                .round_tracker_low_received_quorum_round
113                .with_label_values(&[&authority.hostname])
114                .set(*low as i64);
115            // The gap can be negative if this validator is lagging behind the network.
116            node_metrics
117                .round_tracker_current_received_round_gaps
118                .with_label_values(&[&authority.hostname])
119                .set(last_proposed_round as i64 - *low as i64);
120        }
121
122        for ((low, high), (_, authority)) in accepted_quorum_rounds
123            .iter()
124            .zip(self.context.committee.authorities())
125        {
126            node_metrics
127                .round_tracker_accepted_quorum_round_gaps
128                .with_label_values(&[&authority.hostname])
129                .set((high - low) as i64);
130            node_metrics
131                .round_tracker_low_accepted_quorum_round
132                .with_label_values(&[&authority.hostname])
133                .set(*low as i64);
134            // The gap can be negative if this validator is lagging behind the network.
135            node_metrics
136                .round_tracker_current_accepted_round_gaps
137                .with_label_values(&[&authority.hostname])
138                .set(last_proposed_round as i64 - *low as i64);
139        }
140
141        // TODO: consider using own quorum round gap to control proposing in addition to
142        // propagation delay. For now they seem to be about the same.
143
144        // It is possible more blocks arrive at a quorum of peers before the get_latest_rounds
145        // requests arrive.
146        // Using the lower bound to increase sensitivity about block propagation issues
147        // that can reduce round rate.
148        // Because of the nature of TCP and block streaming, propagation delay is expected to be
149        // 0 in most cases, even when the actual latency of broadcasting blocks is high.
150        // We will use the min propagation delay from either accepted or received rounds.
151        // As stated above new blocks can arrive after the rounds have been probed, so its
152        // likely accepted rounds from new blocks will provide us with the more accurate
153        // propagation delay which is important because we now calculate the propagation
154        // delay more frequently then before.
155        let propagation_delay = last_proposed_round
156            .saturating_sub(received_quorum_rounds[own_index].0)
157            .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
158
159        node_metrics
160            .round_tracker_propagation_delays
161            .observe(propagation_delay as f64);
162        node_metrics
163            .round_tracker_last_propagation_delay
164            .set(propagation_delay as i64);
165
166        debug!(
167            "Computed propagation delay of {propagation_delay} based on last proposed \
168                round ({last_proposed_round})."
169        );
170
171        propagation_delay
172    }
173
174    pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
175        let highest_accepted_rounds = self
176            .probed_accepted_rounds
177            .iter()
178            .zip(self.block_accepted_rounds.iter())
179            .map(|(probed_rounds, block_rounds)| {
180                probed_rounds
181                    .iter()
182                    .zip(block_rounds.iter())
183                    .map(|(probed_round, block_round)| *probed_round.max(block_round))
184                    .collect::<Vec<Round>>()
185            })
186            .collect::<Vec<Vec<Round>>>();
187        let accepted_quorum_rounds = self
188            .context
189            .committee
190            .authorities()
191            .map(|(peer, _)| {
192                compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
193            })
194            .collect::<Vec<_>>();
195
196        trace!(
197            "Computed accepted quorum round per authority: {}",
198            self.context
199                .committee
200                .authorities()
201                .zip(accepted_quorum_rounds.iter())
202                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
203                .join(", ")
204        );
205
206        accepted_quorum_rounds
207    }
208
209    fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
210        let received_quorum_rounds = self
211            .context
212            .committee
213            .authorities()
214            .map(|(peer, _)| {
215                compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
216            })
217            .collect::<Vec<_>>();
218
219        trace!(
220            "Computed received quorum round per authority: {}",
221            self.context
222                .committee
223                .authorities()
224                .zip(received_quorum_rounds.iter())
225                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
226                .join(", ")
227        );
228
229        received_quorum_rounds
230    }
231}
232
233/// For the peer specified with target_index, compute and return its [`QuorumRound`].
234fn compute_quorum_round(
235    committee: &Committee,
236    target_index: AuthorityIndex,
237    highest_rounds: &[Vec<Round>],
238) -> QuorumRound {
239    let mut rounds_with_stake = highest_rounds
240        .iter()
241        .zip(committee.authorities())
242        .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
243        .collect::<Vec<_>>();
244    rounds_with_stake.sort();
245
246    // Forward iteration and stopping at validity threshold would produce the same result currently,
247    // with fault tolerance of f/3f+1 votes. But it is not semantically correct, and will provide an
248    // incorrect value when fault tolerance and validity threshold are different.
249    let mut total_stake = 0;
250    let mut low = 0;
251    for (round, stake) in rounds_with_stake.iter().rev() {
252        total_stake += stake;
253        if total_stake >= committee.quorum_threshold() {
254            low = *round;
255            break;
256        }
257    }
258
259    let mut total_stake = 0;
260    let mut high = 0;
261    for (round, stake) in rounds_with_stake.iter() {
262        total_stake += stake;
263        if total_stake >= committee.quorum_threshold() {
264            high = *round;
265            break;
266        }
267    }
268
269    (low, high)
270}
271
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef};

    use crate::{
        TestBlock, VerifiedBlock,
        block::ExtendedBlock,
        context::Context,
        round_tracker::{PeerRoundTracker, compute_quorum_round},
    };

    /// Checks the quorum round of each authority in a 4-member committee.
    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        // Observe latest rounds from peers.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        // (target authority, expected quorum round)
        let expectations = [
            (AuthorityIndex::new_for_test(0), (3, 5)),
            (AuthorityIndex::new_for_test(1), (2, 4)),
            (AuthorityIndex::new_for_test(2), (5, 7)),
            (AuthorityIndex::new_for_test(3), (4, 6)),
        ];

        for (target, expected) in expectations {
            let round =
                compute_quorum_round(&context.committee, target, &highest_received_rounds);
            assert_eq!(round, expected);
        }
    }

    /// Received quorum rounds are derived from the probed received rounds only.
    #[tokio::test]
    async fn test_compute_received_quorum_round() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let mut round_tracker = PeerRoundTracker::new(context);

        // Observe latest rounds from peers.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];
        round_tracker.update_from_probe(vec![], highest_received_rounds);

        let expected_received_quorum_rounds = vec![(3, 5), (2, 4), (5, 7), (4, 6)];
        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();

        assert_eq!(expected_received_quorum_rounds, received_quorum_rounds);
    }

    /// Accepted quorum rounds use the max of probed rounds and rounds learned from blocks.
    #[tokio::test]
    async fn test_compute_accepted_quorum_round() {
        const NUM_AUTHORITIES: usize = 4;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
        let own_index = context.own_index.value() as u32;
        let mut round_tracker = PeerRoundTracker::new(context);

        // Shorthand for a block reference with a placeholder digest.
        let block_ref = |round, author| {
            BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::MIN)
        };

        // Observe latest rounds from peers.
        let highest_accepted_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];
        round_tracker.update_from_probe(highest_accepted_rounds, vec![]);

        // Simulate accepting a block from authority 2
        let received_block = VerifiedBlock::new_for_test(
            TestBlock::new(7, 2)
                .set_ancestors(vec![block_ref(6, 3)])
                .build(),
        );
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block: received_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        // Simulate proposing a new block
        // note: not valid rounds, but tests that the max value will always be
        // considered in calculations.
        let proposed_block = VerifiedBlock::new_for_test(
            TestBlock::new(11, own_index)
                .set_ancestors(vec![block_ref(7, 2), block_ref(6, 3)])
                .build(),
        );
        round_tracker.update_from_verified_block(&ExtendedBlock {
            block: proposed_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        // Compute quorum rounds based on highest accepted rounds (max from prober
        // or from blocks):
        // 11, 11, 12, 13
        //  5,  2,  7,  4
        //  0,  8,  7,  6
        //  3,  4,  5,  6

        let expected_accepted_quorum_rounds = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        assert_eq!(expected_accepted_quorum_rounds, accepted_quorum_rounds);
    }

    /// End-to-end check on a 7-member committee: quorum rounds from probes and blocks,
    /// then the resulting propagation delay.
    #[tokio::test]
    async fn test_quorum_round_manager() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);

        let highest_received_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![109, 121, 131, 0, 151, 161, 171],
            vec![101, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![100, 102, 133, 0, 155, 106, 177],
            vec![105, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let highest_accepted_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![0, 121, 131, 0, 151, 161, 171],
            vec![1, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![0, 102, 133, 0, 155, 106, 177],
            vec![1, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let mut round_tracker = PeerRoundTracker::new(Arc::clone(&context));
        round_tracker.update_from_probe(highest_accepted_rounds, highest_received_rounds);

        // Create test blocks for each authority with incrementing rounds starting at 110
        for author in 0..NUM_AUTHORITIES as u32 {
            let round = 110 + author * 10;
            let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build());
            round_tracker.update_from_verified_block(&ExtendedBlock {
                block,
                excluded_ancestors: vec![],
            });
        }

        // Compute quorum rounds and propagation delay based on last proposed round = 110,
        // and highest received rounds:
        // 110, 120, 130, 140, 150, 160, 170,
        // 109, 121, 131, 0,   151, 161, 171,
        // 101, 0,   103, 104, 105, 166, 107,
        // 0,   0,   0,   0,   0,   0,   0,
        // 100, 102, 133, 0,   155, 106, 177,
        // 105, 115, 103, 0,   125, 126, 127,
        // 0,   0,   0,   0,   0,   0,   0,

        let received_quorum_rounds = round_tracker.compute_received_quorum_rounds();
        let accepted_quorum_rounds = round_tracker.compute_accepted_quorum_rounds();

        let expected_received = vec![
            (100, 105),
            (0, 115),
            (103, 130),
            (0, 0),
            (105, 150),
            (106, 160),
            (107, 170),
        ];
        assert_eq!(received_quorum_rounds, expected_received);

        // Compute quorum rounds based on highest accepted rounds (max from prober
        // or from blocks):
        // 110, 120, 130, 140, 150, 160, 170,
        //   0, 121, 131,   0, 151, 161, 171,
        //   1,   0, 130, 104, 105, 166, 107,
        //   0,   0,   0, 140,   0,   0,   0,
        //   0, 102, 133,   0, 155, 106, 177,
        //   1, 115, 103,   0, 125, 160, 127,
        //   0,   0,   0,   0,   0,   0, 170,

        let expected_accepted = vec![
            (0, 1),
            (0, 115),
            (103, 130),
            (0, 104),
            (105, 150),
            (106, 160),
            (127, 170),
        ];
        assert_eq!(accepted_quorum_rounds, expected_accepted);

        let propagation_delay = round_tracker.calculate_propagation_delay(110);

        // 110 - 100 = 10
        assert_eq!(propagation_delay, 10);
    }
}