sui_core/
mock_consensus.rs1use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
5use crate::authority::{AuthorityState, ExecutionEnv};
6use crate::consensus_adapter::{BlockStatusReceiver, ConsensusClient, SubmitToConsensus};
7
8use consensus_types::block::BlockRef;
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11use sui_types::committee::EpochId;
12use sui_types::error::{SuiError, SuiResult};
13use sui_types::executable_transaction::VerifiedExecutableTransaction;
14use sui_types::messages_consensus::{
15 ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
16};
17use sui_types::transaction::VerifiedTransaction;
18use tokio::sync::{mpsc, oneshot};
19use tokio::task::JoinHandle;
20use tracing::debug;
21
22pub struct MockConsensusClient {
23 tx_sender: mpsc::Sender<ConsensusTransaction>,
24 _consensus_handle: JoinHandle<()>,
25}
26
/// Selects how [`MockConsensusClient`] prepares a transaction before handing
/// it to the execution scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsensusMode {
    /// Enqueue transactions with a plain `ExecutionEnv`; no shared-object
    /// versions are assigned.
    Noop,
    /// Assign shared-object versions to each user transaction and attach them
    /// to the `ExecutionEnv` before enqueueing.
    DirectSequencing,
}
33
34impl MockConsensusClient {
35 pub fn new(validator: Weak<AuthorityState>, consensus_mode: ConsensusMode) -> Self {
36 let (tx_sender, tx_receiver) = mpsc::channel(1000000);
37 let _consensus_handle = Self::run(validator, tx_receiver, consensus_mode);
38 Self {
39 tx_sender,
40 _consensus_handle,
41 }
42 }
43
44 pub fn run(
45 validator: Weak<AuthorityState>,
46 tx_receiver: mpsc::Receiver<ConsensusTransaction>,
47 consensus_mode: ConsensusMode,
48 ) -> JoinHandle<()> {
49 tokio::spawn(async move { Self::run_impl(validator, tx_receiver, consensus_mode).await })
50 }
51
52 async fn run_impl(
53 validator: Weak<AuthorityState>,
54 mut tx_receiver: mpsc::Receiver<ConsensusTransaction>,
55 consensus_mode: ConsensusMode,
56 ) {
57 while let Some(tx) = tx_receiver.recv().await {
58 let Some(validator) = validator.upgrade() else {
59 debug!("validator shut down; exiting MockConsensusClient");
60 return;
61 };
62 let epoch_store = validator.epoch_store_for_testing();
63 let env = match consensus_mode {
64 ConsensusMode::Noop => ExecutionEnv::new(),
65 ConsensusMode::DirectSequencing => {
66 let executable_tx = match &tx.kind {
70 ConsensusTransactionKind::UserTransactionV2(tx) => {
71 Some(VerifiedExecutableTransaction::new_from_consensus(
72 VerifiedTransaction::new_unchecked(tx.tx().clone()),
73 0,
74 ))
75 }
76 _ => None,
77 };
78
79 if let Some(exec_tx) = executable_tx {
80 let assigned_versions = epoch_store
82 .assign_shared_object_versions_for_tests(
83 validator.get_object_cache_reader().as_ref(),
84 std::slice::from_ref(&exec_tx),
85 )
86 .unwrap();
87
88 let assigned_version = assigned_versions
89 .into_map()
90 .into_iter()
91 .next()
92 .map(|(_, v)| v)
93 .unwrap_or_default();
94 ExecutionEnv::new().with_assigned_versions(assigned_version)
95 } else {
96 ExecutionEnv::new()
97 }
98 }
99 };
100 match &tx.kind {
101 ConsensusTransactionKind::CertifiedTransaction(_)
103 | ConsensusTransactionKind::UserTransaction(_) => {
104 debug!(
105 "Ignoring deprecated {:?} in MockConsensusClient",
106 std::mem::discriminant(&tx.kind)
107 );
108 }
109 ConsensusTransactionKind::UserTransactionV2(tx) => {
110 if tx.tx().is_consensus_tx() {
111 validator.execution_scheduler().enqueue(
112 vec![(
113 VerifiedExecutableTransaction::new_from_consensus(
114 VerifiedTransaction::new_unchecked(tx.tx().clone()),
115 0,
116 )
117 .into(),
118 env,
119 )],
120 &epoch_store,
121 );
122 }
123 }
124 _ => {}
125 }
126 }
127 }
128
129 fn submit_impl(
130 &self,
131 transactions: &[ConsensusTransaction],
132 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
133 assert!(transactions.len() == 1);
135 let transaction = &transactions[0];
136 self.tx_sender
137 .try_send(transaction.clone())
138 .map_err(|_| SuiError::from("MockConsensusClient channel overflowed"))?;
139 Ok((
141 vec![ConsensusPosition {
142 epoch: EpochId::MIN,
143 block: BlockRef::MIN,
144 index: 0,
145 }],
146 with_block_status(consensus_core::BlockStatus::Sequenced(BlockRef::MIN)),
147 ))
148 }
149}
150
151impl SubmitToConsensus for MockConsensusClient {
152 fn submit_to_consensus(
153 &self,
154 transactions: &[ConsensusTransaction],
155 _epoch_store: &Arc<AuthorityPerEpochStore>,
156 ) -> SuiResult {
157 self.submit_impl(transactions).map(|_response| ())
158 }
159
160 fn submit_best_effort(
161 &self,
162 transaction: &ConsensusTransaction,
163 _epoch_store: &Arc<AuthorityPerEpochStore>,
164 _timeout: Duration,
165 ) -> SuiResult {
166 self.submit_impl(std::slice::from_ref(transaction))
167 .map(|_response| ())
168 }
169}
170
171#[async_trait::async_trait]
172impl ConsensusClient for MockConsensusClient {
173 async fn submit(
174 &self,
175 transactions: &[ConsensusTransaction],
176 _epoch_store: &Arc<AuthorityPerEpochStore>,
177 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
178 self.submit_impl(transactions)
179 }
180}
181
182pub(crate) fn with_block_status(status: consensus_core::BlockStatus) -> BlockStatusReceiver {
183 let (tx, rx) = oneshot::channel();
184 tx.send(status).ok();
185 rx
186}