1use std::{sync::Arc, time::Duration};
4
5use consensus_types::block::Round;
6use tokio::{
7 sync::{
8 oneshot::{Receiver, Sender},
9 watch,
10 },
11 task::JoinHandle,
12 time::{Instant, sleep_until},
13};
14use tracing::{debug, warn};
15
16use crate::{context::Context, core::CoreSignalsReceivers, core_thread::CoreThreadDispatcher};
17
18pub(crate) struct LeaderTimeoutTaskHandle {
19 handle: JoinHandle<()>,
20 stop: Sender<()>,
21}
22
23impl LeaderTimeoutTaskHandle {
24 pub async fn stop(self) {
25 self.stop.send(()).ok();
26 self.handle.await.ok();
27 }
28}
29
30pub(crate) struct LeaderTimeoutTask<D: CoreThreadDispatcher> {
31 dispatcher: Arc<D>,
32 new_round_receiver: watch::Receiver<Round>,
33 leader_timeout: Duration,
34 min_round_delay: Duration,
35 stop: Receiver<()>,
36}
37
38impl<D: CoreThreadDispatcher> LeaderTimeoutTask<D> {
39 pub fn start(
40 dispatcher: Arc<D>,
41 signals_receivers: &CoreSignalsReceivers,
42 context: Arc<Context>,
43 ) -> LeaderTimeoutTaskHandle {
44 let (stop_sender, stop) = tokio::sync::oneshot::channel();
45 let mut me = Self {
46 dispatcher,
47 stop,
48 new_round_receiver: signals_receivers.new_round_receiver(),
49 leader_timeout: context.parameters.leader_timeout,
50 min_round_delay: context.parameters.min_round_delay,
51 };
52 let handle = tokio::spawn(async move { me.run().await });
53
54 LeaderTimeoutTaskHandle {
55 handle,
56 stop: stop_sender,
57 }
58 }
59
60 async fn run(&mut self) {
61 let new_round = &mut self.new_round_receiver;
62 let mut leader_round: Round = *new_round.borrow_and_update();
63 let mut min_leader_round_timed_out = false;
64 let mut max_leader_round_timed_out = false;
65 let timer_start = Instant::now();
66 let min_leader_timeout = sleep_until(timer_start + self.min_round_delay);
67 let max_leader_timeout = sleep_until(timer_start + self.leader_timeout);
68
69 tokio::pin!(min_leader_timeout);
70 tokio::pin!(max_leader_timeout);
71
72 loop {
73 tokio::select! {
74 () = &mut min_leader_timeout, if !min_leader_round_timed_out => {
78 if let Err(err) = self.dispatcher.new_block(leader_round, false).await {
79 warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
80 return;
81 }
82 min_leader_round_timed_out = true;
83 },
84 () = &mut max_leader_timeout, if !max_leader_round_timed_out => {
91 if let Err(err) = self.dispatcher.new_block(leader_round, true).await {
92 warn!("Error received while calling dispatcher, probably dispatcher is shutting down, will now exit: {err:?}");
93 return;
94 }
95 max_leader_round_timed_out = true;
96 }
97
98 Ok(_) = new_round.changed() => {
100 leader_round = *new_round.borrow_and_update();
101 debug!("New round has been received {leader_round}, resetting timer");
102
103 min_leader_round_timed_out = false;
104 max_leader_round_timed_out = false;
105
106 let now = Instant::now();
107 min_leader_timeout
108 .as_mut()
109 .reset(now + self.min_round_delay);
110 max_leader_timeout
111 .as_mut()
112 .reset(now + self.leader_timeout);
113 },
114 _ = &mut self.stop => {
115 debug!("Stop signal has been received, now shutting down");
116 return;
117 }
118 }
119 }
120 }
121}
122
#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, sync::Arc, time::Duration};

    use async_trait::async_trait;
    use consensus_config::Parameters;
    use consensus_types::block::{BlockRef, Round};
    use parking_lot::Mutex;
    use tokio::time::{Instant, sleep};

    use crate::{
        block::VerifiedBlock,
        commit::CertifiedCommits,
        context::Context,
        core::CoreSignals,
        core_thread::{CoreError, CoreThreadDispatcher},
        leader_timeout::LeaderTimeoutTask,
    };

    /// Dispatcher test double that records every `new_block` call as
    /// `(round, force, time of the call)`.
    #[derive(Clone, Default)]
    struct MockCoreThreadDispatcher {
        new_block_calls: Arc<Mutex<Vec<(Round, bool, Instant)>>>,
    }

    impl MockCoreThreadDispatcher {
        /// Returns all calls recorded so far and clears the record, so that a
        /// later invocation only reports calls made in the meantime.
        async fn get_new_block_calls(&self) -> Vec<(Round, bool, Instant)> {
            let mut recorded = self.new_block_calls.lock();
            std::mem::take(&mut *recorded)
        }
    }

    // Only `new_block` is exercised by the tests in this module; every other
    // method is left as a `todo!()` stub and panics if it is ever called.
    #[async_trait]
    impl CoreThreadDispatcher for MockCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            _blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        /// Records the call together with the instant at which it was made.
        async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
            let mut recorded = self.new_block_calls.lock();
            recorded.push((round, force, Instant::now()));
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }
    }

    /// A single round first produces one non-forced call, then one forced
    /// call, and nothing more afterwards.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn basic_leader_timeout() {
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);

        let (context, _signers) = Context::new_for_test(4);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let context = Arc::new(context.with_parameters(Parameters {
            leader_timeout,
            min_round_delay,
            ..Default::default()
        }));
        let start = Instant::now();

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        // Spawn the task under test.
        let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);

        // Publish a round to start from.
        signals.new_round(10);

        // Wait past the minimum round delay: exactly one non-forced call is expected.
        sleep(2 * min_round_delay).await;
        let calls = dispatcher.get_new_block_calls().await;
        assert_eq!(calls.len(), 1);

        let (round, force, timestamp) = calls[0];
        let elapsed = timestamp - start;
        assert_eq!(round, 10);
        assert!(!force);
        assert!(
            elapsed >= min_round_delay,
            "Leader timeout min setting {:?} should be less than actual time difference {:?}",
            min_round_delay,
            elapsed
        );

        // Wait past the leader timeout: exactly one forced call is expected.
        sleep(2 * leader_timeout).await;
        let calls = dispatcher.get_new_block_calls().await;
        assert_eq!(calls.len(), 1);

        let (round, force, timestamp) = calls[0];
        let elapsed = timestamp - start;
        assert_eq!(round, 10);
        assert!(force);
        assert!(
            elapsed >= leader_timeout,
            "Leader timeout setting {:?} should be less than actual time difference {:?}",
            leader_timeout,
            elapsed
        );

        // Both timers have fired for this round, so no further call may appear.
        sleep(2 * leader_timeout).await;
        let calls = dispatcher.get_new_block_calls().await;

        assert_eq!(calls.len(), 0);
    }

    /// Rounds that are superseded quickly must not produce calls; only the
    /// last published round does.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn multiple_leader_timeouts() {
        let leader_timeout = Duration::from_millis(500);
        let min_round_delay = Duration::from_millis(50);

        let (context, _signers) = Context::new_for_test(4);
        let dispatcher = Arc::new(MockCoreThreadDispatcher::default());
        let context = Arc::new(context.with_parameters(Parameters {
            leader_timeout,
            min_round_delay,
            ..Default::default()
        }));
        let now = Instant::now();

        let (mut signals, signal_receivers) = CoreSignals::new(context.clone());

        // Spawn the task under test.
        let _handle = LeaderTimeoutTask::start(dispatcher.clone(), &signal_receivers, context);

        // Rounds 13 and 14 are each replaced after half of `min_round_delay`;
        // round 15 is then left alone for longer than the leader timeout.
        signals.new_round(13);
        sleep(min_round_delay / 2).await;
        signals.new_round(14);
        sleep(min_round_delay / 2).await;
        signals.new_round(15);
        sleep(2 * leader_timeout).await;

        let all_calls = dispatcher.get_new_block_calls().await;

        // First recorded call: the non-forced request for round 15.
        let (round, force, timestamp) = all_calls[0];
        assert_eq!(round, 15);
        assert!(!force);
        assert!(min_round_delay < timestamp - now);

        // Second recorded call: the forced request for round 15.
        let (round, force, timestamp) = all_calls[1];
        assert_eq!(round, 15);
        assert!(force);
        assert!(leader_timeout < timestamp - now);
    }
}