1use std::{collections::BTreeSet, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7use consensus_types::block::{BlockRef, Round};
8use mysten_metrics::{
9 monitored_mpsc::{Receiver, Sender, WeakSender, channel},
10 monitored_scope, spawn_logged_monitored_task,
11};
12use parking_lot::RwLock;
13use thiserror::Error;
14use tokio::sync::{oneshot, watch};
15use tracing::warn;
16
17use crate::{
18 block::VerifiedBlock,
19 commit::CertifiedCommits,
20 context::Context,
21 core::Core,
22 core_thread::CoreError::Shutdown,
23 dag_state::DagState,
24 error::{ConsensusError, ConsensusResult},
25};
26
/// Size handed to `channel` when creating the queue that carries
/// [`CoreThreadCommand`]s from the dispatcher to the core thread.
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;
28
/// Commands sent over the command channel and executed one at a time by
/// `CoreThread::run`. Every variant carries a oneshot sender through which the
/// result is handed back to the caller.
enum CoreThreadCommand {
    /// Passes the blocks to `Core::add_blocks` and replies with the set of block
    /// refs it returns (the run loop names them "missing block refs").
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Passes the refs to `Core::check_block_refs` and replies with the set of
    /// block refs it returns.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Passes the commits to `Core::add_certified_commits` and replies with the
    /// set of block refs it returns.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Calls `Core::new_block(round, force)` and acknowledges with `()`.
    /// Fields are, in order: the round, the acknowledgement sender, the force flag.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Replies with the result of `Core::get_missing_blocks()`.
    GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
}
45
/// Error type returned by the [`CoreThreadDispatcher`] methods.
#[derive(Error, Debug)]
pub enum CoreError {
    /// A channel to or from the core thread was closed before the request could
    /// complete. The payload is the text of the underlying channel error.
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}
51
/// Interface through which other components hand work to the core.
///
/// This file provides two implementations: `ChannelCoreThreadDispatcher`, which
/// forwards every call to the core thread, and (under `cfg(test)`) a mock that
/// records calls. All methods report failure as [`CoreError`].
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    /// Submits verified blocks to the core and returns the set of block refs the
    /// core reports back (treated as missing block refs by the channel-based
    /// implementation).
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Asks the core to check the given block refs and returns the set of block
    /// refs it reports back.
    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Submits certified commits to the core and returns the set of block refs
    /// it reports back.
    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Requests a new block for `round`; `force` is forwarded unchanged to the
    /// core's `new_block` call by the channel-based implementation.
    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    /// Returns the block refs the core currently reports as missing.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Publishes a new propagation delay, expressed as a [`Round`] value.
    /// Synchronous: the channel-based implementation only writes to a watch
    /// channel and does not wait for the core to apply the value.
    fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>;

    /// Publishes the last known proposed round. Synchronous, with the same
    /// fire-and-forget behaviour as [`Self::set_propagation_delay`] in the
    /// channel-based implementation.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;
}
79
/// Owner-side handle of the core thread, returned by
/// `ChannelCoreThreadDispatcher::start`.
pub(crate) struct CoreThreadHandle {
    // Strong sender of the command channel; dispatchers only hold weak senders
    // derived from it.
    sender: Sender<CoreThreadCommand>,
    // Join handle of the spawned task that runs the core loop.
    join_handle: tokio::task::JoinHandle<()>,
}
84
85impl CoreThreadHandle {
86 pub async fn stop(self) {
87 drop(self.sender);
89 self.join_handle.await.ok();
90 }
91}
92
/// State owned by the task that executes the core loop (see `CoreThread::run`).
struct CoreThread {
    // The core that all commands and watch updates are applied to.
    core: Core,
    // Receiving end of the command channel.
    receiver: Receiver<CoreThreadCommand>,
    // Latest propagation delay published through the dispatcher.
    rx_propagation_delay: watch::Receiver<Round>,
    // Latest last-known proposed round published through the dispatcher.
    rx_last_known_proposed_round: watch::Receiver<Round>,
    // Used by the loop to reach the node metrics.
    context: Arc<Context>,
}
100
101impl CoreThread {
102 pub async fn run(mut self) -> ConsensusResult<()> {
103 tracing::debug!("Started core thread");
104
105 loop {
106 tokio::select! {
107 command = self.receiver.recv() => {
108 let Some(command) = command else {
109 break;
110 };
111 self.context.metrics.node_metrics.core_lock_dequeued.inc();
112 match command {
113 CoreThreadCommand::AddBlocks(blocks, sender) => {
114 let _scope = monitored_scope("CoreThread::loop::add_blocks");
115 let missing_block_refs = self.core.add_blocks(blocks)?;
116 sender.send(missing_block_refs).ok();
117 }
118 CoreThreadCommand::CheckBlockRefs(block_refs, sender) => {
119 let _scope = monitored_scope("CoreThread::loop::check_block_refs");
120 let missing_block_refs = self.core.check_block_refs(block_refs)?;
121 sender.send(missing_block_refs).ok();
122 }
123 CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
124 let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
125 let missing_block_refs = self.core.add_certified_commits(commits)?;
126 sender.send(missing_block_refs).ok();
127 }
128 CoreThreadCommand::NewBlock(round, sender, force) => {
129 let _scope = monitored_scope("CoreThread::loop::new_block");
130 self.core.new_block(round, force)?;
131 sender.send(()).ok();
132 }
133 CoreThreadCommand::GetMissing(sender) => {
134 let _scope = monitored_scope("CoreThread::loop::get_missing");
135 sender.send(self.core.get_missing_blocks()).ok();
136 }
137 }
138 }
139 _ = self.rx_last_known_proposed_round.changed() => {
140 let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
141 let round = *self.rx_last_known_proposed_round.borrow();
142 self.core.set_last_known_proposed_round(round);
143 self.core.new_block(Round::MAX, true)?;
146 }
147 _ = self.rx_propagation_delay.changed() => {
148 let _scope = monitored_scope("CoreThread::loop::set_propagation_delay");
149 let should_propose_before = self.core.should_propose();
150 let propagation_delay = *self.rx_propagation_delay.borrow();
151 self.core.set_propagation_delay(
152 propagation_delay
153 );
154 if !should_propose_before && self.core.should_propose() {
155 self.core.new_block(Round::MAX, true)?;
158 }
159 }
160 }
161 }
162
163 Ok(())
164 }
165}
166
/// [`CoreThreadDispatcher`] implementation that forwards every request to the
/// core thread: commands go over an mpsc channel, the two settings over watch
/// channels. Cloning is cheap; all clones talk to the same core thread.
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    // Used to reach the node metrics when enqueueing commands.
    context: Arc<Context>,
    // Weak sender of the command channel; the strong one lives in `CoreThreadHandle`.
    sender: WeakSender<CoreThreadCommand>,
    // Publishes propagation delay updates to the core thread.
    tx_propagation_delay: Arc<watch::Sender<Round>>,
    // Publishes last-known proposed round updates to the core thread.
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
}
174
175impl ChannelCoreThreadDispatcher {
176 pub(crate) fn start(
177 context: Arc<Context>,
178 _dag_state: &RwLock<DagState>,
179 core: Core,
180 ) -> (Self, CoreThreadHandle) {
181 let (sender, receiver) =
182 channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
183 let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0);
184 let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
185 rx_propagation_delay.mark_unchanged();
186 rx_last_known_proposed_round.mark_unchanged();
187 let core_thread = CoreThread {
188 core,
189 receiver,
190 rx_propagation_delay,
191 rx_last_known_proposed_round,
192 context: context.clone(),
193 };
194
195 let join_handle = spawn_logged_monitored_task!(
196 async move {
197 if let Err(err) = core_thread.run().await
198 && !matches!(err, ConsensusError::Shutdown)
199 {
200 panic!("Fatal error occurred: {err}");
201 }
202 },
203 "ConsensusCoreThread"
204 );
205
206 let dispatcher = ChannelCoreThreadDispatcher {
209 context,
210 sender: sender.downgrade(),
211 tx_propagation_delay: Arc::new(tx_propagation_delay),
212 tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
213 };
214 let handle = CoreThreadHandle {
215 join_handle,
216 sender,
217 };
218 (dispatcher, handle)
219 }
220
221 async fn send(&self, command: CoreThreadCommand) {
222 self.context.metrics.node_metrics.core_lock_enqueued.inc();
223 if let Some(sender) = self.sender.upgrade()
224 && let Err(err) = sender.send(command).await
225 {
226 warn!(
227 "Couldn't send command to core thread, probably is shutting down: {}",
228 err
229 );
230 }
231 }
232}
233
234#[async_trait]
235impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
236 async fn add_blocks(
237 &self,
238 blocks: Vec<VerifiedBlock>,
239 ) -> Result<BTreeSet<BlockRef>, CoreError> {
240 let (sender, receiver) = oneshot::channel();
241 self.send(CoreThreadCommand::AddBlocks(blocks, sender))
242 .await;
243 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
244
245 Ok(missing_block_refs)
246 }
247
248 async fn check_block_refs(
249 &self,
250 block_refs: Vec<BlockRef>,
251 ) -> Result<BTreeSet<BlockRef>, CoreError> {
252 let (sender, receiver) = oneshot::channel();
253 self.send(CoreThreadCommand::CheckBlockRefs(block_refs, sender))
254 .await;
255 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
256
257 Ok(missing_block_refs)
258 }
259
260 async fn add_certified_commits(
261 &self,
262 commits: CertifiedCommits,
263 ) -> Result<BTreeSet<BlockRef>, CoreError> {
264 let (sender, receiver) = oneshot::channel();
265 self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
266 .await;
267 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
268 Ok(missing_block_refs)
269 }
270
271 async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
272 let (sender, receiver) = oneshot::channel();
273 self.send(CoreThreadCommand::NewBlock(round, sender, force))
274 .await;
275 receiver.await.map_err(|e| Shutdown(e.to_string()))
276 }
277
278 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
279 let (sender, receiver) = oneshot::channel();
280 self.send(CoreThreadCommand::GetMissing(sender)).await;
281 receiver.await.map_err(|e| Shutdown(e.to_string()))
282 }
283
284 fn set_propagation_delay(&self, propagation_delay: Round) -> Result<(), CoreError> {
285 self.tx_propagation_delay
286 .send(propagation_delay)
287 .map_err(|e| Shutdown(e.to_string()))
288 }
289
290 fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
291 self.tx_last_known_proposed_round
292 .send(round)
293 .map_err(|e| Shutdown(e.to_string()))
294 }
295}
296
/// Test-only [`CoreThreadDispatcher`] that records what it is asked to do
/// instead of talking to a real core.
#[cfg(test)]
#[derive(Default)]
pub(crate) struct MockCoreThreadDispatcher {
    // Blocks received through `add_blocks`, until drained by `get_add_blocks`.
    add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
    // Block refs stubbed via `stub_missing_blocks`, returned (and cleared) by
    // `get_missing_blocks`.
    missing_blocks: parking_lot::Mutex<BTreeSet<BlockRef>>,
    // Every round passed to `set_last_known_proposed_round`, in call order.
    last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
}
305
#[cfg(test)]
impl MockCoreThreadDispatcher {
    /// Returns all blocks recorded by `add_blocks` so far and empties the record.
    pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
        std::mem::take(&mut *self.add_blocks.lock())
    }

    /// Adds `block_refs` to the set that `get_missing_blocks` will return.
    pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
        self.missing_blocks.lock().extend(block_refs);
    }

    /// Returns a copy of every round passed to `set_last_known_proposed_round`,
    /// in call order; the record itself is left untouched.
    pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
        self.last_known_proposed_round.lock().clone()
    }
}
326
#[cfg(test)]
#[async_trait]
impl CoreThreadDispatcher for MockCoreThreadDispatcher {
    /// Records the blocks and reports nothing as missing.
    async fn add_blocks(
        &self,
        blocks: Vec<VerifiedBlock>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        self.add_blocks.lock().extend(blocks);
        Ok(BTreeSet::new())
    }

    /// Ignores the input and always returns an empty set.
    async fn check_block_refs(
        &self,
        _block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(BTreeSet::new())
    }

    /// Not implemented in the mock; panics when called.
    async fn add_certified_commits(
        &self,
        _commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        todo!()
    }

    /// No-op that always succeeds.
    async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
        Ok(())
    }

    /// Hands out the stubbed missing block refs and leaves the stub empty.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
        let stubbed = std::mem::take(&mut *self.missing_blocks.lock());
        Ok(stubbed)
    }

    /// Not implemented in the mock; panics when called.
    fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
        todo!();
    }

    /// Appends `round` to the recorded list of rounds.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
        self.last_known_proposed_round.lock().push(round);
        Ok(())
    }
}
374
// Integration-style tests that run a real `Core` behind a
// `ChannelCoreThreadDispatcher`.
#[cfg(test)]
mod test {
    use std::time::Duration;

    use parking_lot::RwLock;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        CommitConsumerArgs,
        block::{BlockAPI, TestBlock, genesis_blocks},
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        round_tracker::RoundTracker,
        storage::{Store, WriteBatch, mem_store::MemStore},
        transaction::{TransactionClient, TransactionConsumer},
        transaction_vote_tracker::TransactionVoteTracker,
    };

    /// Commands succeed while the core thread runs and fail once it has been
    /// stopped, for every clone of the dispatcher.
    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();
        // Assemble a `Core` for a test context created with `new_for_test(4)`,
        // backed by an in-memory store.
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Bound to a named variable so the receiver stays alive for the whole test.
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let core = Core::new_validator(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_vote_tracker,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            // NOTE(review): presumably the "sync last known own block" switch,
            // disabled here — confirm against `Core::new_validator`.
            false,
            round_tracker,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Two clones of the same dispatcher; both talk to the one core thread.
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // While the core thread is running, commands go through.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());

        // Stop the core thread and wait for its task to finish.
        handle.stop().await;

        // After shutdown every clone must report an error.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
    }

    /// After recovering from a store that already holds rounds 1 and 2, no
    /// round 3 block is broadcast until `set_last_known_proposed_round` is
    /// called; afterwards a round 3 block authored by this node appears.
    #[tokio::test]
    async fn test_last_known_sync_wakes_threshold_clock_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Build genesis plus rounds 1 and 2, one block per authority, each block
        // referencing all blocks of the previous round as ancestors.
        let mut last_round_blocks = genesis_blocks(&context);
        let mut all_blocks = last_round_blocks.clone();
        for round in 1..=2 {
            let mut this_round_blocks = Vec::new();
            for (index, _authority) in context.committee.authorities() {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, index.value() as u32)
                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                        .build(),
                );
                this_round_blocks.push(block);
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }
        // Persist everything before `DagState` is created from the store.
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        // Sanity-check the state the new `DagState` reports: last proposed
        // round 2, threshold clock round 3.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        assert_eq!(
            dag_state.read().get_last_proposed_block().unwrap().round(),
            2
        );
        assert_eq!(dag_state.read().threshold_clock_round(), 3);

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );
        transaction_vote_tracker.recover_blocks_after_round(dag_state.read().gc_round());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let core = Core::new_validator(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_vote_tracker,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            // NOTE(review): presumably the "sync last known own block" switch,
            // enabled here — confirm against `Core::new_validator`.
            true,
            round_tracker,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // The first broadcast is expected to be the round 2 block.
        let recovered_block = timeout(Duration::from_secs(5), block_receiver.recv())
            .await
            .expect("timed out waiting for recovered block")
            .expect("block broadcast closed");
        assert_eq!(recovered_block.block.round(), 2);

        // Nothing else may be broadcast within 100ms: the timeout must fire.
        assert!(
            timeout(Duration::from_millis(100), block_receiver.recv())
                .await
                .is_err(),
            "round 3 must not be proposed before last-known sync completes"
        );

        // Publish the last known proposed round through the dispatcher.
        core_dispatcher
            .set_last_known_proposed_round(1)
            .expect("core thread should be running");

        // Skip any other broadcasts until a round 3 block shows up, or time out.
        let proposed_block = timeout(Duration::from_secs(5), async {
            loop {
                let block = block_receiver.recv().await.expect("block broadcast closed");
                if block.block.round() == 3 {
                    return block;
                }
            }
        })
        .await
        .expect("timed out waiting for threshold-clock proposal");
        // The round 3 block must be authored by this node.
        assert_eq!(
            proposed_block.block.author(),
            core_dispatcher.context.own_index
        );

        handle.stop().await;
    }
}