consensus_core/
core_thread.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeSet, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7use consensus_types::block::{BlockRef, Round};
8use mysten_metrics::{
9    monitored_mpsc::{Receiver, Sender, WeakSender, channel},
10    monitored_scope, spawn_logged_monitored_task,
11};
12use parking_lot::RwLock;
13use thiserror::Error;
14use tokio::sync::{oneshot, watch};
15use tracing::warn;
16
17use crate::{
18    block::VerifiedBlock,
19    commit::CertifiedCommits,
20    context::Context,
21    core::Core,
22    core_thread::CoreError::Shutdown,
23    dag_state::DagState,
24    error::{ConsensusError, ConsensusResult},
25};
26
/// Size passed to `channel` when creating the "consensus_core_commands" queue that carries
/// `CoreThreadCommand`s from dispatchers to the core thread.
const CORE_THREAD_COMMANDS_CHANNEL_SIZE: usize = 2000;
28
/// Commands queued by a dispatcher and executed by the core thread loop.
///
/// Every variant carries a `oneshot::Sender` that the core thread uses to report the outcome
/// back to the caller once the command has been applied to `Core`.
enum CoreThreadCommand {
    /// Add blocks to be processed and accepted.
    /// The reply carries the set of block refs returned by `Core::add_blocks`.
    AddBlocks(Vec<VerifiedBlock>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Checks if block refs exist locally and syncs the missing ones.
    /// The reply carries the set of block refs returned by `Core::check_block_refs`.
    CheckBlockRefs(Vec<BlockRef>, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Adds certified commits and their certification blocks for processing and acceptance.
    /// Returns missing ancestors of certification voting blocks. Blocks included in certified commits
    /// cannot have missing ancestors.
    AddCertifiedCommits(CertifiedCommits, oneshot::Sender<BTreeSet<BlockRef>>),
    /// Called when the min round has passed or the leader timeout occurred and a block should be produced.
    /// When the command is called with `force = true`, then the block will be created for `round` skipping
    /// any checks (e.g. leader existence of previous round). More information can be found on the `Core` component.
    /// Fields, in order: target round, completion acknowledgement, `force` flag.
    NewBlock(Round, oneshot::Sender<()>, bool),
    /// Request missing blocks that need to be synced.
    GetMissing(oneshot::Sender<BTreeSet<BlockRef>>),
}
45
/// Error type returned by [`CoreThreadDispatcher`] methods.
#[derive(Error, Debug)]
pub enum CoreError {
    /// A channel to or from the core thread failed. In this file the variant is built from the
    /// stringified channel error (a dropped reply sender, or a closed watch channel).
    #[error("Core thread shutdown: {0}")]
    Shutdown(String),
}
51
/// The interface to dispatch commands to CoreThread and Core.
/// It also makes mocking easier in unit tests.
#[async_trait]
pub trait CoreThreadDispatcher: Sync + Send + 'static {
    /// Submits `blocks` for processing and returns a set of block refs. The channel-backed
    /// implementation in this file returns what `Core::add_blocks` reports as missing.
    async fn add_blocks(&self, blocks: Vec<VerifiedBlock>)
    -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Asks the core to check `block_refs` and returns a set of block refs. The channel-backed
    /// implementation returns what `Core::check_block_refs` reports as missing.
    async fn check_block_refs(
        &self,
        block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Submits certified commits for processing and returns a set of block refs. The
    /// channel-backed implementation returns what `Core::add_certified_commits` reports as missing.
    async fn add_certified_commits(
        &self,
        commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Requests a block proposal for `round`; `force` is forwarded unchanged to the core.
    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError>;

    /// Returns the block refs that the core currently reports as missing.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError>;

    /// Sets the estimated delay to propagate a block to a quorum of peers, in
    /// number of rounds.
    fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>;

    /// Publishes the last known proposed round to the core. Synchronous: implementations are
    /// not expected to wait for the core to act on it.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;
}
79
/// Owner-side handle for the spawned core task, used to shut it down via `stop`.
pub(crate) struct CoreThreadHandle {
    /// Strong command sender; dispatchers only hold weak senders downgraded from this one.
    sender: Sender<CoreThreadCommand>,
    /// Join handle of the spawned core task.
    join_handle: tokio::task::JoinHandle<()>,
}
84
85impl CoreThreadHandle {
86    pub async fn stop(self) {
87        // drop the sender, that will force all the other weak senders to not able to upgrade.
88        drop(self.sender);
89        self.join_handle.await.ok();
90    }
91}
92
/// State owned by the spawned core task: the `Core` instance and the channels that drive it.
struct CoreThread {
    /// The consensus core that every command and watch update is applied to.
    core: Core,
    /// Receiving end of the command queue fed by `ChannelCoreThreadDispatcher::send`.
    receiver: Receiver<CoreThreadCommand>,
    /// Latest propagation delay published through `set_propagation_delay`.
    rx_propagation_delay: watch::Receiver<Round>,
    /// Latest round published through `set_last_known_proposed_round`.
    rx_last_known_proposed_round: watch::Receiver<Round>,
    /// Shared context; used in `run` to bump the `core_lock_dequeued` metric.
    context: Arc<Context>,
}
100
101impl CoreThread {
102    pub async fn run(mut self) -> ConsensusResult<()> {
103        tracing::debug!("Started core thread");
104
105        loop {
106            tokio::select! {
107                command = self.receiver.recv() => {
108                    let Some(command) = command else {
109                        break;
110                    };
111                    self.context.metrics.node_metrics.core_lock_dequeued.inc();
112                    match command {
113                        CoreThreadCommand::AddBlocks(blocks, sender) => {
114                            let _scope = monitored_scope("CoreThread::loop::add_blocks");
115                            let missing_block_refs = self.core.add_blocks(blocks)?;
116                            sender.send(missing_block_refs).ok();
117                        }
118                        CoreThreadCommand::CheckBlockRefs(block_refs, sender) => {
119                            let _scope = monitored_scope("CoreThread::loop::check_block_refs");
120                            let missing_block_refs = self.core.check_block_refs(block_refs)?;
121                            sender.send(missing_block_refs).ok();
122                        }
123                        CoreThreadCommand::AddCertifiedCommits(commits, sender) => {
124                            let _scope = monitored_scope("CoreThread::loop::add_certified_commits");
125                            let missing_block_refs = self.core.add_certified_commits(commits)?;
126                            sender.send(missing_block_refs).ok();
127                        }
128                        CoreThreadCommand::NewBlock(round, sender, force) => {
129                            let _scope = monitored_scope("CoreThread::loop::new_block");
130                            self.core.new_block(round, force)?;
131                            sender.send(()).ok();
132                        }
133                        CoreThreadCommand::GetMissing(sender) => {
134                            let _scope = monitored_scope("CoreThread::loop::get_missing");
135                            sender.send(self.core.get_missing_blocks()).ok();
136                        }
137                    }
138                }
139                _ = self.rx_last_known_proposed_round.changed() => {
140                    let _scope = monitored_scope("CoreThread::loop::set_last_known_proposed_round");
141                    let round = *self.rx_last_known_proposed_round.borrow();
142                    self.core.set_last_known_proposed_round(round);
143                    // `round` arg is meant to avoid proposing below already proposed round.
144                    // Passing Round::MAX to select the threshold clock round for proposing.
145                    self.core.new_block(Round::MAX, true)?;
146                }
147                _ = self.rx_propagation_delay.changed() => {
148                    let _scope = monitored_scope("CoreThread::loop::set_propagation_delay");
149                    let should_propose_before = self.core.should_propose();
150                    let propagation_delay = *self.rx_propagation_delay.borrow();
151                    self.core.set_propagation_delay(
152                        propagation_delay
153                    );
154                    if !should_propose_before && self.core.should_propose() {
155                        // If core cannot propose before but can propose now, try to produce a new block to ensure liveness,
156                        // because block proposal could have been skipped.
157                        self.core.new_block(Round::MAX, true)?;
158                    }
159                }
160            }
161        }
162
163        Ok(())
164    }
165}
166
/// `CoreThreadDispatcher` implementation that forwards requests to the core task over channels.
/// Cloning is cheap: all clones share the same underlying channels.
#[derive(Clone)]
pub(crate) struct ChannelCoreThreadDispatcher {
    /// Shared context; used in `send` to bump the `core_lock_enqueued` metric.
    context: Arc<Context>,
    /// Weak command sender, so that dropping the strong sender in `CoreThreadHandle` is enough
    /// to close the command channel.
    sender: WeakSender<CoreThreadCommand>,
    /// Publishes propagation delay updates to the core task.
    tx_propagation_delay: Arc<watch::Sender<Round>>,
    /// Publishes last known proposed round updates to the core task.
    tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
}
174
175impl ChannelCoreThreadDispatcher {
176    pub(crate) fn start(
177        context: Arc<Context>,
178        _dag_state: &RwLock<DagState>,
179        core: Core,
180    ) -> (Self, CoreThreadHandle) {
181        let (sender, receiver) =
182            channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
183        let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0);
184        let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
185        rx_propagation_delay.mark_unchanged();
186        rx_last_known_proposed_round.mark_unchanged();
187        let core_thread = CoreThread {
188            core,
189            receiver,
190            rx_propagation_delay,
191            rx_last_known_proposed_round,
192            context: context.clone(),
193        };
194
195        let join_handle = spawn_logged_monitored_task!(
196            async move {
197                if let Err(err) = core_thread.run().await
198                    && !matches!(err, ConsensusError::Shutdown)
199                {
200                    panic!("Fatal error occurred: {err}");
201                }
202            },
203            "ConsensusCoreThread"
204        );
205
206        // Explicitly using downgraded sender in order to allow sharing the CoreThreadDispatcher but
207        // able to shutdown the CoreThread by dropping the original sender.
208        let dispatcher = ChannelCoreThreadDispatcher {
209            context,
210            sender: sender.downgrade(),
211            tx_propagation_delay: Arc::new(tx_propagation_delay),
212            tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
213        };
214        let handle = CoreThreadHandle {
215            join_handle,
216            sender,
217        };
218        (dispatcher, handle)
219    }
220
221    async fn send(&self, command: CoreThreadCommand) {
222        self.context.metrics.node_metrics.core_lock_enqueued.inc();
223        if let Some(sender) = self.sender.upgrade()
224            && let Err(err) = sender.send(command).await
225        {
226            warn!(
227                "Couldn't send command to core thread, probably is shutting down: {}",
228                err
229            );
230        }
231    }
232}
233
234#[async_trait]
235impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
236    async fn add_blocks(
237        &self,
238        blocks: Vec<VerifiedBlock>,
239    ) -> Result<BTreeSet<BlockRef>, CoreError> {
240        let (sender, receiver) = oneshot::channel();
241        self.send(CoreThreadCommand::AddBlocks(blocks, sender))
242            .await;
243        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
244
245        Ok(missing_block_refs)
246    }
247
248    async fn check_block_refs(
249        &self,
250        block_refs: Vec<BlockRef>,
251    ) -> Result<BTreeSet<BlockRef>, CoreError> {
252        let (sender, receiver) = oneshot::channel();
253        self.send(CoreThreadCommand::CheckBlockRefs(block_refs, sender))
254            .await;
255        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
256
257        Ok(missing_block_refs)
258    }
259
260    async fn add_certified_commits(
261        &self,
262        commits: CertifiedCommits,
263    ) -> Result<BTreeSet<BlockRef>, CoreError> {
264        let (sender, receiver) = oneshot::channel();
265        self.send(CoreThreadCommand::AddCertifiedCommits(commits, sender))
266            .await;
267        let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;
268        Ok(missing_block_refs)
269    }
270
271    async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
272        let (sender, receiver) = oneshot::channel();
273        self.send(CoreThreadCommand::NewBlock(round, sender, force))
274            .await;
275        receiver.await.map_err(|e| Shutdown(e.to_string()))
276    }
277
278    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
279        let (sender, receiver) = oneshot::channel();
280        self.send(CoreThreadCommand::GetMissing(sender)).await;
281        receiver.await.map_err(|e| Shutdown(e.to_string()))
282    }
283
284    fn set_propagation_delay(&self, propagation_delay: Round) -> Result<(), CoreError> {
285        self.tx_propagation_delay
286            .send(propagation_delay)
287            .map_err(|e| Shutdown(e.to_string()))
288    }
289
290    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
291        self.tx_last_known_proposed_round
292            .send(round)
293            .map_err(|e| Shutdown(e.to_string()))
294    }
295}
296
// TODO: complete the Mock for thread dispatcher to be used from several tests
/// Test double for `CoreThreadDispatcher` that records calls in memory instead of driving a
/// real `Core`.
#[cfg(test)]
#[derive(Default)]
pub(crate) struct MockCoreThreadDispatcher {
    /// Blocks received through `add_blocks`, until drained by `get_add_blocks`.
    add_blocks: parking_lot::Mutex<Vec<VerifiedBlock>>,
    /// Block refs stubbed via `stub_missing_blocks`, handed out once by `get_missing_blocks`.
    missing_blocks: parking_lot::Mutex<BTreeSet<BlockRef>>,
    /// Every round passed to `set_last_known_proposed_round`, in call order.
    last_known_proposed_round: parking_lot::Mutex<Vec<Round>>,
}
305
#[cfg(test)]
impl MockCoreThreadDispatcher {
    /// Returns every block recorded so far and leaves the recorded list empty.
    pub(crate) async fn get_add_blocks(&self) -> Vec<VerifiedBlock> {
        std::mem::take(&mut *self.add_blocks.lock())
    }

    /// Adds `block_refs` to the set that the next `get_missing_blocks` call will return.
    pub(crate) async fn stub_missing_blocks(&self, block_refs: BTreeSet<BlockRef>) {
        self.missing_blocks.lock().extend(block_refs);
    }

    /// Returns a copy of all rounds recorded by `set_last_known_proposed_round`.
    pub(crate) async fn get_last_own_proposed_round(&self) -> Vec<Round> {
        self.last_known_proposed_round.lock().clone()
    }
}
326
#[cfg(test)]
#[async_trait]
impl CoreThreadDispatcher for MockCoreThreadDispatcher {
    /// Records `blocks` and reports no missing block refs.
    async fn add_blocks(
        &self,
        blocks: Vec<VerifiedBlock>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        self.add_blocks.lock().extend(blocks);
        Ok(BTreeSet::new())
    }

    /// Ignores the input and reports no missing block refs.
    async fn check_block_refs(
        &self,
        _block_refs: Vec<BlockRef>,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(BTreeSet::new())
    }

    /// Not implemented in the mock yet.
    async fn add_certified_commits(
        &self,
        _commits: CertifiedCommits,
    ) -> Result<BTreeSet<BlockRef>, CoreError> {
        todo!()
    }

    /// Does nothing and reports success.
    async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
        Ok(())
    }

    /// Hands out the stubbed missing block refs once, leaving the stub empty afterwards.
    async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
        Ok(std::mem::take(&mut *self.missing_blocks.lock()))
    }

    /// Not implemented in the mock yet.
    fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
        todo!()
    }

    /// Appends `round` to the recorded history.
    fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError> {
        self.last_known_proposed_round.lock().push(round);
        Ok(())
    }
}
374
/// Tests for the channel-backed dispatcher running against a real `Core`.
#[cfg(test)]
mod test {
    use std::time::Duration;

    use parking_lot::RwLock;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        CommitConsumerArgs,
        block::{BlockAPI, TestBlock, genesis_blocks},
        block_manager::BlockManager,
        block_verifier::NoopBlockVerifier,
        commit_observer::CommitObserver,
        context::Context,
        core::CoreSignals,
        dag_state::DagState,
        leader_schedule::LeaderSchedule,
        round_tracker::RoundTracker,
        storage::{Store, WriteBatch, mem_store::MemStore},
        transaction::{TransactionClient, TransactionConsumer},
        transaction_vote_tracker::TransactionVoteTracker,
    };

    /// Dispatcher clones can submit commands while the core thread runs, and get errors once
    /// the handle has been stopped.
    #[tokio::test]
    async fn test_core_thread() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        // Bound to a named local so the broadcast receiver stays alive for the whole test.
        let _block_receiver = signal_receivers.block_broadcast_receiver();
        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let core = Core::new_validator(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_vote_tracker,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            false,
            round_tracker,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // Now create some clones of the dispatcher
        let dispatcher_1 = core_dispatcher.clone();
        let dispatcher_2 = core_dispatcher.clone();

        // Try to send some commands
        assert!(dispatcher_1.add_blocks(vec![]).await.is_ok());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_ok());

        // Now shutdown the dispatcher
        handle.stop().await;

        // Try to send some commands; with the core thread gone they must fail.
        assert!(dispatcher_1.add_blocks(vec![]).await.is_err());
        assert!(dispatcher_2.add_blocks(vec![]).await.is_err());
    }

    /// After recovering from a store that holds rounds 1 and 2, publishing a last known
    /// proposed round makes the core thread propose a round 3 block.
    #[tokio::test]
    async fn test_last_known_sync_wakes_threshold_clock_round() {
        telemetry_subscribers::init_for_testing();
        let (context, mut key_pairs) = Context::new_for_test(4);
        let context = Arc::new(context);
        let store = Arc::new(MemStore::new());

        // Build rounds 1 and 2 with one block per authority, each referencing every block of
        // the previous round, and persist them together with the genesis blocks.
        let mut last_round_blocks = genesis_blocks(&context);
        let mut all_blocks = last_round_blocks.clone();
        for round in 1..=2 {
            let mut this_round_blocks = Vec::new();
            for (index, _authority) in context.committee.authorities() {
                let block = VerifiedBlock::new_for_test(
                    TestBlock::new(round, index.value() as u32)
                        .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect())
                        .build(),
                );
                this_round_blocks.push(block);
            }
            all_blocks.extend(this_round_blocks.clone());
            last_round_blocks = this_round_blocks;
        }
        store
            .write(WriteBatch::default().blocks(all_blocks))
            .expect("Storage error");

        // Sanity-check the recovered state before starting the core.
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        assert_eq!(
            dag_state.read().get_last_proposed_block().unwrap().round(),
            2
        );
        assert_eq!(dag_state.read().threshold_clock_round(), 3);

        let block_manager = BlockManager::new(context.clone(), dag_state.clone());
        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let transaction_vote_tracker = TransactionVoteTracker::new(
            context.clone(),
            Arc::new(NoopBlockVerifier {}),
            dag_state.clone(),
        );
        transaction_vote_tracker.recover_blocks_after_round(dag_state.read().gc_round());
        let (signals, signal_receivers) = CoreSignals::new(context.clone());
        let mut block_receiver = signal_receivers.block_broadcast_receiver();
        let (commit_consumer, _commit_receiver) = CommitConsumerArgs::new(0, 0);
        let commit_observer = CommitObserver::new(
            context.clone(),
            commit_consumer,
            dag_state.clone(),
            transaction_vote_tracker.clone(),
        )
        .await;
        let leader_schedule = Arc::new(LeaderSchedule::from_store(
            context.clone(),
            dag_state.clone(),
        ));
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        // NOTE(review): the `true` flag (the other test passes `false`) presumably enables
        // syncing of the last known own block before proposing — confirm against `Core`.
        let core = Core::new_validator(
            context.clone(),
            leader_schedule,
            transaction_consumer,
            transaction_vote_tracker,
            block_manager,
            commit_observer,
            signals,
            key_pairs.remove(context.own_index.value()).1,
            dag_state.clone(),
            true,
            round_tracker,
        );

        let (core_dispatcher, handle) =
            ChannelCoreThreadDispatcher::start(context, &dag_state, core);

        // The first broadcast is expected to be the recovered round 2 block.
        let recovered_block = timeout(Duration::from_secs(5), block_receiver.recv())
            .await
            .expect("timed out waiting for recovered block")
            .expect("block broadcast closed");
        assert_eq!(recovered_block.block.round(), 2);

        // Nothing else may be broadcast until the last known proposed round is published.
        assert!(
            timeout(Duration::from_millis(100), block_receiver.recv())
                .await
                .is_err(),
            "round 3 must not be proposed before last-known sync completes"
        );

        core_dispatcher
            .set_last_known_proposed_round(1)
            .expect("core thread should be running");

        // Skip any other broadcasts until a round 3 block shows up (or the timeout fires).
        let proposed_block = timeout(Duration::from_secs(5), async {
            loop {
                let block = block_receiver.recv().await.expect("block broadcast closed");
                if block.block.round() == 3 {
                    return block;
                }
            }
        })
        .await
        .expect("timed out waiting for threshold-clock proposal");
        assert_eq!(
            proposed_block.block.author(),
            core_dispatcher.context.own_index
        );

        handle.stop().await;
    }
}