sui_core/
mock_consensus.rs1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority::{AuthorityState, ExecutionEnv};
6use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
7
8use consensus_types::block::BlockRef;
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11use sui_types::committee::EpochId;
12use sui_types::error::{SuiError, SuiResult};
13use sui_types::executable_transaction::VerifiedExecutableTransaction;
14use sui_types::messages_consensus::{
15 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
16};
17use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::JoinHandle;
20use tracing::debug;
21
22pub struct MockConsensusClient {
23 tx_sender: mpsc::Sender<ConsensusTransaction>,
24 _consensus_handle: JoinHandle<()>,
25}
26
27pub enum ConsensusMode {
28 Noop,
30 DirectSequencing,
32}
33
34impl MockConsensusClient {
35 pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
36 let (tx_sender, tx_receiver) = mpsc::channel(1000000);
37 let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
38 Self {
39 tx_sender,
40 _consensus_handle,
41 }
42 }
43
44 pub fn run(
45 validator: Weak<AuthorityState>,
46 tx_receiver: mpsc::Receiver<ConsensusTransaction>,
47 consensus_mode: ConsensusMode,
48 ) -> JoinHandle<()> {
49 tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
50 }
51
52 async fn run_impl(
53 validator: Weak<AuthorityState>,
54 mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
55 consensus_mode: ConsensusMode,
56 ) {
57 while let Some(tx) = tx_receiver.recv().await {
58 let Some(validator) = validator.upgrade() else {
59 debug!("validator shut down; exiting MockConsensusClient");
60 return;
61 };
62 let epoch_store = validator.epoch_store_for_testing();
63 let env = match consensus_mode {
64 ConsensusMode::Noop => ExecutionEnv::new(),
65 ConsensusMode::DirectSequencing => {
66 let executable_tx = match &tx.kind {
68 ConsensusTransactionKind::CertifiedTransaction(cert) => {
69 Some(VerifiedExecutableTransaction::new_from_certificate(
70 VerifiedCertificate::new_unchecked(*cert.clone()),
71 ))
72 }
73 ConsensusTransactionKind::UserTransaction(tx) => {
74 Some(VerifiedExecutableTransaction::new_from_consensus(
75 VerifiedTransaction::new_unchecked(*tx.clone()),
76 0,
77 ))
78 }
79 _ => None,
80 };
81
82 if let Some(exec_tx) = executable_tx {
83 let assigned_versions = epoch_store
85 .assign_shared_object_versions_for_tests(
86 validator.get_object_cache_reader().as_ref(),
87 &vec![exec_tx.clone()],
88 )
89 .unwrap();
90
91 let assigned_version = assigned_versions
92 .into_map()
93 .into_iter()
94 .next()
95 .map(|(_, v)| v)
96 .unwrap_or_default();
97 ExecutionEnv::new().with_assigned_versions(assigned_version)
98 } else {
99 ExecutionEnv::new()
100 }
101 }
102 };
103 match &tx.kind {
104 ConsensusTransactionKind::CertifiedTransaction(tx) => {
105 if tx.is_consensus_tx() {
106 validator.execution_scheduler().enqueue(
107 vec![(
108 VerifiedExecutableTransaction::new_from_certificate(
109 VerifiedCertificate::new_unchecked(*tx.clone()),
110 )
111 .into(),
112 env,
113 )],
114 &epoch_store,
115 );
116 }
117 }
118 ConsensusTransactionKind::UserTransaction(tx) => {
119 if tx.is_consensus_tx() {
120 validator.execution_scheduler().enqueue(
121 vec![(
122 VerifiedExecutableTransaction::new_from_consensus(
123 VerifiedTransaction::new_unchecked(*tx.clone()),
124 0,
125 )
126 .into(),
127 env,
128 )],
129 &epoch_store,
130 );
131 }
132 }
133 ConsensusTransactionKind::UserTransactionV2(tx) => {
134 if tx.tx().is_consensus_tx() {
135 validator.execution_scheduler().enqueue(
136 vec![(
137 VerifiedExecutableTransaction::new_from_consensus(
138 VerifiedTransaction::new_unchecked(tx.tx().clone()),
139 0,
140 )
141 .into(),
142 env,
143 )],
144 &epoch_store,
145 );
146 }
147 }
148 _ => {}
149 }
150 }
151 }
152
153 fn submit_impl(
154 &self,
155 transactions: &[ConsensusTransaction],
156 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
157 assert!(transactions.len() == 1);
159 let transaction = &transactions[0];
160 self.tx_sender
161 .try_send(transaction.clone())
162 .map_err(|_| SuiError::from("MockConsensusClient channel overflowed"))?;
163 Ok((
165 vec![ConsensusPosition {
166 epoch: EpochId::MIN,
167 block: BlockRef::MIN,
168 index: 0,
169 }],
170 with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
171 ))
172 }
173}
174
175impl SubmitToConsensus for MockConsensusClient {
176 fn submit_to_consensus(
177 &self,
178 transactions: &[ConsensusTransaction],
179 _epoch_store: &Arc<AuthorityPerEpochStore>,
180 ) -> SuiResult {
181 self.submit_impl(transactions).map(|_response| ())
182 }
183
184 fn submit_best_effort(
185 &self,
186 transaction: &ConsensusTransaction,
187 _epoch_store: &Arc<AuthorityPerEpochStore>,
188 _timeout: Duration,
189 ) -> SuiResult {
190 self.submit_impl(std::slice::from_ref(transaction))
191 .map(|_response| ())
192 }
193}
194
195#[async_trait::async_trait]
196impl ConsensusClient for MockConsensusClient {
197 async fn submit(
198 &self,
199 transactions: &[ConsensusTransaction],
200 _epoch_store: &Arc<AuthorityPerEpochStore>,
201 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
202 self.submit_impl(transactions)
203 }
204}
205
206pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
207 let (tx, rx) = oneshot::channel();
208 tx.send(status).ok();
209 rx
210}