consensus_core/
threshold_clock.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{cmp::Ordering, sync::Arc};
5
6use consensus_types::block::{BlockRef, Round};
7use tokio::time::Instant;
8use tracing::{debug, info};
9
10use crate::{
11    context::Context,
12    stake_aggregator::{QuorumThreshold, StakeAggregator},
13};
14
15pub(crate) struct ThresholdClock {
16    context: Arc<Context>,
17    aggregator: StakeAggregator<QuorumThreshold>,
18    round: Round,
19    // Timestamp when the last quorum was form and the current round started.
20    quorum_ts: Instant,
21}
22
23impl ThresholdClock {
24    pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
25        info!("Recovered ThresholdClock at round {}", round);
26        Self {
27            context,
28            aggregator: StakeAggregator::new(),
29            round,
30            quorum_ts: Instant::now(),
31        }
32    }
33
34    /// Adds the block reference that have been accepted and advance the round accordingly.
35    /// Returns true when the round has advanced.
36    pub(crate) fn add_block(&mut self, block: BlockRef) -> bool {
37        match block.round.cmp(&self.round) {
38            // Blocks with round less then what we currently build are irrelevant here
39            Ordering::Less => false,
40            Ordering::Equal => {
41                let now = Instant::now();
42                if self.aggregator.add(block.author, &self.context.committee) {
43                    self.aggregator.clear();
44                    // We have seen 2f+1 blocks for current round, advance
45                    self.round = block.round + 1;
46                    // Record the time of last quorum and new round start.
47                    self.quorum_ts = now;
48                    debug!(
49                        "ThresholdClock advanced to round {} with block {} completing quorum",
50                        self.round, block
51                    );
52                    return true;
53                }
54                // Record delay from the start of the round.
55                let hostname = &self.context.committee.authority(block.author).hostname;
56                self.context
57                    .metrics
58                    .node_metrics
59                    .block_receive_delay
60                    .with_label_values(&[hostname])
61                    .inc_by(now.duration_since(self.quorum_ts).as_millis() as u64);
62                false
63            }
64            // If we processed block for round r, we also have stored 2f+1 blocks from r-1
65            Ordering::Greater => {
66                self.aggregator.clear();
67                if self.aggregator.add(block.author, &self.context.committee) {
68                    // Even though this is the first block of the round, there is still a quorum at block.round.
69                    self.round = block.round + 1;
70                } else {
71                    // There is a quorum at block.round - 1 but not block.round.
72                    self.round = block.round;
73                };
74                self.quorum_ts = Instant::now();
75                debug!(
76                    "ThresholdClock advanced to round {} with block {} catching up round",
77                    self.round, block
78                );
79                true
80            }
81        }
82    }
83
84    /// Add the block references that have been successfully processed and advance the round accordingly. If the round
85    /// has indeed advanced then the new round is returned, otherwise None is returned.
86    #[cfg(test)]
87    fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
88        let previous_round = self.round;
89        for block_ref in blocks {
90            self.add_block(block_ref);
91        }
92        (self.round > previous_round).then_some(self.round)
93    }
94
95    pub(crate) fn get_round(&self) -> Round {
96        self.round
97    }
98
99    pub(crate) fn get_quorum_ts(&self) -> Instant {
100        self.quorum_ts
101    }
102}
103
#[cfg(test)]
mod tests {
    use consensus_types::block::BlockDigest;

    use super::*;
    use consensus_config::AuthorityIndex;

    /// Adds blocks one at a time to a 4-authority clock and checks the round after each one.
    #[tokio::test]
    async fn test_threshold_clock_add_block() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // Each entry: (block round, author index, clock round expected after adding the block).
        let steps = [
            (0, 0, 0),
            (0, 1, 0),
            (0, 2, 1),
            (1, 0, 1),
            (1, 3, 1),
            (2, 1, 2),
            (1, 1, 2),
            (5, 2, 5),
        ];

        for (round, author, expected_round) in steps {
            clock.add_block(BlockRef::new(
                round,
                AuthorityIndex::new_for_test(author),
                BlockDigest::default(),
            ));
            assert_eq!(clock.get_round(), expected_round);
        }
    }

    /// Adds the same sequence of blocks in one batch and checks the reported new round.
    #[tokio::test]
    async fn test_threshold_clock_add_blocks() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut clock = ThresholdClock::new(0, context);

        // Each entry: (block round, author index).
        let block_refs: Vec<BlockRef> = vec![
            (0, 0),
            (0, 1),
            (0, 2),
            (1, 0),
            (1, 3),
            (2, 1),
            (1, 1),
            (5, 2),
        ]
        .into_iter()
        .map(|(round, author)| {
            BlockRef::new(
                round,
                AuthorityIndex::new_for_test(author),
                BlockDigest::default(),
            )
        })
        .collect();

        let new_round = clock.add_blocks(block_refs);
        assert_eq!(Some(5), new_round);
    }

    /// Exercises the clock with a single-authority committee, starting at round 10.
    #[tokio::test]
    async fn test_threshold_clock_add_block_min_committee() {
        let context = Arc::new(Context::new_for_test(1).0);
        let mut clock = ThresholdClock::new(10, context);

        // All blocks in this test come from the only authority.
        let block_at = |round| {
            BlockRef::new(
                round,
                AuthorityIndex::new_for_test(0),
                BlockDigest::default(),
            )
        };

        // Adding a past block should not advance the round.
        let advanced = clock.add_block(block_at(9));
        assert!(!advanced);
        assert_eq!(clock.get_round(), 10);

        // Adding one block is enough to complete the quorum.
        let advanced = clock.add_block(block_at(10));
        assert!(advanced);
        assert_eq!(clock.get_round(), 11);

        // Adding a catch up block should both advance the round and complete the quorum.
        clock.add_block(block_at(20));
        assert_eq!(clock.get_round(), 21);
    }
}
215}