consensus_core/
leader_timeout.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use std::{sync::Arc, time::Duration};
4
5use consensus_types::block::Round;
6use tokio::{
7    sync::{
8        oneshot::{Receiver, Sender},
9        watch,
10    },
11    task::JoinHandle,
12    time::{Instant, sleep_until},
13};
14use tracing::{debug, warn};
15
16use crate::{context::Context, core::CoreSignalsReceivers, core_thread::CoreThreadDispatcher};
17
18pub(crate) struct LeaderTimeoutTaskHandle {
19    handle: JoinHandle<()>,
20    stop: Sender<()>,
21}
22
23impl LeaderTimeoutTaskHandle {
24    pub async fn stop(self) {
25        self.stop.send(()).ok();
26        self.handle.await.ok();
27    }
28}
29
30pub(crate) struct LeaderTimeoutTask<D: CoreThreadDispatcher> {
31    dispatcher: Arc<D>,
32    new_round_receiver: watch::Receiver<Round>,
33    leader_timeout: Duration,
34    min_round_delay: Duration,
35    stop: Receiver<()>,
36}
37
38impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
39    pub fn start(
40        dispatcher: Arc<D>,
41        signals_receivers: &CoreSignalsReceivers,
42        context: Arc<Context>,
43    ) -> LeaderTimeoutTaskHandle {
44        let (stop_sender, stop) = tokio::sync::oneshot::channel();
45        let mut me = Self {
46            dispatcher,
47            stop,
48            new_round_receiver: signals_receivers.new_round_receiver(),
49            leader_timeout: context.parameters.leader_timeout,
50            min_round_delay: context.parameters.min_round_delay,
51        };
52        let handle = tokio::spawn(async move { me.run().await });
53
54        LeaderTimeoutTaskHandle {
55            handle,
56            stop: stop_sender,
57        }
58    }
59
60    async fn run(&mut self) {
61        let new_round = &mut self.new_round_receiver;
62        let mut leader_round: Round = *new_round.borrow_and_update();
63        let mut min_leader_round_timed_out = false;
64        let mut max_leader_round_timed_out = false;
65        let timer_start = Instant::now();
66        let min_leader_timeout = sleep_until(timer_start + self.min_round_delay);
67        let max_leader_timeout = sleep_until(timer_start + self.leader_timeout);
68
69        tokio::pin!(min_leader_timeout);
70        tokio::pin!(max_leader_timeout);
71
72        loop {
73            tokio::select! {
74                // when the min leader timer expires then we attempt to trigger the creation of a new block.
75                // If we already timed out before then the branch gets disabled so we don't attempt
76                // all the time to produce already produced blocks for that round.
77                () = &mut min_leader_timeout, if !min_leader_round_timed_out => {
78                    if let Err(err) = self.dispatcher.new_block(leader_round, false).await {
79                        warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
80                        return;
81                    }
82                    min_leader_round_timed_out = true;
83                },
84                // When the max leader timer expires then we attempt to trigger the creation of a new block. This
85                // call is made with `force = true` to bypass any checks that allow to propose immediately if block
86                // not already produced.
87                // Keep in mind that first the min timeout should get triggered and then the max timeout, only
88                // if the round has not advanced in the meantime. Otherwise, the max timeout will not get
89                // triggered at all.
90                () = &mut max_leader_timeout, if !max_leader_round_timed_out => {
91                    if let Err(err) = self.dispatcher.new_block(leader_round, true).await {
92                        warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
93                        return;
94                    }
95                    max_leader_round_timed_out = true;
96                }
97
98                // a new round has been produced. Reset the leader timeout.
99                Ok(_) = new_round.changed() => {
100                    leader_round = *new_round.borrow_and_update();
101                    debug!("New round has been received {leader_round}, resetting timer");
102
103                    min_leader_round_timed_out = false;
104                    max_leader_round_timed_out = false;
105
106                    let now = Instant::now();
107                    min_leader_timeout
108                    .as_mut()
109                    .reset(now + self.min_round_delay);
110                    max_leader_timeout
111                    .as_mut()
112                    .reset(now + self.leader_timeout);
113                },
114                _ = &mut self.stop => {
115                    debug!("Stop signal has been received, now shutting down");
116                    return;
117                }
118            }
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, sync::Arc, time::Duration};

    use async_trait::async_trait;
    use consensus_config::Parameters;
    use consensus_types::block::{BlockRef, Round};
    use parking_lot::Mutex;
    use tokio::time::{Instant, sleep};

    use crate::{
        block::VerifiedBlock,
        commit::CertifiedCommits,
        context::Context,
        core::CoreSignals,
        core_thread::{CoreError, CoreThreadDispatcher},
        leader_timeout::LeaderTimeoutTask,
    };

    /// Dispatcher stub that records every `new_block` call as (round, force, call time).
    /// All other dispatcher methods are left unimplemented.
    #[derive(Clone, Default)]
    struct MockCoreThreadDispatcher {
        new_block_calls: Arc<Mutex<Vec<(Round, bool, Instant)>>>,
    }

    impl MockCoreThreadDispatcher {
        /// Returns the `new_block` calls recorded so far and clears the log, so each
        /// invocation only reports the calls made since the previous one.
        async fn get_new_block_calls(&self) -> Vec<(Round, bool, Instant)> {
            std::mem::take(&mut *self.new_block_calls.lock())
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for MockCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            _blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
            self.new_block_calls
                .lock()
                .push((round, force, Instant::now()));
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }
    }

    /// Builds a 4-authority test context whose parameters use the given timer durations.
    fn context_with_timeouts(leader_timeout: Duration, min_round_delay: Duration) -> Arc<Context> {
        let (context, _signers) = Context::new_for_test(4);
        let parameters = Parameters {
            leader_timeout,
            min_round_delay,
            ..Default::default()
        };
        Arc::new(context.with_parameters(parameters))
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn basic_leader_timeout() {
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);
        let context = context_with_timeouts(leader_timeout, min_round_delay);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let start = Instant::now();

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        // spawn the task
        let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);

        // send a signal that a new round has been produced.
        signals.new_round(10);

        // wait until the min round delay has passed and a non-forced new_block call is made
        sleep(2 * min_round_delay).await;
        let all_calls = dispatcher.get_new_block_calls().await;
        assert_eq!(all_calls.len(), 1, "unexpected new_block calls: {all_calls:?}");

        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 10);
        assert!(!force);
        assert!(
            min_round_delay <= timestamp - start,
            "Min round delay {:?} should not exceed the actual time difference {:?}",
            min_round_delay,
            timestamp - start
        );

        // wait until the leader timeout has passed and a forced new_block call is made
        sleep(2 * leader_timeout).await;
        let all_calls = dispatcher.get_new_block_calls().await;
        assert_eq!(all_calls.len(), 1, "unexpected new_block calls: {all_calls:?}");

        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 10);
        assert!(force);
        assert!(
            leader_timeout <= timestamp - start,
            "Leader timeout {:?} should not exceed the actual time difference {:?}",
            leader_timeout,
            timestamp - start
        );

        // now wait another 2 * leader_timeout, no other call should be received
        sleep(2 * leader_timeout).await;
        let all_calls = dispatcher.get_new_block_calls().await;

        assert_eq!(all_calls.len(), 0, "unexpected new_block calls: {all_calls:?}");
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn multiple_leader_timeouts() {
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);
        let context = context_with_timeouts(leader_timeout, min_round_delay);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let now = Instant::now();

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        // spawn the task
        let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);

        // Send several new rounds, each arriving before the previous one's min round delay
        // elapses, so only the last round lives long enough for its timers to fire.
        signals.new_round(13);
        sleep(min_round_delay / 2).await;
        signals.new_round(14);
        sleep(min_round_delay / 2).await;
        signals.new_round(15);
        sleep(2 * leader_timeout).await;

        // Expect exactly two calls, both for round 15: a non-forced one after the min round
        // delay, then a forced one after the leader timeout.
        let all_calls = dispatcher.get_new_block_calls().await;
        assert_eq!(all_calls.len(), 2, "unexpected new_block calls: {all_calls:?}");

        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 15);
        assert!(!force);
        assert!(min_round_delay < timestamp - now);

        let (round, force, timestamp) = all_calls[1];
        assert_eq!(round, 15);
        assert!(force);
        assert!(leader_timeout < timestamp - now);
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn stop_prevents_further_new_block_calls() {
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);
        let context = context_with_timeouts(leader_timeout, min_round_delay);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        let handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);
        signals.new_round(7);

        // Stop the task before any timer can expire. `stop` waits for the task to exit.
        handle.stop().await;

        // Even after both timeouts would have elapsed, no request must have been made.
        sleep(2 * leader_timeout).await;
        let all_calls = dispatcher.get_new_block_calls().await;
        assert!(all_calls.is_empty(), "unexpected new_block calls: {all_calls:?}");
    }
}