1use std::collections::HashSet;
7use std::sync::Arc;
8
9use consensus_core::BlockStatus;
10use consensus_types::block::BlockRef;
11use parking_lot::Mutex;
12use prometheus::Registry;
13use sui_types::digests::{Digest, TransactionDigest};
14use sui_types::error::SuiResult;
15use sui_types::executable_transaction::VerifiedExecutableTransaction;
16use sui_types::messages_consensus::{
17 AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
18};
19use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
20use sui_types::transaction::VerifiedTransaction;
21
22use crate::authority::authority_per_epoch_store::{
23 AuthorityPerEpochStore, ExecutionIndicesWithStatsV2,
24};
25use crate::authority::backpressure::BackpressureManager;
26use crate::authority::shared_object_version_manager::Schedulable;
27use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
28use crate::consensus_adapter::{
29 BlockStatusReceiver, ConsensusAdapter, ConsensusAdapterMetrics, ConsensusClient,
30};
31use crate::consensus_handler::{
32 ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
33 SequencedConsensusTransactionKind,
34};
35use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
36use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
37use crate::mock_consensus::with_block_status;
38
39pub(crate) type CapturedTransactions = Arc<Mutex<Vec<crate::consensus_handler::SchedulerMessage>>>;
40
41pub struct TestConsensusCommit {
42 pub transactions: Vec<ConsensusTransaction>,
43 pub round: u64,
44 pub timestamp_ms: u64,
45 pub sub_dag_index: u64,
46}
47
48impl TestConsensusCommit {
49 pub fn new(
50 transactions: Vec<ConsensusTransaction>,
51 round: u64,
52 timestamp_ms: u64,
53 sub_dag_index: u64,
54 ) -> Self {
55 Self {
56 transactions,
57 round,
58 timestamp_ms,
59 sub_dag_index,
60 }
61 }
62
63 pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
64 Self {
65 transactions: vec![],
66 round,
67 timestamp_ms,
68 sub_dag_index,
69 }
70 }
71}
72
73impl std::fmt::Display for TestConsensusCommit {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 write!(
76 f,
77 "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
78 self.round, self.timestamp_ms, self.sub_dag_index
79 )
80 }
81}
82
83impl ConsensusCommitAPI for TestConsensusCommit {
84 fn commit_ref(&self) -> consensus_core::CommitRef {
85 consensus_core::CommitRef::default()
86 }
87
88 fn leader_round(&self) -> u64 {
89 self.round
90 }
91
92 fn leader_author_index(&self) -> AuthorityIndex {
93 0
94 }
95
96 fn commit_timestamp_ms(&self) -> u64 {
97 self.timestamp_ms
98 }
99
100 fn commit_sub_dag_index(&self) -> u64 {
101 self.sub_dag_index
102 }
103
104 fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
105 let block_ref = BlockRef {
106 author: consensus_config::AuthorityIndex::ZERO,
107 round: self.round as u32,
108 digest: Default::default(),
109 };
110
111 let parsed_txs: Vec<ParsedTransaction> = self
112 .transactions
113 .iter()
114 .map(|tx| ParsedTransaction {
115 transaction: tx.clone(),
116 rejected: false,
117 serialized_len: 0,
118 })
119 .collect();
120
121 vec![(block_ref, parsed_txs)]
122 }
123
124 fn rejected_transactions_digest(&self) -> Digest {
125 Digest::default()
126 }
127
128 fn rejected_transactions_debug_string(&self) -> String {
129 "no rejected transactions from TestConsensusCommit".to_string()
130 }
131}
132
133pub struct TestConsensusHandlerSetup<C> {
134 pub consensus_handler: ConsensusHandler<C>,
135 pub captured_transactions: CapturedTransactions,
136}
137
138pub fn make_consensus_adapter_for_test(
139 state: Arc<AuthorityState>,
140 process_via_checkpoint: HashSet<TransactionDigest>,
141 execute: bool,
142 mock_block_status_receivers: Vec<BlockStatusReceiver>,
143) -> Arc<ConsensusAdapter> {
144 let metrics = ConsensusAdapterMetrics::new_test();
145
146 #[derive(Clone)]
147 struct SubmitDirectly {
148 state: Arc<AuthorityState>,
149 process_via_checkpoint: HashSet<TransactionDigest>,
150 execute: bool,
151 mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
152 }
153
154 #[async_trait::async_trait]
155 impl ConsensusClient for SubmitDirectly {
156 async fn submit(
157 &self,
158 transactions: &[ConsensusTransaction],
159 epoch_store: &Arc<AuthorityPerEpochStore>,
160 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
161 if transactions.is_empty() {
163 return Ok((
164 vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
165 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
166 ));
167 }
168
169 let num_transactions = transactions.len();
170 let mut executed_via_checkpoint = 0;
171
172 for txn in transactions {
174 if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
175 let transaction_digest = tx.tx().digest();
176 if self.process_via_checkpoint.contains(transaction_digest) {
177 epoch_store
178 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
179 .expect("Should not fail");
180 executed_via_checkpoint += 1;
181 }
182 }
183 }
184
185 let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
186 .iter()
187 .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
188 .collect();
189
190 let keys = sequenced_transactions
191 .iter()
192 .map(|tx| tx.key())
193 .collect::<Vec<_>>();
194
195 if self.execute {
197 for tx in sequenced_transactions {
198 if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
199 {
200 if self.process_via_checkpoint.contains(&transaction_digest) {
202 continue;
203 }
204
205 let executable_tx = match &tx.transaction {
207 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
208 ConsensusTransactionKind::UserTransactionV2(tx) => {
209 Some(VerifiedExecutableTransaction::new_from_consensus(
210 VerifiedTransaction::new_unchecked(tx.tx().clone()),
211 0,
212 ))
213 }
214 _ => None,
215 },
216 SequencedConsensusTransactionKind::System(sys_tx) => {
217 Some(sys_tx.clone())
218 }
219 };
220
221 if let Some(exec_tx) = executable_tx {
222 let versions = epoch_store.assign_shared_object_versions_for_tests(
223 self.state.get_object_cache_reader().as_ref(),
224 std::slice::from_ref(&exec_tx),
225 )?;
226
227 let assigned_version = versions
228 .into_map()
229 .into_iter()
230 .next()
231 .map(|(_, v)| v)
232 .unwrap_or_default();
233
234 self.state.execution_scheduler().enqueue(
235 vec![(
236 Schedulable::Transaction(exec_tx),
237 ExecutionEnv::new().with_assigned_versions(assigned_version),
238 )],
239 epoch_store,
240 );
241 }
242 }
243 }
244 }
245
246 epoch_store.process_notifications(keys.iter());
247
248 assert_eq!(
249 executed_via_checkpoint,
250 self.process_via_checkpoint.len(),
251 "Some transactions were not executed via checkpoint"
252 );
253
254 assert!(
255 !self.mock_block_status_receivers.lock().is_empty(),
256 "No mock submit responses left"
257 );
258
259 let mut consensus_positions = Vec::new();
260 for index in 0..num_transactions {
261 consensus_positions.push(ConsensusPosition {
262 epoch: epoch_store.epoch(),
263 index: index as u16,
264 block: BlockRef::MIN,
265 });
266 }
267
268 Ok((
269 consensus_positions,
270 self.mock_block_status_receivers.lock().remove(0),
271 ))
272 }
273 }
274 Arc::new(ConsensusAdapter::new(
276 Arc::new(SubmitDirectly {
277 state: state.clone(),
278 process_via_checkpoint,
279 execute,
280 mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
281 }),
282 state.checkpoint_store.clone(),
283 state.name,
284 100_000,
285 100_000,
286 metrics,
287 Arc::new(tokio::sync::Notify::new()),
288 ))
289}
290
291pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
293 authority: &Arc<AuthorityState>,
294 checkpoint_service: Arc<C>,
295) -> TestConsensusHandlerSetup<C>
296where
297 C: Send + Sync + 'static,
298{
299 let epoch_store = authority.epoch_store_for_testing();
300 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
301 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
302 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
303 let backpressure_manager = BackpressureManager::new_for_tests();
304 let consensus_adapter =
305 make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
306
307 let last_consensus_stats = ExecutionIndicesWithStatsV2 {
308 stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
309 consensus_committee.size(),
310 ),
311 ..Default::default()
312 };
313
314 let captured_transactions: CapturedTransactions = Arc::new(Mutex::new(Vec::new()));
315 let captured_tx_clone = captured_transactions.clone();
316
317 let (tx_sender, mut receiver) =
318 mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
319
320 tokio::spawn(async move {
321 while let Some(item) = receiver.recv().await {
322 captured_tx_clone.lock().push(item);
323 }
324 });
325
326 let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
327
328 let consensus_handler = ConsensusHandler::new_for_testing(
329 epoch_store.clone(),
330 checkpoint_service,
331 execution_scheduler_sender,
332 consensus_adapter,
333 authority.get_object_cache_reader().clone(),
334 consensus_committee,
335 metrics,
336 Arc::new(throughput_calculator),
337 backpressure_manager.subscribe(),
338 authority.traffic_controller.clone(),
339 last_consensus_stats,
340 );
341
342 TestConsensusHandlerSetup {
343 consensus_handler,
344 captured_transactions,
345 }
346}
347
348#[cfg(test)]
350pub async fn setup_consensus_handler_for_testing(
351 authority: &Arc<AuthorityState>,
352) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
353 setup_consensus_handler_for_testing_with_checkpoint_service(
354 authority,
355 Arc::new(crate::checkpoints::CheckpointServiceNoop {}),
356 )
357 .await
358}