sui_core/
mock_consensus.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority::{AuthorityState, ExecutionEnv};
6use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
7
8use consensus_types::block::BlockRef;
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11use sui_types::committee::EpochId;
12use sui_types::error::{SuiError, SuiResult};
13use sui_types::executable_transaction::VerifiedExecutableTransaction;
14use sui_types::messages_consensus::{
15    ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
16};
17use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::JoinHandle;
20use tracing::debug;
21
/// In-process stand-in for a consensus client.
///
/// Submitted transactions are pushed onto an in-memory channel and handled by
/// a background Tokio task spawned in [`MockConsensusClient::new`].
pub struct MockConsensusClient {
    /// Sending half of the channel that feeds the background task.
    tx_sender: mpsc::Sender<ConsensusTransaction>,
    /// Handle of the spawned background task. It is stored but never read in
    /// this file (hence the leading underscore).
    _consensus_handle: JoinHandle<()>,
}
26
/// Selects how the background task prepares the execution environment for a
/// received transaction.
pub enum ConsensusMode {
    /// No shared-object version assignment is performed: the transaction is
    /// paired with a default `ExecutionEnv`. Consensus transactions are still
    /// handed to the execution scheduler.
    Noop,
    /// Shared-object versions are assigned through the epoch store before the
    /// transaction is handed to the execution scheduler.
    DirectSequencing,
}
33
34impl MockConsensusClient {
35    pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
36        let (tx_sender, tx_receiver) = mpsc::channel(1000000);
37        let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
38        Self {
39            tx_sender,
40            _consensus_handle,
41        }
42    }
43
44    pub fn run(
45        validator: Weak<AuthorityState>,
46        tx_receiver: mpsc::Receiver<ConsensusTransaction>,
47        consensus_mode: ConsensusMode,
48    ) -> JoinHandle<()> {
49        tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
50    }
51
52    async fn run_impl(
53        validator: Weak<AuthorityState>,
54        mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
55        consensus_mode: ConsensusMode,
56    ) {
57        while let Some(tx) = tx_receiver.recv().await {
58            let Some(validator) = validator.upgrade() else {
59                debug!("validator shut down; exiting MockConsensusClient");
60                return;
61            };
62            let epoch_store = validator.epoch_store_for_testing();
63            let env = match consensus_mode {
64                ConsensusMode::Noop => ExecutionEnv::new(),
65                ConsensusMode::DirectSequencing => {
66                    // Extract the executable transaction from the consensus transaction
67                    let executable_tx = match &tx.kind {
68                        ConsensusTransactionKind::CertifiedTransaction(cert) => {
69                            Some(VerifiedExecutableTransaction::new_from_certificate(
70                                VerifiedCertificate::new_unchecked(*cert.clone()),
71                            ))
72                        }
73                        ConsensusTransactionKind::UserTransaction(tx) => {
74                            Some(VerifiedExecutableTransaction::new_from_consensus(
75                                VerifiedTransaction::new_unchecked(*tx.clone()),
76                                0,
77                            ))
78                        }
79                        _ => None,
80                    };
81
82                    if let Some(exec_tx) = executable_tx {
83                        // Use the simpler assign_shared_object_versions_for_tests API
84                        let assigned_versions = epoch_store
85                            .assign_shared_object_versions_for_tests(
86                                validator.get_object_cache_reader().as_ref(),
87                                &vec![exec_tx.clone()],
88                            )
89                            .unwrap();
90
91                        let assigned_version = assigned_versions
92                            .into_map()
93                            .into_iter()
94                            .next()
95                            .map(|(_, v)| v)
96                            .unwrap_or_default();
97                        ExecutionEnv::new().with_assigned_versions(assigned_version)
98                    } else {
99                        ExecutionEnv::new()
100                    }
101                }
102            };
103            match &tx.kind {
104                ConsensusTransactionKind::CertifiedTransaction(tx) => {
105                    if tx.is_consensus_tx() {
106                        validator.execution_scheduler().enqueue(
107                            vec![(
108                                VerifiedExecutableTransaction::new_from_certificate(
109                                    VerifiedCertificate::new_unchecked(*tx.clone()),
110                                )
111                                .into(),
112                                env,
113                            )],
114                            &epoch_store,
115                        );
116                    }
117                }
118                ConsensusTransactionKind::UserTransaction(tx) => {
119                    if tx.is_consensus_tx() {
120                        validator.execution_scheduler().enqueue(
121                            vec![(
122                                VerifiedExecutableTransaction::new_from_consensus(
123                                    VerifiedTransaction::new_unchecked(*tx.clone()),
124                                    0,
125                                )
126                                .into(),
127                                env,
128                            )],
129                            &epoch_store,
130                        );
131                    }
132                }
133                ConsensusTransactionKind::UserTransactionV2(tx) => {
134                    if tx.tx().is_consensus_tx() {
135                        validator.execution_scheduler().enqueue(
136                            vec![(
137                                VerifiedExecutableTransaction::new_from_consensus(
138                                    VerifiedTransaction::new_unchecked(tx.tx().clone()),
139                                    0,
140                                )
141                                .into(),
142                                env,
143                            )],
144                            &epoch_store,
145                        );
146                    }
147                }
148                _ => {}
149            }
150        }
151    }
152
153    fn submit_impl(
154        &self,
155        transactions: &[ConsensusTransaction],
156    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
157        // TODO: maybe support multi-transactions and remove this check
158        assert!(transactions.len() == 1);
159        let transaction = &transactions[0];
160        self.tx_sender
161            .try_send(transaction.clone())
162            .map_err(|_| SuiError::from("MockConsensusClient channel overflowed"))?;
163        // TODO(fastpath): Add some way to simulate consensus positions across blocks
164        Ok((
165            vec![ConsensusPosition {
166                epoch: EpochId::MIN,
167                block: BlockRef::MIN,
168                index: 0,
169            }],
170            with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
171        ))
172    }
173}
174
175impl SubmitToConsensus for MockConsensusClient {
176    fn submit_to_consensus(
177        &self,
178        transactions: &[ConsensusTransaction],
179        _epoch_store: &Arc<AuthorityPerEpochStore>,
180    ) -> SuiResult {
181        self.submit_impl(transactions).map(|_response| ())
182    }
183
184    fn submit_best_effort(
185        &self,
186        transaction: &ConsensusTransaction,
187        _epoch_store: &Arc<AuthorityPerEpochStore>,
188        _timeout: Duration,
189    ) -> SuiResult {
190        self.submit_impl(std::slice::from_ref(transaction))
191            .map(|_response| ())
192    }
193}
194
195#[async_trait::async_trait]
196impl ConsensusClient for MockConsensusClient {
197    async fn submit(
198        &self,
199        transactions: &[ConsensusTransaction],
200        _epoch_store: &Arc<AuthorityPerEpochStore>,
201    ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
202        self.submit_impl(transactions)
203    }
204}
205
206pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
207    let (tx, rx) = oneshot::channel();
208    tx.send(status).ok();
209    rx
210}