1use std::collections::HashSet;
7use std::sync::Arc;
8
9use consensus_core::BlockStatus;
10use consensus_types::block::BlockRef;
11use parking_lot::Mutex;
12use prometheus::Registry;
13use sui_types::digests::{Digest, TransactionDigest};
14use sui_types::error::SuiResult;
15use sui_types::executable_transaction::VerifiedExecutableTransaction;
16use sui_types::messages_consensus::{
17 AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
18};
19use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
20use sui_types::transaction::VerifiedTransaction;
21
22use crate::authority::authority_per_epoch_store::{
23 AuthorityPerEpochStore, ExecutionIndicesWithStatsV2,
24};
25use crate::authority::backpressure::BackpressureManager;
26use crate::authority::shared_object_version_manager::Schedulable;
27use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
28use crate::consensus_adapter::{
29 BlockStatusReceiver, ConsensusAdapter, ConsensusAdapterMetrics, ConsensusClient,
30};
31use crate::consensus_handler::{
32 ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
33 SequencedConsensusTransactionKind,
34};
35use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
36use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
37use crate::mock_consensus::with_block_status;
38
39pub(crate) type CapturedTransactions = Arc<Mutex<Vec<crate::consensus_handler::SchedulerMessage>>>;
40
41pub struct TestConsensusCommit {
42 pub transactions: Vec<ConsensusTransaction>,
43 pub round: u64,
44 pub timestamp_ms: u64,
45 pub sub_dag_index: u64,
46}
47
48impl TestConsensusCommit {
49 pub fn new(
50 transactions: Vec<ConsensusTransaction>,
51 round: u64,
52 timestamp_ms: u64,
53 sub_dag_index: u64,
54 ) -> Self {
55 Self {
56 transactions,
57 round,
58 timestamp_ms,
59 sub_dag_index,
60 }
61 }
62
63 pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
64 Self {
65 transactions: vec![],
66 round,
67 timestamp_ms,
68 sub_dag_index,
69 }
70 }
71}
72
73impl std::fmt::Display for TestConsensusCommit {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 write!(
76 f,
77 "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
78 self.round, self.timestamp_ms, self.sub_dag_index
79 )
80 }
81}
82
83impl ConsensusCommitAPI for TestConsensusCommit {
84 fn commit_ref(&self) -> consensus_core::CommitRef {
85 consensus_core::CommitRef::default()
86 }
87
88 fn leader_round(&self) -> u64 {
89 self.round
90 }
91
92 fn leader_author_index(&self) -> AuthorityIndex {
93 0
94 }
95
96 fn commit_timestamp_ms(&self) -> u64 {
97 self.timestamp_ms
98 }
99
100 fn commit_sub_dag_index(&self) -> u64 {
101 self.sub_dag_index
102 }
103
104 fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
105 let block_ref = BlockRef {
106 author: consensus_config::AuthorityIndex::ZERO,
107 round: self.round as u32,
108 digest: Default::default(),
109 };
110
111 let parsed_txs: Vec<ParsedTransaction> = self
112 .transactions
113 .iter()
114 .map(|tx| ParsedTransaction {
115 transaction: tx.clone(),
116 rejected: false,
117 serialized_len: 0,
118 })
119 .collect();
120
121 vec![(block_ref, parsed_txs)]
122 }
123
124 fn rejected_transactions_digest(&self) -> Digest {
125 Digest::default()
126 }
127
128 fn rejected_transactions_debug_string(&self) -> String {
129 "no rejected transactions from TestConsensusCommit".to_string()
130 }
131}
132
133pub struct TestConsensusHandlerSetup<C> {
134 pub consensus_handler: ConsensusHandler<C>,
135 pub captured_transactions: CapturedTransactions,
136}
137
138pub fn make_consensus_adapter_with_client_for_test(
141 state: &Arc<AuthorityState>,
142 client: Arc<dyn ConsensusClient>,
143 max_pending_local_submissions: usize,
144) -> Arc<ConsensusAdapter> {
145 Arc::new(ConsensusAdapter::new(
146 client,
147 state.checkpoint_store.clone(),
148 state.name,
149 100_000,
150 max_pending_local_submissions,
151 ConsensusAdapterMetrics::new_test(),
152 Arc::new(tokio::sync::Notify::new()),
153 ))
154}
155
156pub fn make_consensus_adapter_for_test(
157 state: Arc<AuthorityState>,
158 process_via_checkpoint: HashSet<TransactionDigest>,
159 execute: bool,
160 mock_block_status_receivers: Vec<BlockStatusReceiver>,
161) -> Arc<ConsensusAdapter> {
162 make_consensus_adapter_for_test_with_submit_limit(
163 state,
164 process_via_checkpoint,
165 execute,
166 mock_block_status_receivers,
167 100_000,
168 )
169}
170
171pub fn make_consensus_adapter_for_test_with_submit_limit(
172 state: Arc<AuthorityState>,
173 process_via_checkpoint: HashSet<TransactionDigest>,
174 execute: bool,
175 mock_block_status_receivers: Vec<BlockStatusReceiver>,
176 max_pending_local_submissions: usize,
177) -> Arc<ConsensusAdapter> {
178 #[derive(Clone)]
179 struct SubmitDirectly {
180 state: Arc<AuthorityState>,
181 process_via_checkpoint: HashSet<TransactionDigest>,
182 execute: bool,
183 mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
184 }
185
186 #[async_trait::async_trait]
187 impl ConsensusClient for SubmitDirectly {
188 async fn submit(
189 &self,
190 transactions: &[ConsensusTransaction],
191 epoch_store: &Arc<AuthorityPerEpochStore>,
192 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
193 if transactions.is_empty() {
195 return Ok((
196 vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
197 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
198 ));
199 }
200
201 let num_transactions = transactions.len();
202 let mut executed_via_checkpoint = 0;
203
204 for txn in transactions {
206 if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
207 let transaction_digest = tx.tx().digest();
208 if self.process_via_checkpoint.contains(transaction_digest) {
209 epoch_store
210 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
211 .expect("Should not fail");
212 executed_via_checkpoint += 1;
213 }
214 }
215 }
216
217 let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
218 .iter()
219 .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
220 .collect();
221
222 let keys = sequenced_transactions
223 .iter()
224 .map(|tx| tx.key())
225 .collect::<Vec<_>>();
226
227 if self.execute {
229 for tx in sequenced_transactions {
230 if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
231 {
232 if self.process_via_checkpoint.contains(&transaction_digest) {
234 continue;
235 }
236
237 let executable_tx = match &tx.transaction {
239 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
240 ConsensusTransactionKind::UserTransactionV2(tx) => {
241 Some(VerifiedExecutableTransaction::new_from_consensus(
242 VerifiedTransaction::new_unchecked(tx.tx().clone()),
243 0,
244 ))
245 }
246 _ => None,
247 },
248 SequencedConsensusTransactionKind::System(sys_tx) => {
249 Some(sys_tx.clone())
250 }
251 };
252
253 if let Some(exec_tx) = executable_tx {
254 let versions = epoch_store.assign_shared_object_versions_for_tests(
255 self.state.get_object_cache_reader().as_ref(),
256 std::slice::from_ref(&exec_tx),
257 )?;
258
259 let assigned_version = versions
260 .into_map()
261 .into_iter()
262 .next()
263 .map(|(_, v)| v)
264 .unwrap_or_default();
265
266 self.state.execution_scheduler().enqueue(
267 vec![(
268 Schedulable::Transaction(exec_tx),
269 ExecutionEnv::new().with_assigned_versions(assigned_version),
270 )],
271 epoch_store,
272 );
273 }
274 }
275 }
276 }
277
278 epoch_store.process_notifications(keys.iter());
279
280 assert_eq!(
281 executed_via_checkpoint,
282 self.process_via_checkpoint.len(),
283 "Some transactions were not executed via checkpoint"
284 );
285
286 assert!(
287 !self.mock_block_status_receivers.lock().is_empty(),
288 "No mock submit responses left"
289 );
290
291 let mut consensus_positions = Vec::new();
292 for index in 0..num_transactions {
293 consensus_positions.push(ConsensusPosition {
294 epoch: epoch_store.epoch(),
295 index: index as u16,
296 block: BlockRef::MIN,
297 });
298 }
299
300 Ok((
301 consensus_positions,
302 self.mock_block_status_receivers.lock().remove(0),
303 ))
304 }
305 }
306 let client = Arc::new(SubmitDirectly {
308 state: state.clone(),
309 process_via_checkpoint,
310 execute,
311 mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
312 });
313 make_consensus_adapter_with_client_for_test(&state, client, max_pending_local_submissions)
314}
315
316pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
318 authority: &Arc<AuthorityState>,
319 checkpoint_service: Arc<C>,
320) -> TestConsensusHandlerSetup<C>
321where
322 C: Send + Sync + 'static,
323{
324 let epoch_store = authority.epoch_store_for_testing();
325 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
326 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
327 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
328 let backpressure_manager = BackpressureManager::new_for_tests();
329 let consensus_adapter =
330 make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
331
332 let last_consensus_stats = ExecutionIndicesWithStatsV2 {
333 stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
334 consensus_committee.size(),
335 ),
336 ..Default::default()
337 };
338
339 let captured_transactions: CapturedTransactions = Arc::new(Mutex::new(Vec::new()));
340 let captured_tx_clone = captured_transactions.clone();
341
342 let (tx_sender, mut receiver) =
343 mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
344
345 tokio::spawn(async move {
346 while let Some(item) = receiver.recv().await {
347 captured_tx_clone.lock().push(item);
348 }
349 });
350
351 let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
352
353 let consensus_handler = ConsensusHandler::new_for_testing(
354 epoch_store.clone(),
355 checkpoint_service,
356 execution_scheduler_sender,
357 consensus_adapter,
358 authority.get_object_cache_reader().clone(),
359 consensus_committee,
360 metrics,
361 Arc::new(throughput_calculator),
362 backpressure_manager.subscribe(),
363 authority.traffic_controller.clone(),
364 last_consensus_stats,
365 );
366
367 TestConsensusHandlerSetup {
368 consensus_handler,
369 captured_transactions,
370 }
371}
372
/// Builds a test [`ConsensusHandler`] backed by a `CheckpointServiceNoop`.
///
/// Delegates to [`setup_consensus_handler_for_testing_with_checkpoint_service`].
#[cfg(test)]
pub async fn setup_consensus_handler_for_testing(
    authority: &Arc<AuthorityState>,
) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
    let noop_checkpoint_service = Arc::new(crate::checkpoints::CheckpointServiceNoop {});
    setup_consensus_handler_for_testing_with_checkpoint_service(authority, noop_checkpoint_service)
        .await
}