sui_core/
mock_consensus.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority::{AuthorityState, ExecutionEnv};
6use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
7
8use consensus_types::block::BlockRef;
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11use sui_types::committee::EpochId;
12use sui_types::error::{SuiError, SuiResult};
13use sui_types::executable_transaction::VerifiedExecutableTransaction;
14use sui_types::messages_consensus::{
15    ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
16};
17use sui_types::transaction::VerifiedTransaction;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::JoinHandle;
20use tracing::debug;
21
22pub struct MockConsensusClient {
23    tx_sender: mpsc::Sender<ConsensusTransaction>,
24    _consensus_handle: JoinHandle<()>,
25}
26
/// Selects how much sequencing work [`MockConsensusClient`] performs for each
/// submitted transaction.
///
/// In both modes, user transactions for which `is_consensus_tx()` returns true are
/// still handed to the validator's execution scheduler; the modes differ only in
/// whether shared object versions are assigned first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusMode {
    /// Performs no sequencing: no shared object versions are assigned, and
    /// transactions are enqueued with a default `ExecutionEnv`.
    Noop,
    /// Directly sequences the transaction into the store: shared object versions
    /// are assigned through the epoch store and attached to the `ExecutionEnv`
    /// before the transaction is enqueued.
    DirectSequencing,
}
33
34impl MockConsensusClient {
35    pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
36        let (tx_sender, tx_receiver) = mpsc::channel(1000000);
37        let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
38        Self {
39            tx_sender,
40            _consensus_handle,
41        }
42    }
43
44    pub fn run(
45        validator: Weak<AuthorityState>,
46        tx_receiver: mpsc::Receiver<ConsensusTransaction>,
47        consensus_mode: ConsensusMode,
48    ) -> JoinHandle<()> {
49        tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
50    }
51
52    async fn run_impl(
53        validator: Weak<AuthorityState>,
54        mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
55        consensus_mode: ConsensusMode,
56    ) {
57        while let Some(tx) = tx_receiver.recv().await {
58            let Some(validator) = validator.upgrade() else {
59                debug!("validator shut down; exiting MockConsensusClient");
60                return;
61            };
62            let epoch_store = validator.epoch_store_for_testing();
63            let env = match consensus_mode {
64                ConsensusMode::Noop => ExecutionEnv::new(),
65                ConsensusMode::DirectSequencing => {
66                    // Extract the executable transaction from the consensus transaction
67                    let executable_tx = match &tx.kind {
68                        // DEPRECATED: CertifiedTransaction is no longer used since MFP is live.
69                        ConsensusTransactionKind::CertifiedTransaction(_) => None,
70                        ConsensusTransactionKind::UserTransaction(tx) => {
71                            Some(VerifiedExecutableTransaction::new_from_consensus(
72                                VerifiedTransaction::new_unchecked(*tx.clone()),
73                                0,
74                            ))
75                        }
76                        ConsensusTransactionKind::UserTransactionV2(tx) => {
77                            Some(VerifiedExecutableTransaction::new_from_consensus(
78                                VerifiedTransaction::new_unchecked(tx.tx().clone()),
79                                0,
80                            ))
81                        }
82                        _ => None,
83                    };
84
85                    if let Some(exec_tx) = executable_tx {
86                        // Use the simpler assign_shared_object_versions_for_tests API
87                        let assigned_versions = epoch_store
88                            .assign_shared_object_versions_for_tests(
89                                validator.get_object_cache_reader().as_ref(),
90                                &vec![exec_tx.clone()],
91                            )
92                            .unwrap();
93
94                        let assigned_version = assigned_versions
95                            .into_map()
96                            .into_iter()
97                            .next()
98                            .map(|(_, v)| v)
99                            .unwrap_or_default();
100                        ExecutionEnv::new().with_assigned_versions(assigned_version)
101                    } else {
102                        ExecutionEnv::new()
103                    }
104                }
105            };
106            match &tx.kind {
107                // DEPRECATED: CertifiedTransaction is no longer used since MFP is live.
108                ConsensusTransactionKind::CertifiedTransaction(_) => {
109                    debug!("Ignoring deprecated CertifiedTransaction in MockConsensusClient");
110                }
111                ConsensusTransactionKind::UserTransaction(tx) => {
112                    if tx.is_consensus_tx() {
113                        validator.execution_scheduler().enqueue(
114                            vec![(
115                                VerifiedExecutableTransaction::new_from_consensus(
116                                    VerifiedTransaction::new_unchecked(*tx.clone()),
117                                    0,
118                                )
119                                .into(),
120                                env,
121                            )],
122                            &epoch_store,
123                        );
124                    }
125                }
126                ConsensusTransactionKind::UserTransactionV2(tx) => {
127                    if tx.tx().is_consensus_tx() {
128                        validator.execution_scheduler().enqueue(
129                            vec![(
130                                VerifiedExecutableTransaction::new_from_consensus(
131                                    VerifiedTransaction::new_unchecked(tx.tx().clone()),
132                                    0,
133                                )
134                                .into(),
135                                env,
136                            )],
137                            &epoch_store,
138                        );
139                    }
140                }
141                _ => {}
142            }
143        }
144    }
145
146    fn submit_impl(
147        &self,
148        transactions: &[ConsensusTransaction],
149    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
150        // TODO: maybe support multi-transactions and remove this check
151        assert!(transactions.len() == 1);
152        let transaction = &transactions[0];
153        self.tx_sender
154            .try_send(transaction.clone())
155            .map_err(|_| SuiError::from("MockConsensusClient channel overflowed"))?;
156        // TODO(fastpath): Add some way to simulate consensus positions across blocks
157        Ok((
158            vec![ConsensusPosition {
159                epoch: EpochId::MIN,
160                block: BlockRef::MIN,
161                index: 0,
162            }],
163            with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
164        ))
165    }
166}
167
168impl SubmitToConsensus for MockConsensusClient {
169    fn submit_to_consensus(
170        &self,
171        transactions: &[ConsensusTransaction],
172        _epoch_store: &Arc<AuthorityPerEpochStore>,
173    ) -> SuiResult {
174        self.submit_impl(transactions).map(|_response| ())
175    }
176
177    fn submit_best_effort(
178        &self,
179        transaction: &ConsensusTransaction,
180        _epoch_store: &Arc<AuthorityPerEpochStore>,
181        _timeout: Duration,
182    ) -> SuiResult {
183        self.submit_impl(std::slice::from_ref(transaction))
184            .map(|_response| ())
185    }
186}
187
188#[async_trait::async_trait]
189impl ConsensusClient for MockConsensusClient {
190    async fn submit(
191        &self,
192        transactions: &[ConsensusTransaction],
193        _epoch_store: &Arc<AuthorityPerEpochStore>,
194    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
195        self.submit_impl(transactions)
196    }
197}
198
199pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
200    let (tx, rx) = oneshot::channel();
201    tx.send(status).ok();
202    rx
203}