consensus_core/
leader_timeout.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3use std::{sync::Arc, time::Duration};
4
5use consensus_types::block::Round;
6use tokio::{
7    sync::{
8        oneshot::{Receiver, Sender},
9        watch,
10    },
11    task::JoinHandle,
12    time::{Instant, sleep_until},
13};
14use tracing::{debug, warn};
15
16use crate::{context::Context, core::CoreSignalsReceivers, core_thread::CoreThreadDispatcher};
17
18pub(crate) struct LeaderTimeoutTaskHandle {
19    handle: JoinHandle<()>,
20    stop: Sender<()>,
21}
22
23impl LeaderTimeoutTaskHandle {
24    pub async fn stop(self) {
25        self.stop.send(()).ok();
26        self.handle.await.ok();
27    }
28}
29
30pub(crate) struct LeaderTimeoutTask<D: CoreThreadDispatcher> {
31    dispatcher: Arc<D>,
32    new_round_receiver: watch::Receiver<Round>,
33    leader_timeout: Duration,
34    min_round_delay: Duration,
35    stop: Receiver<()>,
36}
37
38impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
39    pub fn start(
40        dispatcher: Arc<D>,
41        signals_receivers: &CoreSignalsReceivers,
42        context: Arc<Context>,
43    ) -> LeaderTimeoutTaskHandle {
44        let (stop_sender, stop) = tokio::sync::oneshot::channel();
45        let mut me = Self {
46            dispatcher,
47            stop,
48            new_round_receiver: signals_receivers.new_round_receiver(),
49            leader_timeout: context.parameters.leader_timeout,
50            min_round_delay: context.parameters.min_round_delay,
51        };
52        let handle = tokio::spawn(async move { me.run().await });
53
54        LeaderTimeoutTaskHandle {
55            handle,
56            stop: stop_sender,
57        }
58    }
59
60    async fn run(&mut self) {
61        let new_round = &mut self.new_round_receiver;
62        let mut leader_round: Round = *new_round.borrow_and_update();
63        let mut min_leader_round_timed_out = false;
64        let mut max_leader_round_timed_out = false;
65        let timer_start = Instant::now();
66        let min_leader_timeout = sleep_until(timer_start + self.min_round_delay);
67        let max_leader_timeout = sleep_until(timer_start + self.leader_timeout);
68
69        tokio::pin!(min_leader_timeout);
70        tokio::pin!(max_leader_timeout);
71
72        loop {
73            tokio::select! {
74                // when the min leader timer expires then we attempt to trigger the creation of a new block.
75                // If we already timed out before then the branch gets disabled so we don't attempt
76                // all the time to produce already produced blocks for that round.
77                () = &mut min_leader_timeout, if !min_leader_round_timed_out => {
78                    if let Err(err) = self.dispatcher.new_block(leader_round, false).await {
79                        warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
80                        return;
81                    }
82                    min_leader_round_timed_out = true;
83                },
84                // When the max leader timer expires then we attempt to trigger the creation of a new block. This
85                // call is made with `force = true` to bypass any checks that allow to propose immediately if block
86                // not already produced.
87                // Keep in mind that first the min timeout should get triggered and then the max timeout, only
88                // if the round has not advanced in the meantime. Otherwise, the max timeout will not get
89                // triggered at all.
90                () = &mut max_leader_timeout, if !max_leader_round_timed_out => {
91                    if let Err(err) = self.dispatcher.new_block(leader_round, true).await {
92                        warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
93                        return;
94                    }
95                    max_leader_round_timed_out = true;
96                }
97
98                // a new round has been produced. Reset the leader timeout.
99                Ok(_) = new_round.changed() => {
100                    leader_round = *new_round.borrow_and_update();
101                    debug!("New round has been received {leader_round}, resetting timer");
102
103                    min_leader_round_timed_out = false;
104                    max_leader_round_timed_out = false;
105
106                    let now = Instant::now();
107                    min_leader_timeout
108                    .as_mut()
109                    .reset(now + self.min_round_delay);
110                    max_leader_timeout
111                    .as_mut()
112                    .reset(now + self.leader_timeout);
113                },
114                _ = &mut self.stop => {
115                    debug!("Stop signal has been received, now shutting down");
116                    return;
117                }
118            }
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, sync::Arc, time::Duration};

    use async_trait::async_trait;
    use consensus_config::Parameters;
    use consensus_types::block::{BlockRef, Round};
    use parking_lot::Mutex;
    use tokio::time::{Instant, sleep};

    use crate::{
        block::VerifiedBlock,
        commit::CertifiedCommits,
        context::Context,
        core::CoreSignals,
        core_thread::{CoreError, CoreThreadDispatcher},
        leader_timeout::LeaderTimeoutTask,
    };

    /// Dispatcher double that records every `new_block` call as `(round, force, time)`.
    ///
    /// The leader timeout task only calls `new_block`, so the other methods are `todo!()`.
    #[derive(Clone, Default)]
    struct MockCoreThreadDispatcher {
        new_block_calls: Arc<Mutex<Vec<(Round, bool, Instant)>>>,
    }

    impl MockCoreThreadDispatcher {
        /// Returns the `new_block` calls recorded since the previous invocation and clears
        /// the record.
        fn get_new_block_calls(&self) -> Vec<(Round, bool, Instant)> {
            std::mem::take(&mut *self.new_block_calls.lock())
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for MockCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            _blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
            self.new_block_calls
                .lock()
                .push((round, force, Instant::now()));
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn basic_leader_timeout() {
        let (context, _signers) = Context::new_for_test(4);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);
        let parameters = Parameters {
            leader_timeout,
            min_round_delay,
            ..Default::default()
        };
        let context = Arc::new(context.with_parameters(parameters));
        let start = Instant::now();

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        // spawn the task
        let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);

        // send a signal that a new round has been produced.
        signals.new_round(10);

        // Wait past the min round delay: exactly one non-forced new_block call is expected.
        sleep(2 * min_round_delay).await;
        let all_calls = dispatcher.get_new_block_calls();
        assert_eq!(all_calls.len(), 1);

        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 10);
        assert!(!force);
        assert!(
            min_round_delay <= timestamp - start,
            "Leader timeout min setting {:?} should be less than or equal to actual time difference {:?}",
            min_round_delay,
            timestamp - start
        );

        // Wait past the leader timeout: exactly one forced new_block call is expected.
        sleep(2 * leader_timeout).await;
        let all_calls = dispatcher.get_new_block_calls();
        assert_eq!(all_calls.len(), 1);

        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 10);
        assert!(force);
        assert!(
            leader_timeout <= timestamp - start,
            "Leader timeout setting {:?} should be less than or equal to actual time difference {:?}",
            leader_timeout,
            timestamp - start
        );

        // Both timers have already fired for round 10; with no new round, no further
        // new_block call should be made.
        sleep(2 * leader_timeout).await;
        let all_calls = dispatcher.get_new_block_calls();
        assert!(all_calls.is_empty(), "unexpected new_block calls: {all_calls:?}");
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn multiple_leader_timeouts() {
        let (context, _signers) = Context::new_for_test(4);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);
        let parameters = Parameters {
            leader_timeout,
            min_round_delay,
            ..Default::default()
        };
        let context = Arc::new(context.with_parameters(parameters));
        let start = Instant::now();

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        // spawn the task
        let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);

        // Announce new rounds min_round_delay / 2 apart, so every round except the last is
        // superseded before even its min timer can fire.
        signals.new_round(13);
        sleep(min_round_delay / 2).await;
        signals.new_round(14);
        sleep(min_round_delay / 2).await;
        signals.new_round(15);
        sleep(2 * leader_timeout).await;

        // Only round 15 should produce calls: first the non-forced call after the min round
        // delay, then the forced call after the leader timeout — and nothing else.
        let all_calls = dispatcher.get_new_block_calls();
        assert_eq!(all_calls.len(), 2, "unexpected new_block calls: {all_calls:?}");

        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 15);
        assert!(!force);
        assert!(min_round_delay < timestamp - start);

        let (round, force, timestamp) = all_calls[1];
        assert_eq!(round, 15);
        assert!(force);
        assert!(leader_timeout < timestamp - start);
    }
}