sui_core/
mock_consensus.rs1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority::{AuthorityState, ExecutionEnv};
6use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
7
8use consensus_types::block::BlockRef;
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11use sui_types::committee::EpochId;
12use sui_types::error::{SuiError, SuiResult};
13use sui_types::executable_transaction::VerifiedExecutableTransaction;
14use sui_types::messages_consensus::{
15 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
16};
17use sui_types::transaction::VerifiedTransaction;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::JoinHandle;
20use tracing::debug;
21
22pub struct MockConsensusClient {
23 tx_sender: mpsc::Sender<ConsensusTransaction>,
24 _consensus_handle: JoinHandle<()>,
25}
26
27pub enum ConsensusMode {
28 Noop,
30 DirectSequencing,
32}
33
34impl MockConsensusClient {
35 pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
36 let (tx_sender, tx_receiver) = mpsc::channel(1000000);
37 let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
38 Self {
39 tx_sender,
40 _consensus_handle,
41 }
42 }
43
44 pub fn run(
45 validator: Weak<AuthorityState>,
46 tx_receiver: mpsc::Receiver<ConsensusTransaction>,
47 consensus_mode: ConsensusMode,
48 ) -> JoinHandle<()> {
49 tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
50 }
51
52 async fn run_impl(
53 validator: Weak<AuthorityState>,
54 mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
55 consensus_mode: ConsensusMode,
56 ) {
57 while let Some(tx) = tx_receiver.recv().await {
58 let Some(validator) = validator.upgrade() else {
59 debug!("validator shut down; exiting MockConsensusClient");
60 return;
61 };
62 let epoch_store = validator.epoch_store_for_testing();
63 let env = match consensus_mode {
64 ConsensusMode::Noop => ExecutionEnv::new(),
65 ConsensusMode::DirectSequencing => {
66 let executable_tx = match &tx.kind {
68 ConsensusTransactionKind::CertifiedTransaction(_) => None,
70 ConsensusTransactionKind::UserTransaction(tx) => {
71 Some(VerifiedExecutableTransaction::new_from_consensus(
72 VerifiedTransaction::new_unchecked(*tx.clone()),
73 0,
74 ))
75 }
76 ConsensusTransactionKind::UserTransactionV2(tx) => {
77 Some(VerifiedExecutableTransaction::new_from_consensus(
78 VerifiedTransaction::new_unchecked(tx.tx().clone()),
79 0,
80 ))
81 }
82 _ => None,
83 };
84
85 if let Some(exec_tx) = executable_tx {
86 let assigned_versions = epoch_store
88 .assign_shared_object_versions_for_tests(
89 validator.get_object_cache_reader().as_ref(),
90 &vec![exec_tx.clone()],
91 )
92 .unwrap();
93
94 let assigned_version = assigned_versions
95 .into_map()
96 .into_iter()
97 .next()
98 .map(|(_, v)| v)
99 .unwrap_or_default();
100 ExecutionEnv::new().with_assigned_versions(assigned_version)
101 } else {
102 ExecutionEnv::new()
103 }
104 }
105 };
106 match &tx.kind {
107 ConsensusTransactionKind::CertifiedTransaction(_) => {
109 debug!("Ignoring deprecated CertifiedTransaction in MockConsensusClient");
110 }
111 ConsensusTransactionKind::UserTransaction(tx) => {
112 if tx.is_consensus_tx() {
113 validator.execution_scheduler().enqueue(
114 vec![(
115 VerifiedExecutableTransaction::new_from_consensus(
116 VerifiedTransaction::new_unchecked(*tx.clone()),
117 0,
118 )
119 .into(),
120 env,
121 )],
122 &epoch_store,
123 );
124 }
125 }
126 ConsensusTransactionKind::UserTransactionV2(tx) => {
127 if tx.tx().is_consensus_tx() {
128 validator.execution_scheduler().enqueue(
129 vec![(
130 VerifiedExecutableTransaction::new_from_consensus(
131 VerifiedTransaction::new_unchecked(tx.tx().clone()),
132 0,
133 )
134 .into(),
135 env,
136 )],
137 &epoch_store,
138 );
139 }
140 }
141 _ => {}
142 }
143 }
144 }
145
146 fn submit_impl(
147 &self,
148 transactions: &[ConsensusTransaction],
149 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
150 assert!(transactions.len() == 1);
152 let transaction = &transactions[0];
153 self.tx_sender
154 .try_send(transaction.clone())
155 .map_err(|_| SuiError::from("MockConsensusClient channel overflowed"))?;
156 Ok((
158 vec![ConsensusPosition {
159 epoch: EpochId::MIN,
160 block: BlockRef::MIN,
161 index: 0,
162 }],
163 with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
164 ))
165 }
166}
167
168impl SubmitToConsensus for MockConsensusClient {
169 fn submit_to_consensus(
170 &self,
171 transactions: &[ConsensusTransaction],
172 _epoch_store: &Arc<AuthorityPerEpochStore>,
173 ) -> SuiResult {
174 self.submit_impl(transactions).map(|_response| ())
175 }
176
177 fn submit_best_effort(
178 &self,
179 transaction: &ConsensusTransaction,
180 _epoch_store: &Arc<AuthorityPerEpochStore>,
181 _timeout: Duration,
182 ) -> SuiResult {
183 self.submit_impl(std::slice::from_ref(transaction))
184 .map(|_response| ())
185 }
186}
187
188#[async_trait::async_trait]
189impl ConsensusClient for MockConsensusClient {
190 async fn submit(
191 &self,
192 transactions: &[ConsensusTransaction],
193 _epoch_store: &Arc<AuthorityPerEpochStore>,
194 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
195 self.submit_impl(transactions)
196 }
197}
198
199pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
200 let (tx, rx) = oneshot::channel();
201 tx.send(status).ok();
202 rx
203}