consensus_core/
core_thread.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeSet, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7use consensus_types::block::{BlockRef, Round};
8use mysten_metrics::{
9    monitored_mpsc::{Receiver, Sender, WeakSender, channel},
10    monitored_scope, spawn_logged_monitored_task,
11};
12use parking_lot::RwLock;
13use thiserror::Error;
14use tokio::sync::{oneshot, watch};
15use tracing::warn;
16
17use crate::{
18    block::VerifiedBlock,
19    commit::CertifiedCommits,
20    context::Context,
21    core::Core,
22    core_thread::CoreError::Shutdown,
23    dag_state::DagState,
24    error::{ConsensusError, ConsensusResult},
25};
26
/// Capacity of the command channel created in `ChannelCoreThreadDispatcher::start`
/// between the dispatchers and the core thread.
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2_000;
28
/// Commands executed by the core thread loop (`CoreThread::run`).
///
/// Every variant carries a oneshot sender the loop uses to send the result back to the
/// caller; if the caller stopped waiting, the reply is silently discarded.
enum CoreThreadCommand {
    /// Add blocks to be processed and accepted.
    /// Replies with the missing block refs returned by `Core::add_blocks`.
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Checks if block refs exist locally and sync missing ones.
    /// Replies with the missing block refs returned by `Core::check_block_refs`.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Adds certified commits and their certification blocks for processing and acceptance.
    /// Returns missing ancestors of certification voting blocks. Blocks included in certified commits
    /// cannot have missing ancestors.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Sent when the min round has passed or the leader timeout occurred and a block should be
    /// produced. Fields are `(round, reply sender, force)`.
    /// When the command is sent with `force = true`, the block will be created for `round`, skipping
    /// any checks (e.g. leader existence in the previous round). More information can be found on
    /// the `Core` component.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Request the missing blocks that need to be synced.
    GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
}
45
/// Errors returned by [`CoreThreadDispatcher`] implementations.
#[derive(Error, Debug)]
pub enum CoreError {
    /// The core thread is not available: either the command was dropped before a reply was
    /// sent, or the core thread's receiving side is gone. Carries the text of the underlying
    /// channel error.
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}
51
/// The interface to dispatch commands to CoreThread and Core.
/// It also makes mocking easier in unit tests.
///
/// # Errors
///
/// Channel-backed implementations return [`CoreError::Shutdown`] when the core thread is no
/// longer running.
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    /// Submits `blocks` to the core for processing and acceptance.
    /// Returns the block refs the core reports as missing.
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Checks whether `block_refs` exist locally, so that missing ones can be synced.
    /// Returns the block refs the core reports as missing.
    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Submits certified commits and their certification blocks for processing and acceptance.
    /// Returns the missing ancestors of the certification voting blocks.
    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Asks the core to try producing a new block for `round`. With `force = true` the core
    /// skips its usual pre-proposal checks.
    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    /// Returns the refs of blocks the core considers missing and that need to be synced.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Sets the estimated delay to propagate a block to a quorum of peers, in
    /// number of rounds.
    fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>;

    /// Sets the last round known to have been proposed by this authority.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;
}
79
/// Handle that owns the spawned core thread task.
///
/// It keeps the strong command sender, whereas dispatchers only hold weak senders; calling
/// [`CoreThreadHandle::stop`] drops it so the core thread loop can exit.
pub(crate) struct CoreThreadHandle {
    /// Strong sender of the core thread command channel.
    sender: Sender<CoreThreadCommand>,
    /// Join handle of the spawned core thread task.
    join_handle: tokio::task::JoinHandle<()>,
}
84
85impl CoreThreadHandle {
86    pub async fn stop(self) {
87        // drop the sender, that will force all the other weak senders to not able to upgrade.
88        drop(self.sender);
89        self.join_handle.await.ok();
90    }
91}
92
/// State moved into the spawned core thread task: the `Core` it drives and the channels
/// it listens on.
struct CoreThread {
    /// The core instance all commands are executed against.
    core: Core,
    /// Commands sent by `ChannelCoreThreadDispatcher`.
    receiver: Receiver<CoreThreadCommand>,
    /// Updates published by `ChannelCoreThreadDispatcher::set_propagation_delay`.
    rx_propagation_delay: watch::Receiver<Round>,
    /// Updates published by `ChannelCoreThreadDispatcher::set_last_known_proposed_round`.
    rx_last_known_proposed_round: watch::Receiver<Round>,
    /// Used to record the dequeued-command metric.
    context: Arc<Context>,
}
100
101impl CoreThread {
102    pub async fn run(mut self) -> ConsensusResult<()> {
103        tracing::debug!("Started core thread");
104
105        loop {
106            tokio::select! {
107                command = self.receiver.recv() => {
108                    let Some(command) = command else {
109                        break;
110                    };
111                    self.context.metrics.node_metrics.core_lock_dequeued.inc();
112                    match command {
113                        CoreThreadCommand::AddBlocks(blocks, sender) => {
114                            let _scope = monitored_scope("CoreThread::loop::add_blocks");
115                            let missing_block_refs = self.core.add_blocks(blocks)?;
116                            sender.send(missing_block_refs).ok();
117                        }
118                        CoreThreadCommand::CheckBlockRefs(block_refs, sender) => {
119                            let _scope = monitored_scope("CoreThread::loop::check_block_refs");
120                            let missing_block_refs = self.core.check_block_refs(block_refs)?;
121                            sender.send(missing_block_refs).ok();
122                        }
123                        CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
124                            let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
125                            let missing_block_refs = self.core.add_certified_commits(commits)?;
126                            sender.send(missing_block_refs).ok();
127                        }
128                        CoreThreadCommand::NewBlock(round, sender, force) => {
129                            let _scope = monitored_scope("CoreThread::loop::new_block");
130                            self.core.new_block(round, force)?;
131                            sender.send(()).ok();
132                        }
133                        CoreThreadCommand::GetMissing(sender) => {
134                            let _scope = monitored_scope("CoreThread::loop::get_missing");
135                            sender.send(self.core.get_missing_blocks()).ok();
136                        }
137                    }
138                }
139                _ = self.rx_last_known_proposed_round.changed() => {
140                    let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
141                    let round = *self.rx_last_known_proposed_round.borrow();
142                    self.core.set_last_known_proposed_round(round);
143                    self.core.new_block(round + 1, true)?;
144                }
145                _ = self.rx_propagation_delay.changed() => {
146                    let _scope = monitored_scope("CoreThread::loop::set_propagation_delay");
147                    let should_propose_before = self.core.should_propose();
148                    let propagation_delay = *self.rx_propagation_delay.borrow();
149                    self.core.set_propagation_delay(
150                        propagation_delay
151                    );
152                    if !should_propose_before && self.core.should_propose() {
153                        // If core cannot propose before but can propose now, try to produce a new block to ensure liveness,
154                        // because block proposal could have been skipped.
155                        self.core.new_block(Round::MAX, true)?;
156                    }
157                }
158            }
159        }
160
161        Ok(())
162    }
163}
164
/// [`CoreThreadDispatcher`] that forwards requests to the core thread over channels.
///
/// All clones share the same channels. Commands go through a weak sender, so dispatchers
/// never keep the core thread alive on their own; the strong sender lives in
/// [`CoreThreadHandle`].
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    /// Used to record the enqueued-command metric.
    context: Arc<Context>,
    /// Weak sender of the command channel; upgraded for every command.
    sender: WeakSender<CoreThreadCommand>,
    /// Publishes propagation delay updates to the core thread.
    tx_propagation_delay: Arc<watch::Sender<Round>>,
    /// Publishes last known proposed round updates to the core thread.
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
}
172
173impl ChannelCoreThreadDispatcher {
174    pub(crate) fn start(
175        context: Arc<Context>,
176        _dag_state: &RwLock<DagState>,
177        core: Core,
178    ) -> (Self, CoreThreadHandle) {
179        let (sender, receiver) =
180            channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
181        let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0);
182        let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
183        rx_propagation_delay.mark_unchanged();
184        rx_last_known_proposed_round.mark_unchanged();
185        let core_thread = CoreThread {
186            core,
187            receiver,
188            rx_propagation_delay,
189            rx_last_known_proposed_round,
190            context: context.clone(),
191        };
192
193        let join_handle = spawn_logged_monitored_task!(
194            async move {
195                if let Err(err) = core_thread.run().await
196                    && !matches!(err, ConsensusError::Shutdown)
197                {
198                    panic!("Fatal error occurred: {err}");
199                }
200            },
201            "ConsensusCoreThread"
202        );
203
204        // Explicitly using downgraded sender in order to allow sharing the CoreThreadDispatcher but
205        // able to shutdown the CoreThread by dropping the original sender.
206        let dispatcher = ChannelCoreThreadDispatcher {
207            context,
208            sender: sender.downgrade(),
209            tx_propagation_delay: Arc::new(tx_propagation_delay),
210            tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
211        };
212        let handle = CoreThreadHandle {
213            join_handle,
214            sender,
215        };
216        (dispatcher, handle)
217    }
218
219    async fn send(&self, command: CoreThreadCommand) {
220        self.context.metrics.node_metrics.core_lock_enqueued.inc();
221        if let Some(sender) = self.sender.upgrade()
222            && let Err(err) = sender.send(command).await
223        {
224            warn!(
225                "Couldn't send command to core thread, probably is shutting down: {}",
226                err
227            );
228        }
229    }
230}
231
232#[async_trait]
233impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
234    async fn add_blocks(
235        &self,
236        blocks: Vec<VerifiedBlock>,
237    ) -> Result<BTreeSet<BlockRef>, CoreError> {
238        let (sender, receiver) = oneshot::channel();
239        self.send(CoreThreadCommand::AddBlocks(blocks, sender))
240            .await;
241        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
242
243        Ok(missing_block_refs)
244    }
245
246    async fn check_block_refs(
247        &self,
248        block_refs: Vec<BlockRef>,
249    ) -> Result<BTreeSet<BlockRef>, CoreError> {
250        let (sender, receiver) = oneshot::channel();
251        self.send(CoreThreadCommand::CheckBlockRefs(block_refs, sender))
252            .await;
253        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
254
255        Ok(missing_block_refs)
256    }
257
258    async fn add_certified_commits(
259        &self,
260        commits: CertifiedCommits,
261    ) -> Result<BTreeSet<BlockRef>, CoreError> {
262        let (sender, receiver) = oneshot::channel();
263        self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
264            .await;
265        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
266        Ok(missing_block_refs)
267    }
268
269    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
270        let (sender, receiver) = oneshot::channel();
271        self.send(CoreThreadCommand::NewBlock(round, sender, force))
272            .await;
273        receiver.await.map_err(|e| Shutdown(e.to_string()))
274    }
275
276    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
277        let (sender, receiver) = oneshot::channel();
278        self.send(CoreThreadCommand::GetMissing(sender)).await;
279        receiver.await.map_err(|e| Shutdown(e.to_string()))
280    }
281
282    fn set_propagation_delay(&self, propagation_delay: Round) -> Result<(), CoreError> {
283        self.tx_propagation_delay
284            .send(propagation_delay)
285            .map_err(|e| Shutdown(e.to_string()))
286    }
287
288    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
289        self.tx_last_known_proposed_round
290            .send(round)
291            .map_err(|e| Shutdown(e.to_string()))
292    }
293}
294
// TODO: complete the mock thread dispatcher so it can be reused by several tests.
/// Test double for [`CoreThreadDispatcher`] that records calls instead of driving a `Core`.
#[cfg(test)]
#[derive(Default)]
pub(crate) struct MockCoreThreadDispatcher {
    /// Blocks received through `add_blocks`; drained by `get_add_blocks`.
    add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
    /// Block refs stubbed via `stub_missing_blocks`; returned and cleared by `get_missing_blocks`.
    missing_blocks: parking_lot::Mutex<BTreeSet<BlockRef>>,
    /// Every round passed to `set_last_known_proposed_round`, in call order.
    last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
}
303
#[cfg(test)]
impl MockCoreThreadDispatcher {
    // The whole impl is already test-only, so the methods need no extra `cfg` attributes.

    /// Returns every block recorded by `add_blocks` so far and clears the record.
    pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
        std::mem::take(&mut *self.add_blocks.lock())
    }

    /// Adds `block_refs` to the set returned by the next `get_missing_blocks` call.
    pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
        self.missing_blocks.lock().extend(block_refs);
    }

    /// Returns, in call order, every round passed to `set_last_known_proposed_round`.
    pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
        self.last_known_proposed_round.lock().clone()
    }
}
324
#[cfg(test)]
#[async_trait]
impl CoreThreadDispatcher for MockCoreThreadDispatcher {
    /// Records `blocks` for `get_add_blocks`; never reports anything missing.
    async fn add_blocks(
        &self,
        blocks: Vec<VerifiedBlock>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        self.add_blocks.lock().extend(blocks);
        Ok(BTreeSet::new())
    }

    /// Never reports anything missing.
    async fn check_block_refs(
        &self,
        _block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(BTreeSet::new())
    }

    /// Not supported by the mock yet.
    async fn add_certified_commits(
        &self,
        _commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        todo!()
    }

    /// Accepts any request without doing anything.
    async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
        Ok(())
    }

    /// Returns the refs stubbed via `stub_missing_blocks` and clears them.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(std::mem::take(&mut *self.missing_blocks.lock()))
    }

    /// Not supported by the mock yet.
    fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
        todo!();
    }

    /// Records `round` for `get_last_own_proposed_round`.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
        self.last_known_proposed_round.lock().push(round);
        Ok(())
    }
}
372
#[cfg(test)]
mod test {
    use mysten_metrics::monitored_mpsc;
    use parking_lot::RwLock;

    use super::*;
    use crate::{
        CommitConsumerArgs,
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        round_tracker::RoundTracker,
        storage::mem_store::MemStore,
        transaction::{TransactionClient, TransactionConsumer},
        transaction_certifier::TransactionCertifier,
    };

    /// Checks the dispatcher lifecycle:
    /// - commands sent through any clone of the dispatcher are served while the core thread runs;
    /// - every dispatcher call fails once the core thread has been stopped through its handle.
    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
            blocks_sender,
        );
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
        // One leader schedule shared by the commit observer and the core, instead of two
        // independently built instances.
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_certifier.clone(),
            leader_schedule.clone(),
        )
        .await;
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let core = Core::new(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_certifier,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Now create some clones of the dispatcher
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // Commands sent through any clone reach the running core thread.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_1.get_missing_blocks().await.is_ok());

        // Now shut down the core thread.
        handle.stop().await;

        // Weak senders can no longer be upgraded, so commands are dropped unanswered.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_1.get_missing_blocks().await.is_err());

        // The core thread's watch receivers are gone as well.
        assert!(dispatcher_1.set_propagation_delay(1).is_err());
        assert!(dispatcher_2.set_last_known_proposed_round(1).is_err());
    }
}