sui_core/
mock_consensus.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority::{AuthorityState, ExecutionEnv};
6use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
7
8use consensus_types::block::BlockRef;
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11use sui_types::committee::EpochId;
12use sui_types::error::{SuiError, SuiResult};
13use sui_types::executable_transaction::VerifiedExecutableTransaction;
14use sui_types::messages_consensus::{
15    ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
16};
17use sui_types::transaction::VerifiedTransaction;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::JoinHandle;
20use tracing::debug;
21
22pub struct MockConsensusClient {
23    tx_sender: mpsc::Sender<ConsensusTransaction>,
24    _consensus_handle: JoinHandle<()>,
25}
26
27pub enum ConsensusMode {
28    // ConsensusClient does absolutely nothing when receiving a transaction
29    Noop,
30    // ConsensusClient directly sequences the transaction into the store.
31    DirectSequencing,
32}
33
34impl MockConsensusClient {
35    pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
36        let (tx_sender, tx_receiver) = mpsc::channel(1000000);
37        let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
38        Self {
39            tx_sender,
40            _consensus_handle,
41        }
42    }
43
44    pub fn run(
45        validator: Weak<AuthorityState>,
46        tx_receiver: mpsc::Receiver<ConsensusTransaction>,
47        consensus_mode: ConsensusMode,
48    ) -> JoinHandle<()> {
49        tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
50    }
51
52    async fn run_impl(
53        validator: Weak<AuthorityState>,
54        mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
55        consensus_mode: ConsensusMode,
56    ) {
57        while let Some(tx) = tx_receiver.recv().await {
58            let Some(validator) = validator.upgrade() else {
59                debug!("validator shut down; exiting MockConsensusClient");
60                return;
61            };
62            let epoch_store = validator.epoch_store_for_testing();
63            let env = match consensus_mode {
64                ConsensusMode::Noop => ExecutionEnv::new(),
65                ConsensusMode::DirectSequencing => {
66                    // Extract the executable transaction from the consensus transaction.
67                    // DEPRECATED: CertifiedTransaction and UserTransaction are no longer
68                    // produced (MFP uses UserTransactionV2).
69                    let executable_tx = match &tx.kind {
70                        ConsensusTransactionKind::UserTransactionV2(tx) => {
71                            Some(VerifiedExecutableTransaction::new_from_consensus(
72                                VerifiedTransaction::new_unchecked(tx.tx().clone()),
73                                0,
74                            ))
75                        }
76                        _ => None,
77                    };
78
79                    if let Some(exec_tx) = executable_tx {
80                        // Use the simpler assign_shared_object_versions_for_tests API
81                        let assigned_versions = epoch_store
82                            .assign_shared_object_versions_for_tests(
83                                validator.get_object_cache_reader().as_ref(),
84                                std::slice::from_ref(&exec_tx),
85                            )
86                            .unwrap();
87
88                        let assigned_version = assigned_versions
89                            .into_map()
90                            .into_iter()
91                            .next()
92                            .map(|(_, v)| v)
93                            .unwrap_or_default();
94                        ExecutionEnv::new().with_assigned_versions(assigned_version)
95                    } else {
96                        ExecutionEnv::new()
97                    }
98                }
99            };
100            match &tx.kind {
101                // DEPRECATED: CertifiedTransaction and UserTransaction are no longer produced.
102                ConsensusTransactionKind::CertifiedTransaction(_)
103                | ConsensusTransactionKind::UserTransaction(_) => {
104                    debug!(
105                        "Ignoring deprecated {:?} in MockConsensusClient",
106                        std::mem::discriminant(&tx.kind)
107                    );
108                }
109                ConsensusTransactionKind::UserTransactionV2(tx) => {
110                    if tx.tx().is_consensus_tx() {
111                        validator.execution_scheduler().enqueue(
112                            vec![(
113                                VerifiedExecutableTransaction::new_from_consensus(
114                                    VerifiedTransaction::new_unchecked(tx.tx().clone()),
115                                    0,
116                                )
117                                .into(),
118                                env,
119                            )],
120                            &epoch_store,
121                        );
122                    }
123                }
124                _ => {}
125            }
126        }
127    }
128
129    fn submit_impl(
130        &self,
131        transactions: &[ConsensusTransaction],
132    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
133        // TODO: maybe support multi-transactions and remove this check
134        assert!(transactions.len() == 1);
135        let transaction = &transactions[0];
136        self.tx_sender
137            .try_send(transaction.clone())
138            .map_err(|_| SuiError::from("MockConsensusClient channel overflowed"))?;
139        // TODO(fastpath): Add some way to simulate consensus positions across blocks
140        Ok((
141            vec![ConsensusPosition {
142                epoch: EpochId::MIN,
143                block: BlockRef::MIN,
144                index: 0,
145            }],
146            with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
147        ))
148    }
149}
150
151impl SubmitToConsensus for MockConsensusClient {
152    fn submit_to_consensus(
153        &self,
154        transactions: &[ConsensusTransaction],
155        _epoch_store: &Arc<AuthorityPerEpochStore>,
156    ) -> SuiResult {
157        self.submit_impl(transactions).map(|_response| ())
158    }
159
160    fn submit_best_effort(
161        &self,
162        transaction: &ConsensusTransaction,
163        _epoch_store: &Arc<AuthorityPerEpochStore>,
164        _timeout: Duration,
165    ) -> SuiResult {
166        self.submit_impl(std::slice::from_ref(transaction))
167            .map(|_response| ())
168    }
169}
170
171#[async_trait::async_trait]
172impl ConsensusClient for MockConsensusClient {
173    async fn submit(
174        &self,
175        transactions: &[ConsensusTransaction],
176        _epoch_store: &Arc<AuthorityPerEpochStore>,
177    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
178        self.submit_impl(transactions)
179    }
180}
181
182pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
183    let (tx, rx) = oneshot::channel();
184    tx.send(status).ok();
185    rx
186}