1use std::collections::HashSet;
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10use consensus_core::BlockStatus;
11use consensus_types::block::BlockRef;
12use parking_lot::Mutex;
13use prometheus::Registry;
14use sui_types::digests::{Digest, TransactionDigest};
15use sui_types::error::SuiResult;
16use sui_types::executable_transaction::VerifiedExecutableTransaction;
17use sui_types::messages_consensus::{
18 AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
19};
20use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
21use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
22
23use crate::authority::authority_per_epoch_store::{
24 AuthorityPerEpochStore, ExecutionIndicesWithStatsV2,
25};
26use crate::authority::backpressure::BackpressureManager;
27use crate::authority::shared_object_version_manager::Schedulable;
28use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
29use crate::consensus_adapter::{
30 BlockStatusReceiver, ConnectionMonitorStatusForTests, ConsensusAdapter,
31 ConsensusAdapterMetrics, ConsensusClient,
32};
33use crate::consensus_handler::{
34 ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
35 SequencedConsensusTransactionKind,
36};
37use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
38use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
39use crate::mock_consensus::with_block_status;
40
41pub(crate) type CapturedTransactions = Arc<Mutex<Vec<crate::consensus_handler::SchedulerMessage>>>;
42
43pub struct TestConsensusCommit {
44 pub transactions: Vec<ConsensusTransaction>,
45 pub round: u64,
46 pub timestamp_ms: u64,
47 pub sub_dag_index: u64,
48}
49
50impl TestConsensusCommit {
51 pub fn new(
52 transactions: Vec<ConsensusTransaction>,
53 round: u64,
54 timestamp_ms: u64,
55 sub_dag_index: u64,
56 ) -> Self {
57 Self {
58 transactions,
59 round,
60 timestamp_ms,
61 sub_dag_index,
62 }
63 }
64
65 pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
66 Self {
67 transactions: vec![],
68 round,
69 timestamp_ms,
70 sub_dag_index,
71 }
72 }
73}
74
75impl std::fmt::Display for TestConsensusCommit {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(
78 f,
79 "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
80 self.round, self.timestamp_ms, self.sub_dag_index
81 )
82 }
83}
84
85impl ConsensusCommitAPI for TestConsensusCommit {
86 fn commit_ref(&self) -> consensus_core::CommitRef {
87 consensus_core::CommitRef::default()
88 }
89
90 fn reputation_score_sorted_desc(&self) -> Option<Vec<(AuthorityIndex, u64)>> {
91 None
92 }
93
94 fn leader_round(&self) -> u64 {
95 self.round
96 }
97
98 fn leader_author_index(&self) -> AuthorityIndex {
99 0
100 }
101
102 fn commit_timestamp_ms(&self) -> u64 {
103 self.timestamp_ms
104 }
105
106 fn commit_sub_dag_index(&self) -> u64 {
107 self.sub_dag_index
108 }
109
110 fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
111 let block_ref = BlockRef {
112 author: consensus_config::AuthorityIndex::ZERO,
113 round: self.round as u32,
114 digest: Default::default(),
115 };
116
117 let parsed_txs: Vec<ParsedTransaction> = self
118 .transactions
119 .iter()
120 .map(|tx| ParsedTransaction {
121 transaction: tx.clone(),
122 rejected: false,
123 serialized_len: 0,
124 })
125 .collect();
126
127 vec![(block_ref, parsed_txs)]
128 }
129
130 fn rejected_transactions_digest(&self) -> Digest {
131 Digest::default()
132 }
133
134 fn rejected_transactions_debug_string(&self) -> String {
135 "no rejected transactions from TestConsensusCommit".to_string()
136 }
137}
138
139pub struct TestConsensusHandlerSetup<C> {
140 pub consensus_handler: ConsensusHandler<C>,
141 pub captured_transactions: CapturedTransactions,
142}
143
144pub fn make_consensus_adapter_for_test(
145 state: Arc<AuthorityState>,
146 process_via_checkpoint: HashSet<TransactionDigest>,
147 execute: bool,
148 mock_block_status_receivers: Vec<BlockStatusReceiver>,
149) -> Arc<ConsensusAdapter> {
150 let metrics = ConsensusAdapterMetrics::new_test();
151
152 #[derive(Clone)]
153 struct SubmitDirectly {
154 state: Arc<AuthorityState>,
155 process_via_checkpoint: HashSet<TransactionDigest>,
156 execute: bool,
157 mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
158 }
159
160 #[async_trait::async_trait]
161 impl ConsensusClient for SubmitDirectly {
162 async fn submit(
163 &self,
164 transactions: &[ConsensusTransaction],
165 epoch_store: &Arc<AuthorityPerEpochStore>,
166 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
167 if transactions.is_empty() {
169 return Ok((
170 vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
171 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
172 ));
173 }
174
175 let num_transactions = transactions.len();
176 let mut executed_via_checkpoint = 0;
177
178 for txn in transactions {
180 if let ConsensusTransactionKind::CertifiedTransaction(cert) = &txn.kind {
181 let transaction_digest = cert.digest();
182 if self.process_via_checkpoint.contains(transaction_digest) {
183 epoch_store
184 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
185 .expect("Should not fail");
186 executed_via_checkpoint += 1;
187 }
188 } else if let ConsensusTransactionKind::UserTransaction(tx) = &txn.kind {
189 let transaction_digest = tx.digest();
190 if self.process_via_checkpoint.contains(transaction_digest) {
191 epoch_store
192 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
193 .expect("Should not fail");
194 executed_via_checkpoint += 1;
195 }
196 } else if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
197 let transaction_digest = tx.tx().digest();
198 if self.process_via_checkpoint.contains(transaction_digest) {
199 epoch_store
200 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
201 .expect("Should not fail");
202 executed_via_checkpoint += 1;
203 }
204 }
205 }
206
207 let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
208 .iter()
209 .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
210 .collect();
211
212 let keys = sequenced_transactions
213 .iter()
214 .map(|tx| tx.key())
215 .collect::<Vec<_>>();
216
217 if self.execute {
219 for tx in sequenced_transactions {
220 if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
221 {
222 if self.process_via_checkpoint.contains(&transaction_digest) {
224 continue;
225 }
226
227 let executable_tx = match &tx.transaction {
229 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
230 ConsensusTransactionKind::CertifiedTransaction(cert) => {
231 Some(VerifiedExecutableTransaction::new_from_certificate(
232 VerifiedCertificate::new_unchecked(*cert.clone()),
233 ))
234 }
235 ConsensusTransactionKind::UserTransaction(tx) => {
236 Some(VerifiedExecutableTransaction::new_from_consensus(
237 VerifiedTransaction::new_unchecked(*tx.clone()),
238 0,
239 ))
240 }
241 ConsensusTransactionKind::UserTransactionV2(tx) => {
242 Some(VerifiedExecutableTransaction::new_from_consensus(
243 VerifiedTransaction::new_unchecked(tx.tx().clone()),
244 0,
245 ))
246 }
247 _ => None,
248 },
249 SequencedConsensusTransactionKind::System(sys_tx) => {
250 Some(sys_tx.clone())
251 }
252 };
253
254 if let Some(exec_tx) = executable_tx {
255 let versions = epoch_store.assign_shared_object_versions_for_tests(
256 self.state.get_object_cache_reader().as_ref(),
257 std::slice::from_ref(&exec_tx),
258 )?;
259
260 let assigned_version = versions
261 .into_map()
262 .into_iter()
263 .next()
264 .map(|(_, v)| v)
265 .unwrap_or_default();
266
267 self.state.execution_scheduler().enqueue(
268 vec![(
269 Schedulable::Transaction(exec_tx),
270 ExecutionEnv::new().with_assigned_versions(assigned_version),
271 )],
272 epoch_store,
273 );
274 }
275 }
276 }
277 }
278
279 epoch_store.process_notifications(keys.iter());
280
281 assert_eq!(
282 executed_via_checkpoint,
283 self.process_via_checkpoint.len(),
284 "Some transactions were not executed via checkpoint"
285 );
286
287 assert!(
288 !self.mock_block_status_receivers.lock().is_empty(),
289 "No mock submit responses left"
290 );
291
292 let mut consensus_positions = Vec::new();
293 for index in 0..num_transactions {
294 consensus_positions.push(ConsensusPosition {
295 epoch: epoch_store.epoch(),
296 index: index as u16,
297 block: BlockRef::MIN,
298 });
299 }
300
301 Ok((
302 consensus_positions,
303 self.mock_block_status_receivers.lock().remove(0),
304 ))
305 }
306 }
307 let epoch_store = state.epoch_store_for_testing();
308 Arc::new(ConsensusAdapter::new(
310 Arc::new(SubmitDirectly {
311 state: state.clone(),
312 process_via_checkpoint,
313 execute,
314 mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
315 }),
316 state.checkpoint_store.clone(),
317 state.name,
318 Arc::new(ConnectionMonitorStatusForTests {}),
319 100_000,
320 100_000,
321 None,
322 None,
323 metrics,
324 epoch_store.protocol_config().clone(),
325 ))
326}
327
328pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
330 authority: &Arc<AuthorityState>,
331 checkpoint_service: Arc<C>,
332) -> TestConsensusHandlerSetup<C>
333where
334 C: Send + Sync + 'static,
335{
336 let epoch_store = authority.epoch_store_for_testing();
337 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
338 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
339 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
340 let backpressure_manager = BackpressureManager::new_for_tests();
341 let consensus_adapter =
342 make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
343
344 let last_consensus_stats = ExecutionIndicesWithStatsV2 {
345 stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
346 consensus_committee.size(),
347 ),
348 ..Default::default()
349 };
350
351 let captured_transactions: CapturedTransactions = Arc::new(Mutex::new(Vec::new()));
352 let captured_tx_clone = captured_transactions.clone();
353
354 let (tx_sender, mut receiver) =
355 mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
356
357 tokio::spawn(async move {
358 while let Some(item) = receiver.recv().await {
359 captured_tx_clone.lock().push(item);
360 }
361 });
362
363 let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
364
365 let consensus_handler = ConsensusHandler::new_for_testing(
366 epoch_store.clone(),
367 checkpoint_service,
368 execution_scheduler_sender,
369 consensus_adapter,
370 authority.get_object_cache_reader().clone(),
371 Arc::new(ArcSwap::default()),
372 consensus_committee,
373 metrics,
374 Arc::new(throughput_calculator),
375 backpressure_manager.subscribe(),
376 authority.traffic_controller.clone(),
377 last_consensus_stats,
378 );
379
380 TestConsensusHandlerSetup {
381 consensus_handler,
382 captured_transactions,
383 }
384}
385
386#[cfg(test)]
388pub async fn setup_consensus_handler_for_testing(
389 authority: &Arc<AuthorityState>,
390) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
391 setup_consensus_handler_for_testing_with_checkpoint_service(
392 authority,
393 Arc::new(crate::checkpoints::CheckpointServiceNoop {}),
394 )
395 .await
396}