consensus_core/
threshold_clock.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{cmp::Ordering, sync::Arc};

use consensus_types::block::{BlockRef, Round};
use tokio::time::Instant;
use tracing::{debug, info};

use crate::{
    context::Context,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
};

pub(crate) struct ThresholdClock {
    context: Arc<Context>,
    aggregator: StakeAggregator<QuorumThreshold>,
    round: Round,
    // Timestamp when the last quorum was form and the current round started.
    quorum_ts: Instant,
}

impl ThresholdClock {
    pub(crate) fn new(round: Round, context: Arc<Context>) -> Self {
        info!("Recovered ThresholdClock at round {}", round);
        Self {
            context,
            aggregator: StakeAggregator::new(),
            round,
            quorum_ts: Instant::now(),
        }
    }

    /// Adds the block reference that have been accepted and advance the round accordingly.
    /// Returns true when the round has advanced.
    pub(crate) fn add_block(&mut self, block: BlockRef) -> bool {
        match block.round.cmp(&self.round) {
            // Blocks with round less then what we currently build are irrelevant here
            Ordering::Less => false,
            Ordering::Equal => {
                let now = Instant::now();
                if self.aggregator.add(block.author, &self.context.committee) {
                    self.aggregator.clear();
                    // We have seen 2f+1 blocks for current round, advance
                    self.round = block.round + 1;
                    // Record the time of last quorum and new round start.
                    self.quorum_ts = now;
                    debug!(
                        "ThresholdClock advanced to round {} with block {} completing quorum",
                        self.round, block
                    );
                    return true;
                }
                // Record delay from the start of the round.
                let hostname = &self.context.committee.authority(block.author).hostname;
                self.context
                    .metrics
                    .node_metrics
                    .block_receive_delay
                    .with_label_values(&[hostname])
                    .inc_by(now.duration_since(self.quorum_ts).as_millis() as u64);
                false
            }
            // If we processed block for round r, we also have stored 2f+1 blocks from r-1
            Ordering::Greater => {
                self.aggregator.clear();
                if self.aggregator.add(block.author, &self.context.committee) {
                    // Even though this is the first block of the round, there is still a quorum at block.round.
                    self.round = block.round + 1;
                } else {
                    // There is a quorum at block.round - 1 but not block.round.
                    self.round = block.round;
                };
                self.quorum_ts = Instant::now();
                debug!(
                    "ThresholdClock advanced to round {} with block {} catching up round",
                    self.round, block
                );
                true
            }
        }
    }

    /// Add the block references that have been successfully processed and advance the round accordingly. If the round
    /// has indeed advanced then the new round is returned, otherwise None is returned.
    #[cfg(test)]
    fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
        let previous_round = self.round;
        for block_ref in blocks {
            self.add_block(block_ref);
        }
        (self.round > previous_round).then_some(self.round)
    }

    pub(crate) fn get_round(&self) -> Round {
        self.round
    }

    pub(crate) fn get_quorum_ts(&self) -> Instant {
        self.quorum_ts
    }
}

#[cfg(test)]
mod tests {
    use consensus_types::block::BlockDigest;

    use super::*;
    use consensus_config::AuthorityIndex;

    #[tokio::test]
    async fn test_threshold_clock_add_block() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut aggregator = ThresholdClock::new(0, context);

        aggregator.add_block(BlockRef::new(
            0,
            AuthorityIndex::new_for_test(0),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 0);
        aggregator.add_block(BlockRef::new(
            0,
            AuthorityIndex::new_for_test(1),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 0);
        aggregator.add_block(BlockRef::new(
            0,
            AuthorityIndex::new_for_test(2),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 1);
        aggregator.add_block(BlockRef::new(
            1,
            AuthorityIndex::new_for_test(0),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 1);
        aggregator.add_block(BlockRef::new(
            1,
            AuthorityIndex::new_for_test(3),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 1);
        aggregator.add_block(BlockRef::new(
            2,
            AuthorityIndex::new_for_test(1),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 2);
        aggregator.add_block(BlockRef::new(
            1,
            AuthorityIndex::new_for_test(1),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 2);
        aggregator.add_block(BlockRef::new(
            5,
            AuthorityIndex::new_for_test(2),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 5);
    }

    #[tokio::test]
    async fn test_threshold_clock_add_blocks() {
        let context = Arc::new(Context::new_for_test(4).0);
        let mut aggregator = ThresholdClock::new(0, context);

        let block_refs = vec![
            BlockRef::new(0, AuthorityIndex::new_for_test(0), BlockDigest::default()),
            BlockRef::new(0, AuthorityIndex::new_for_test(1), BlockDigest::default()),
            BlockRef::new(0, AuthorityIndex::new_for_test(2), BlockDigest::default()),
            BlockRef::new(1, AuthorityIndex::new_for_test(0), BlockDigest::default()),
            BlockRef::new(1, AuthorityIndex::new_for_test(3), BlockDigest::default()),
            BlockRef::new(2, AuthorityIndex::new_for_test(1), BlockDigest::default()),
            BlockRef::new(1, AuthorityIndex::new_for_test(1), BlockDigest::default()),
            BlockRef::new(5, AuthorityIndex::new_for_test(2), BlockDigest::default()),
        ];

        let result = aggregator.add_blocks(block_refs);
        assert_eq!(Some(5), result);
    }

    #[tokio::test]
    async fn test_threshold_clock_add_block_min_committee() {
        let context = Arc::new(Context::new_for_test(1).0);
        let mut aggregator = ThresholdClock::new(10, context);

        // Adding a past block should not advance the round.
        assert!(!aggregator.add_block(BlockRef::new(
            9,
            AuthorityIndex::new_for_test(0),
            BlockDigest::default(),
        )));
        assert_eq!(aggregator.get_round(), 10);

        // Adding one block is enough to complete the quorum.
        assert!(aggregator.add_block(BlockRef::new(
            10,
            AuthorityIndex::new_for_test(0),
            BlockDigest::default(),
        )));
        assert_eq!(aggregator.get_round(), 11);

        // Adding a catch up block should both advance the round and complete the quorum.
        aggregator.add_block(BlockRef::new(
            20,
            AuthorityIndex::new_for_test(0),
            BlockDigest::default(),
        ));
        assert_eq!(aggregator.get_round(), 21);
    }
}