1use std::{collections::BTreeSet, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7use consensus_types::block::{BlockRef, Round};
8use mysten_metrics::{
9 monitored_mpsc::{Receiver, Sender, WeakSender, channel},
10 monitored_scope, spawn_logged_monitored_task,
11};
12use parking_lot::RwLock;
13use thiserror::Error;
14use tokio::sync::{oneshot, watch};
15use tracing::warn;
16
17use crate::{
18 block::VerifiedBlock,
19 commit::CertifiedCommits,
20 context::Context,
21 core::Core,
22 core_thread::CoreError::Shutdown,
23 dag_state::DagState,
24 error::{ConsensusError, ConsensusResult},
25};
26
/// Size passed to `channel` when the core thread's command queue is created.
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2_000;
28
/// Commands sent over the command channel to the core thread.
///
/// Every variant carries a `oneshot::Sender` that the core thread uses to
/// send the reply once the command has been handled.
enum CoreThreadCommand {
    /// Pass the blocks to `Core::add_blocks`; the reply is the set of block
    /// refs that call returns (named `missing_block_refs` in the run loop).
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Pass the block refs to `Core::check_block_refs`; the reply is the set
    /// of block refs that call returns.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Pass the commits to `Core::add_certified_commits`; the reply is the
    /// set of block refs that call returns.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Call `Core::new_block` with the given round and `force` flag (the
    /// trailing `bool`); `()` is sent back once the call has succeeded.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Reply with the result of `Core::get_missing_blocks`.
    GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
}
45
/// Errors returned by [`CoreThreadDispatcher`] methods.
#[derive(Error, Debug)]
pub enum CoreError {
    /// The request could not be completed because the channel to the core
    /// thread is no longer usable. The payload is the text of the underlying
    /// channel error.
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}
51
/// Interface used to submit work to the consensus core.
///
/// Every method reports failure as a [`CoreError`]. The channel-backed
/// implementation in this file forwards each call to the core thread.
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    /// Submits `blocks` for processing and returns a set of block refs.
    /// In the channel-backed implementation this is the set returned by
    /// `Core::add_blocks`.
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Submits `block_refs` for checking and returns a set of block refs.
    /// In the channel-backed implementation this is the set returned by
    /// `Core::check_block_refs`.
    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Submits certified commits and returns a set of block refs.
    /// In the channel-backed implementation this is the set returned by
    /// `Core::add_certified_commits`.
    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Requests creation of a new block for `round`; `force` is forwarded
    /// unchanged to the core.
    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    /// Returns the set of block refs currently reported as missing.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Publishes a new propagation delay value (expressed as a `Round`).
    /// Synchronous: it does not wait for the core to apply the value.
    fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>;

    /// Publishes the last known proposed round.
    /// Synchronous: it does not wait for the core to apply the value.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;
}
79
/// Owner-side handle of the spawned core thread task.
pub(crate) struct CoreThreadHandle {
    /// Strong sender of the command channel. Dispatchers only hold a
    /// `WeakSender`, so this is the long-lived strong reference to the channel.
    sender: Sender<CoreThreadCommand>,
    /// Join handle of the task running `CoreThread::run`.
    join_handle: tokio::task::JoinHandle<()>,
}
84
85impl CoreThreadHandle {
86 pub async fn stop(self) {
87 drop(self.sender);
89 self.join_handle.await.ok();
90 }
91}
92
/// State owned by the core thread task; consumed by `CoreThread::run`.
struct CoreThread {
    /// The `Core` instance that executes every command.
    core: Core,
    /// Receiving end of the command channel.
    receiver: Receiver<CoreThreadCommand>,
    /// Watch receiver for the propagation delay; each change is applied via
    /// `Core::set_propagation_delay`.
    rx_propagation_delay: watch::Receiver<Round>,
    /// Watch receiver for the last known proposed round; each change is
    /// applied via `Core::set_last_known_proposed_round`.
    rx_last_known_proposed_round: watch::Receiver<Round>,
    /// Shared context; used here to update the `core_lock_dequeued` metric.
    context: Arc<Context>,
}
100
101impl CoreThread {
102 pub async fn run(mut self) -> ConsensusResult<()> {
103 tracing::debug!("Started core thread");
104
105 loop {
106 tokio::select! {
107 command = self.receiver.recv() => {
108 let Some(command) = command else {
109 break;
110 };
111 self.context.metrics.node_metrics.core_lock_dequeued.inc();
112 match command {
113 CoreThreadCommand::AddBlocks(blocks, sender) => {
114 let _scope = monitored_scope("CoreThread::loop::add_blocks");
115 let missing_block_refs = self.core.add_blocks(blocks)?;
116 sender.send(missing_block_refs).ok();
117 }
118 CoreThreadCommand::CheckBlockRefs(block_refs, sender) => {
119 let _scope = monitored_scope("CoreThread::loop::check_block_refs");
120 let missing_block_refs = self.core.check_block_refs(block_refs)?;
121 sender.send(missing_block_refs).ok();
122 }
123 CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
124 let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
125 let missing_block_refs = self.core.add_certified_commits(commits)?;
126 sender.send(missing_block_refs).ok();
127 }
128 CoreThreadCommand::NewBlock(round, sender, force) => {
129 let _scope = monitored_scope("CoreThread::loop::new_block");
130 self.core.new_block(round, force)?;
131 sender.send(()).ok();
132 }
133 CoreThreadCommand::GetMissing(sender) => {
134 let _scope = monitored_scope("CoreThread::loop::get_missing");
135 sender.send(self.core.get_missing_blocks()).ok();
136 }
137 }
138 }
139 _ = self.rx_last_known_proposed_round.changed() => {
140 let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
141 let round = *self.rx_last_known_proposed_round.borrow();
142 self.core.set_last_known_proposed_round(round);
143 self.core.new_block(round + 1, true)?;
144 }
145 _ = self.rx_propagation_delay.changed() => {
146 let _scope = monitored_scope("CoreThread::loop::set_propagation_delay");
147 let should_propose_before = self.core.should_propose();
148 let propagation_delay = *self.rx_propagation_delay.borrow();
149 self.core.set_propagation_delay(
150 propagation_delay
151 );
152 if !should_propose_before && self.core.should_propose() {
153 self.core.new_block(Round::MAX, true)?;
156 }
157 }
158 }
159 }
160
161 Ok(())
162 }
163}
164
/// [`CoreThreadDispatcher`] implementation that forwards calls to the core
/// thread over channels. Cloning is cheap: every field is a shared handle.
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    /// Shared context; used here to update the `core_lock_enqueued` metric.
    context: Arc<Context>,
    /// Weak sender of the command channel, upgraded for each send. The strong
    /// sender is held by `CoreThreadHandle`.
    sender: WeakSender<CoreThreadCommand>,
    /// Watch sender used to publish the propagation delay.
    tx_propagation_delay: Arc<watch::Sender<Round>>,
    /// Watch sender used to publish the last known proposed round.
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
}
172
173impl ChannelCoreThreadDispatcher {
174 pub(crate) fn start(
175 context: Arc<Context>,
176 _dag_state: &RwLock<DagState>,
177 core: Core,
178 ) -> (Self, CoreThreadHandle) {
179 let (sender, receiver) =
180 channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
181 let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0);
182 let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
183 rx_propagation_delay.mark_unchanged();
184 rx_last_known_proposed_round.mark_unchanged();
185 let core_thread = CoreThread {
186 core,
187 receiver,
188 rx_propagation_delay,
189 rx_last_known_proposed_round,
190 context: context.clone(),
191 };
192
193 let join_handle = spawn_logged_monitored_task!(
194 async move {
195 if let Err(err) = core_thread.run().await
196 && !matches!(err, ConsensusError::Shutdown)
197 {
198 panic!("Fatal error occurred: {err}");
199 }
200 },
201 "ConsensusCoreThread"
202 );
203
204 let dispatcher = ChannelCoreThreadDispatcher {
207 context,
208 sender: sender.downgrade(),
209 tx_propagation_delay: Arc::new(tx_propagation_delay),
210 tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
211 };
212 let handle = CoreThreadHandle {
213 join_handle,
214 sender,
215 };
216 (dispatcher, handle)
217 }
218
219 async fn send(&self, command: CoreThreadCommand) {
220 self.context.metrics.node_metrics.core_lock_enqueued.inc();
221 if let Some(sender) = self.sender.upgrade()
222 && let Err(err) = sender.send(command).await
223 {
224 warn!(
225 "Couldn't send command to core thread, probably is shutting down: {}",
226 err
227 );
228 }
229 }
230}
231
232#[async_trait]
233impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
234 async fn add_blocks(
235 &self,
236 blocks: Vec<VerifiedBlock>,
237 ) -> Result<BTreeSet<BlockRef>, CoreError> {
238 let (sender, receiver) = oneshot::channel();
239 self.send(CoreThreadCommand::AddBlocks(blocks, sender))
240 .await;
241 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
242
243 Ok(missing_block_refs)
244 }
245
246 async fn check_block_refs(
247 &self,
248 block_refs: Vec<BlockRef>,
249 ) -> Result<BTreeSet<BlockRef>, CoreError> {
250 let (sender, receiver) = oneshot::channel();
251 self.send(CoreThreadCommand::CheckBlockRefs(block_refs, sender))
252 .await;
253 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
254
255 Ok(missing_block_refs)
256 }
257
258 async fn add_certified_commits(
259 &self,
260 commits: CertifiedCommits,
261 ) -> Result<BTreeSet<BlockRef>, CoreError> {
262 let (sender, receiver) = oneshot::channel();
263 self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
264 .await;
265 let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
266 Ok(missing_block_refs)
267 }
268
269 async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
270 let (sender, receiver) = oneshot::channel();
271 self.send(CoreThreadCommand::NewBlock(round, sender, force))
272 .await;
273 receiver.await.map_err(|e| Shutdown(e.to_string()))
274 }
275
276 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
277 let (sender, receiver) = oneshot::channel();
278 self.send(CoreThreadCommand::GetMissing(sender)).await;
279 receiver.await.map_err(|e| Shutdown(e.to_string()))
280 }
281
282 fn set_propagation_delay(&self, propagation_delay: Round) -> Result<(), CoreError> {
283 self.tx_propagation_delay
284 .send(propagation_delay)
285 .map_err(|e| Shutdown(e.to_string()))
286 }
287
288 fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
289 self.tx_last_known_proposed_round
290 .send(round)
291 .map_err(|e| Shutdown(e.to_string()))
292 }
293}
294
/// Test-only [`CoreThreadDispatcher`] that records calls in memory instead of
/// talking to a core thread.
#[cfg(test)]
#[derive(Default)]
pub(crate) struct MockCoreThreadDispatcher {
    /// Blocks received through `add_blocks` and not yet drained by
    /// `get_add_blocks`.
    add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
    /// Block refs that the next `get_missing_blocks` call will return.
    missing_blocks: parking_lot::Mutex<BTreeSet<BlockRef>>,
    /// Every round passed to `set_last_known_proposed_round`, in call order.
    last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
}
303
#[cfg(test)]
impl MockCoreThreadDispatcher {
    /// Returns all blocks recorded by `add_blocks` so far and leaves the
    /// record empty.
    pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
        std::mem::take(&mut *self.add_blocks.lock())
    }

    /// Adds `block_refs` to the set that `get_missing_blocks` will return.
    pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
        self.missing_blocks.lock().extend(block_refs);
    }

    /// Returns a copy of every round passed to
    /// `set_last_known_proposed_round`, in call order.
    pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
        self.last_known_proposed_round.lock().clone()
    }
}
324
#[cfg(test)]
#[async_trait]
impl CoreThreadDispatcher for MockCoreThreadDispatcher {
    /// Records `blocks` and returns an empty set.
    async fn add_blocks(
        &self,
        blocks: Vec<VerifiedBlock>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        self.add_blocks.lock().extend(blocks);
        Ok(BTreeSet::new())
    }

    /// Ignores its input and returns an empty set.
    async fn check_block_refs(
        &self,
        _block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(BTreeSet::new())
    }

    /// Not implemented in the mock; panics when called.
    async fn add_certified_commits(
        &self,
        _commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        todo!()
    }

    /// Does nothing and returns `Ok(())`.
    async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
        Ok(())
    }

    /// Returns the stubbed missing block refs and clears them, so a second
    /// call returns an empty set unless more are stubbed in between.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(std::mem::take(&mut *self.missing_blocks.lock()))
    }

    /// Not implemented in the mock; panics when called.
    fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
        todo!()
    }

    /// Appends `round` to the recorded list of rounds.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
        self.last_known_proposed_round.lock().push(round);
        Ok(())
    }
}
372
#[cfg(test)]
mod test {
    use mysten_metrics::monitored_mpsc;
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        CommitConsumerArgs,
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        round_tracker::RoundTracker,
        storage::mem_store::MemStore,
        transaction::{TransactionClient, TransactionConsumer},
        transaction_certifier::TransactionCertifier,
    };

    /// Checks that dispatcher clones can reach the core thread while it is
    /// running, and that calls fail once the handle has been stopped.
    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();

        // Build a `Core` and its collaborators for a 4-authority test context.
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // The underscore-named receivers are bound to names (not `_`) so they
        // stay alive until the end of the test.
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;
        // NOTE(review): a second, separate `LeaderSchedule` is built here and
        // given to `Core`, while `CommitObserver` got the first instance.
        // Confirm whether the two are meant to share one schedule.
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Create two clones of the dispatcher.
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // While the core thread is running, commands from both clones succeed.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());

        // Shut the core thread down.
        handle.stop().await;

        // After shutdown, commands from both clones return an error.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
    }
}