consensus_core/
round_tracker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! RoundTracker computes quorum rounds for the latest received and accepted rounds.
//! This round data is gathered from peers via RoundProber or via new blocks received.
//! Local accepted rounds are also updated from new blocks proposed by this authority.
//!
//! Quorum rounds provide insight into how effectively each authority's blocks are propagated
//! and accepted across the network.
10
11use std::sync::Arc;
12
13use consensus_config::{AuthorityIndex, Committee};
14use consensus_types::block::Round;
15use itertools::Itertools;
16use tracing::{debug, trace};
17
18use crate::{
19    block::{BlockAPI, ExtendedBlock},
20    context::Context,
21};
22
/// A [`QuorumRound`] is a round range `[low, high]`, stored as the tuple `(low, high)`.
/// It is computed from the highest received or accepted rounds of an authority, as
/// reported by all authorities.
///
/// The bounds represent:
/// - `low`: the highest round lower than or equal to the rounds reported by a quorum
/// - `high`: the lowest round higher than or equal to the rounds reported by a quorum
///
/// [`QuorumRound`] is useful because:
/// - The `[low, high]` range is BFT: it is always between the lowest and highest rounds
///   of honest validators, with < validity threshold of malicious stake.
/// - It provides signals about how well blocks from an authority propagate
///   in the network. If the low bound for an authority is lower than its last
///   proposed round, the last proposed block has not propagated to a quorum.
///   If a new block is proposed from the authority, it will not get accepted
///   immediately by a quorum.
pub(crate) type QuorumRound = (Round, Round);
39
/// Tracks the highest rounds that each authority is known to have accepted or received
/// for every authority in the committee, and derives [`QuorumRound`]s from that data.
///
/// Every matrix below is indexed as `[reporting authority][target authority]` and is
/// initialized to round 0 for all entries.
pub(crate) struct PeerRoundTracker {
    context: Arc<Context>,
    /// Highest accepted round per authority from received blocks (included/excluded ancestors)
    block_accepted_rounds: Vec<Vec<Round>>,
    /// Highest accepted round per authority from round prober
    probed_accepted_rounds: Vec<Vec<Round>>,
    /// Highest received round per authority from round prober
    probed_received_rounds: Vec<Vec<Round>>,
}
49
50impl PeerRoundTracker {
51    pub(crate) fn new(context: Arc<Context>) -> Self {
52        let size = context.committee.size();
53        Self {
54            context,
55            block_accepted_rounds: vec![vec![0; size]; size],
56            probed_accepted_rounds: vec![vec![0; size]; size],
57            probed_received_rounds: vec![vec![0; size]; size],
58        }
59    }
60
61    /// Update accepted rounds based on a new block created locally or received from the network
62    /// and its excluded ancestors
63    pub(crate) fn update_from_accepted_block(&mut self, extended_block: &ExtendedBlock) {
64        let block = &extended_block.block;
65        let excluded_ancestors = &extended_block.excluded_ancestors;
66        let author = block.author();
67
68        // Update author accepted round from block round
69        self.block_accepted_rounds[author][author] =
70            self.block_accepted_rounds[author][author].max(block.round());
71
72        // Update accepted rounds from included ancestors
73        for ancestor in block.ancestors() {
74            self.block_accepted_rounds[author][ancestor.author] =
75                self.block_accepted_rounds[author][ancestor.author].max(ancestor.round);
76        }
77
78        // Update accepted rounds from excluded ancestors
79        for excluded_ancestor in excluded_ancestors {
80            self.block_accepted_rounds[author][excluded_ancestor.author] = self
81                .block_accepted_rounds[author][excluded_ancestor.author]
82                .max(excluded_ancestor.round);
83        }
84    }
85
86    /// Update accepted & received rounds based on probing results
87    pub(crate) fn update_from_probe(
88        &mut self,
89        accepted_rounds: Vec<Vec<Round>>,
90        received_rounds: Vec<Vec<Round>>,
91    ) {
92        self.probed_accepted_rounds = accepted_rounds;
93        self.probed_received_rounds = received_rounds;
94    }
95
96    // Returns the propagation delay of own blocks.
97    pub(crate) fn calculate_propagation_delay(&self, last_proposed_round: Round) -> Round {
98        let own_index = self.context.own_index;
99        let node_metrics = &self.context.metrics.node_metrics;
100        let received_quorum_rounds = self.compute_received_quorum_rounds();
101        let accepted_quorum_rounds = self.compute_accepted_quorum_rounds();
102        for ((low, high), (_, authority)) in received_quorum_rounds
103            .iter()
104            .zip(self.context.committee.authorities())
105        {
106            node_metrics
107                .round_tracker_received_quorum_round_gaps
108                .with_label_values(&[&authority.hostname])
109                .set((high - low) as i64);
110            node_metrics
111                .round_tracker_low_received_quorum_round
112                .with_label_values(&[&authority.hostname])
113                .set(*low as i64);
114            // The gap can be negative if this validator is lagging behind the network.
115            node_metrics
116                .round_tracker_current_received_round_gaps
117                .with_label_values(&[&authority.hostname])
118                .set(last_proposed_round as i64 - *low as i64);
119        }
120
121        for ((low, high), (_, authority)) in accepted_quorum_rounds
122            .iter()
123            .zip(self.context.committee.authorities())
124        {
125            node_metrics
126                .round_tracker_accepted_quorum_round_gaps
127                .with_label_values(&[&authority.hostname])
128                .set((high - low) as i64);
129            node_metrics
130                .round_tracker_low_accepted_quorum_round
131                .with_label_values(&[&authority.hostname])
132                .set(*low as i64);
133            // The gap can be negative if this validator is lagging behind the network.
134            node_metrics
135                .round_tracker_current_accepted_round_gaps
136                .with_label_values(&[&authority.hostname])
137                .set(last_proposed_round as i64 - *low as i64);
138        }
139
140        // TODO: consider using own quorum round gap to control proposing in addition to
141        // propagation delay. For now they seem to be about the same.
142
143        // It is possible more blocks arrive at a quorum of peers before the get_latest_rounds
144        // requests arrive.
145        // Using the lower bound to increase sensitivity about block propagation issues
146        // that can reduce round rate.
147        // Because of the nature of TCP and block streaming, propagation delay is expected to be
148        // 0 in most cases, even when the actual latency of broadcasting blocks is high.
149        // We will use the min propagation delay from either accepted or received rounds.
150        // As stated above new blocks can arrive after the rounds have been probed, so its
151        // likely accepted rounds from new blocks will provide us with the more accurate
152        // propagation delay which is important because we now calculate the propagation
153        // delay more frequently then before.
154        let propagation_delay = last_proposed_round
155            .saturating_sub(received_quorum_rounds[own_index].0)
156            .min(last_proposed_round.saturating_sub(accepted_quorum_rounds[own_index].0));
157
158        node_metrics
159            .round_tracker_propagation_delays
160            .observe(propagation_delay as f64);
161        node_metrics
162            .round_tracker_last_propagation_delay
163            .set(propagation_delay as i64);
164
165        debug!(
166            "Computed propagation delay of {propagation_delay} based on last proposed \
167                round ({last_proposed_round})."
168        );
169
170        propagation_delay
171    }
172
173    pub(crate) fn compute_accepted_quorum_rounds(&self) -> Vec<QuorumRound> {
174        let highest_accepted_rounds = self
175            .probed_accepted_rounds
176            .iter()
177            .zip(self.block_accepted_rounds.iter())
178            .map(|(probed_rounds, block_rounds)| {
179                probed_rounds
180                    .iter()
181                    .zip(block_rounds.iter())
182                    .map(|(probed_round, block_round)| *probed_round.max(block_round))
183                    .collect::<Vec<Round>>()
184            })
185            .collect::<Vec<Vec<Round>>>();
186        let accepted_quorum_rounds = self
187            .context
188            .committee
189            .authorities()
190            .map(|(peer, _)| {
191                compute_quorum_round(&self.context.committee, peer, &highest_accepted_rounds)
192            })
193            .collect::<Vec<_>>();
194
195        trace!(
196            "Computed accepted quorum round per authority: {}",
197            self.context
198                .committee
199                .authorities()
200                .zip(accepted_quorum_rounds.iter())
201                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
202                .join(", ")
203        );
204
205        accepted_quorum_rounds
206    }
207
208    fn compute_received_quorum_rounds(&self) -> Vec<QuorumRound> {
209        let received_quorum_rounds = self
210            .context
211            .committee
212            .authorities()
213            .map(|(peer, _)| {
214                compute_quorum_round(&self.context.committee, peer, &self.probed_received_rounds)
215            })
216            .collect::<Vec<_>>();
217
218        trace!(
219            "Computed received quorum round per authority: {}",
220            self.context
221                .committee
222                .authorities()
223                .zip(received_quorum_rounds.iter())
224                .map(|((i, _), rounds)| format!("{i}: {rounds:?}"))
225                .join(", ")
226        );
227
228        received_quorum_rounds
229    }
230}
231
232/// For the peer specified with target_index, compute and return its [`QuorumRound`].
233fn compute_quorum_round(
234    committee: &Committee,
235    target_index: AuthorityIndex,
236    highest_rounds: &[Vec<Round>],
237) -> QuorumRound {
238    let mut rounds_with_stake = highest_rounds
239        .iter()
240        .zip(committee.authorities())
241        .map(|(rounds, (_, authority))| (rounds[target_index], authority.stake))
242        .collect::<Vec<_>>();
243    rounds_with_stake.sort();
244
245    // Forward iteration and stopping at validity threshold would produce the same result currently,
246    // with fault tolerance of f/3f+1 votes. But it is not semantically correct, and will provide an
247    // incorrect value when fault tolerance and validity threshold are different.
248    let mut total_stake = 0;
249    let mut low = 0;
250    for (round, stake) in rounds_with_stake.iter().rev() {
251        total_stake += stake;
252        if total_stake >= committee.quorum_threshold() {
253            low = *round;
254            break;
255        }
256    }
257
258    let mut total_stake = 0;
259    let mut high = 0;
260    for (round, stake) in rounds_with_stake.iter() {
261        total_stake += stake;
262        if total_stake >= committee.quorum_threshold() {
263            high = *round;
264            break;
265        }
266    }
267
268    (low, high)
269}
270
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef};

    use crate::{
        TestBlock, VerifiedBlock,
        block::ExtendedBlock,
        context::Context,
        round_tracker::{PeerRoundTracker, compute_quorum_round},
    };

    /// Checks the quorum round computed for each target authority of a 4-member committee.
    #[tokio::test]
    async fn test_compute_quorum_round() {
        let (context, _) = Context::new_for_test(4);

        // Latest rounds observed from peers: one row per reporting authority.
        let highest_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];

        // (target authority, expected quorum round)
        for (target, expected) in [(0, (3, 5)), (1, (2, 4)), (2, (5, 7)), (3, (4, 6))] {
            let quorum_round = compute_quorum_round(
                &context.committee,
                AuthorityIndex::new_for_test(target),
                &highest_received_rounds,
            );
            assert_eq!(quorum_round, expected);
        }
    }

    /// Checks received quorum rounds derived purely from probed received rounds.
    #[tokio::test]
    async fn test_compute_received_quorum_round() {
        telemetry_subscribers::init_for_testing();
        let context = Arc::new(Context::new_for_test(4).0);
        let mut tracker = PeerRoundTracker::new(context);

        // Latest rounds observed from peers: one row per reporting authority.
        let probed_received_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];
        tracker.update_from_probe(vec![], probed_received_rounds);

        let expected_quorum_rounds = vec![(3, 5), (2, 4), (5, 7), (4, 6)];
        let received_quorum_rounds = tracker.compute_received_quorum_rounds();

        assert_eq!(expected_quorum_rounds, received_quorum_rounds);
    }

    /// Checks accepted quorum rounds combine probed rounds with rounds learned from blocks.
    #[tokio::test]
    async fn test_compute_accepted_quorum_round() {
        const NUM_AUTHORITIES: usize = 4;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);
        let own_index = context.own_index.value() as u32;
        let mut tracker = PeerRoundTracker::new(context);

        // Shorthand for a block reference with a placeholder digest.
        let block_ref = |round, author| {
            BlockRef::new(round, AuthorityIndex::new_for_test(author), BlockDigest::MIN)
        };

        // Latest rounds observed from peers: one row per reporting authority.
        let probed_accepted_rounds = vec![
            vec![10, 11, 12, 13],
            vec![5, 2, 7, 4],
            vec![0, 0, 0, 0],
            vec![3, 4, 5, 6],
        ];
        tracker.update_from_probe(probed_accepted_rounds, vec![]);

        // Simulate accepting a round 7 block from authority 2, which includes a round 6
        // block from authority 3 and excludes a round 8 block from authority 1.
        let received_block = VerifiedBlock::new_for_test(
            TestBlock::new(7, 2)
                .set_ancestors(vec![block_ref(6, 3)])
                .build(),
        );
        tracker.update_from_accepted_block(&ExtendedBlock {
            block: received_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        // Simulate proposing a new block.
        // Note: these are not valid rounds, but they test that the max value is always
        // the one considered in calculations.
        let proposed_block = VerifiedBlock::new_for_test(
            TestBlock::new(11, own_index)
                .set_ancestors(vec![block_ref(7, 2), block_ref(6, 3)])
                .build(),
        );
        tracker.update_from_accepted_block(&ExtendedBlock {
            block: proposed_block,
            excluded_ancestors: vec![block_ref(8, 1)],
        });

        // Quorum rounds are computed from the highest accepted rounds (max of prober and
        // blocks):
        // 11, 11, 12, 13
        //  5,  2,  7,  4
        //  0,  8,  7,  6
        //  3,  4,  5,  6

        let expected_quorum_rounds = vec![(3, 5), (4, 8), (7, 7), (6, 6)];
        let accepted_quorum_rounds = tracker.compute_accepted_quorum_rounds();

        assert_eq!(expected_quorum_rounds, accepted_quorum_rounds);
    }

    /// End-to-end check of received/accepted quorum rounds and the propagation delay for a
    /// 7-member committee.
    #[tokio::test]
    async fn test_quorum_round_manager() {
        const NUM_AUTHORITIES: usize = 7;
        let context = Arc::new(Context::new_for_test(NUM_AUTHORITIES).0);

        let probed_received_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![109, 121, 131, 0, 151, 161, 171],
            vec![101, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![100, 102, 133, 0, 155, 106, 177],
            vec![105, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let probed_accepted_rounds = vec![
            vec![110, 120, 130, 140, 150, 160, 170],
            vec![0, 121, 131, 0, 151, 161, 171],
            vec![1, 0, 103, 104, 105, 166, 107],
            vec![0, 0, 0, 0, 0, 0, 0],
            vec![0, 102, 133, 0, 155, 106, 177],
            vec![1, 115, 103, 0, 125, 126, 127],
            vec![0, 0, 0, 0, 0, 0, 0],
        ];

        let mut tracker = PeerRoundTracker::new(context);
        tracker.update_from_probe(probed_accepted_rounds, probed_received_rounds);

        // One block per authority, with rounds 110, 120, ..., 170.
        for authority in 0..NUM_AUTHORITIES as u32 {
            let round = 110 + authority * 10;
            tracker.update_from_accepted_block(&ExtendedBlock {
                block: VerifiedBlock::new_for_test(TestBlock::new(round, authority).build()),
                excluded_ancestors: vec![],
            });
        }

        let received_quorum_rounds = tracker.compute_received_quorum_rounds();
        let accepted_quorum_rounds = tracker.compute_accepted_quorum_rounds();

        // Received quorum rounds come from the probed received rounds only:
        // 110, 120, 130, 140, 150, 160, 170,
        // 109, 121, 131, 0,   151, 161, 171,
        // 101, 0,   103, 104, 105, 166, 107,
        // 0,   0,   0,   0,   0,   0,   0,
        // 100, 102, 133, 0,   155, 106, 177,
        // 105, 115, 103, 0,   125, 126, 127,
        // 0,   0,   0,   0,   0,   0,   0,
        assert_eq!(
            received_quorum_rounds,
            vec![
                (100, 105),
                (0, 115),
                (103, 130),
                (0, 0),
                (105, 150),
                (106, 160),
                (107, 170)
            ]
        );

        // Accepted quorum rounds come from the highest accepted rounds (max of prober and
        // blocks):
        // 110, 120, 130, 140, 150, 160, 170,
        //   0, 121, 131,   0, 151, 161, 171,
        //   1,   0, 130, 104, 105, 166, 107,
        //   0,   0,   0, 140,   0,   0,   0,
        //   0, 102, 133,   0, 155, 106, 177,
        //   1, 115, 103,   0, 125, 160, 127,
        //   0,   0,   0,   0,   0,   0, 170,
        assert_eq!(
            accepted_quorum_rounds,
            vec![
                (0, 1),
                (0, 115),
                (103, 130),
                (0, 104),
                (105, 150),
                (106, 160),
                (127, 170)
            ]
        );

        // With last proposed round = 110: min(110 - 100, 110 - 0) = 10.
        assert_eq!(tracker.calculate_propagation_delay(110), 10);
    }
}