1use std::collections::HashSet;
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10use consensus_core::BlockStatus;
11use consensus_types::block::BlockRef;
12use parking_lot::Mutex;
13use prometheus::Registry;
14use sui_types::digests::{Digest, TransactionDigest};
15use sui_types::error::SuiResult;
16use sui_types::executable_transaction::VerifiedExecutableTransaction;
17use sui_types::messages_consensus::{
18 AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
19};
20use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
21use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
22
23use crate::authority::authority_per_epoch_store::{
24 AuthorityPerEpochStore, ExecutionIndicesWithStats,
25};
26use crate::authority::backpressure::BackpressureManager;
27use crate::authority::shared_object_version_manager::{AssignedTxAndVersions, Schedulable};
28use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
29use crate::consensus_adapter::{
30 BlockStatusReceiver, ConnectionMonitorStatusForTests, ConsensusAdapter,
31 ConsensusAdapterMetrics, ConsensusClient,
32};
33use crate::consensus_handler::{
34 ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
35 SequencedConsensusTransactionKind,
36};
37use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
38use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
39use crate::execution_scheduler::SchedulingSource;
40use crate::mock_consensus::with_block_status;
41
42pub(crate) type CapturedTransactions =
43 Arc<Mutex<Vec<(Vec<Schedulable>, AssignedTxAndVersions, SchedulingSource)>>>;
44
45pub struct TestConsensusCommit {
46 pub transactions: Vec<ConsensusTransaction>,
47 pub round: u64,
48 pub timestamp_ms: u64,
49 pub sub_dag_index: u64,
50}
51
52impl TestConsensusCommit {
53 pub fn new(
54 transactions: Vec<ConsensusTransaction>,
55 round: u64,
56 timestamp_ms: u64,
57 sub_dag_index: u64,
58 ) -> Self {
59 Self {
60 transactions,
61 round,
62 timestamp_ms,
63 sub_dag_index,
64 }
65 }
66
67 pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
68 Self {
69 transactions: vec![],
70 round,
71 timestamp_ms,
72 sub_dag_index,
73 }
74 }
75}
76
77impl std::fmt::Display for TestConsensusCommit {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
82 self.round, self.timestamp_ms, self.sub_dag_index
83 )
84 }
85}
86
87impl ConsensusCommitAPI for TestConsensusCommit {
88 fn commit_ref(&self) -> consensus_core::CommitRef {
89 consensus_core::CommitRef::default()
90 }
91
92 fn reputation_score_sorted_desc(&self) -> Option<Vec<(AuthorityIndex, u64)>> {
93 None
94 }
95
96 fn leader_round(&self) -> u64 {
97 self.round
98 }
99
100 fn leader_author_index(&self) -> AuthorityIndex {
101 0
102 }
103
104 fn commit_timestamp_ms(&self) -> u64 {
105 self.timestamp_ms
106 }
107
108 fn commit_sub_dag_index(&self) -> u64 {
109 self.sub_dag_index
110 }
111
112 fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
113 let block_ref = BlockRef {
114 author: consensus_config::AuthorityIndex::ZERO,
115 round: self.round as u32,
116 digest: Default::default(),
117 };
118
119 let parsed_txs: Vec<ParsedTransaction> = self
120 .transactions
121 .iter()
122 .map(|tx| ParsedTransaction {
123 transaction: tx.clone(),
124 rejected: false,
125 serialized_len: 0,
126 })
127 .collect();
128
129 vec![(block_ref, parsed_txs)]
130 }
131
132 fn rejected_transactions_digest(&self) -> Digest {
133 Digest::default()
134 }
135
136 fn rejected_transactions_debug_string(&self) -> String {
137 "no rejected transactions from TestConsensusCommit".to_string()
138 }
139}
140
141pub struct TestConsensusHandlerSetup<C> {
142 pub consensus_handler: ConsensusHandler<C>,
143 pub captured_transactions: CapturedTransactions,
144}
145
146pub fn make_consensus_adapter_for_test(
147 state: Arc<AuthorityState>,
148 process_via_checkpoint: HashSet<TransactionDigest>,
149 execute: bool,
150 mock_block_status_receivers: Vec<BlockStatusReceiver>,
151) -> Arc<ConsensusAdapter> {
152 let metrics = ConsensusAdapterMetrics::new_test();
153
154 #[derive(Clone)]
155 struct SubmitDirectly {
156 state: Arc<AuthorityState>,
157 process_via_checkpoint: HashSet<TransactionDigest>,
158 execute: bool,
159 mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
160 }
161
162 #[async_trait::async_trait]
163 impl ConsensusClient for SubmitDirectly {
164 async fn submit(
165 &self,
166 transactions: &[ConsensusTransaction],
167 epoch_store: &Arc<AuthorityPerEpochStore>,
168 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
169 if transactions.is_empty() {
171 return Ok((
172 vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
173 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
174 ));
175 }
176
177 let num_transactions = transactions.len();
178 let mut executed_via_checkpoint = 0;
179
180 for txn in transactions {
182 if let ConsensusTransactionKind::CertifiedTransaction(cert) = &txn.kind {
183 let transaction_digest = cert.digest();
184 if self.process_via_checkpoint.contains(transaction_digest) {
185 epoch_store
186 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
187 .expect("Should not fail");
188 executed_via_checkpoint += 1;
189 }
190 } else if let ConsensusTransactionKind::UserTransaction(tx) = &txn.kind {
191 let transaction_digest = tx.digest();
192 if self.process_via_checkpoint.contains(transaction_digest) {
193 epoch_store
194 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
195 .expect("Should not fail");
196 executed_via_checkpoint += 1;
197 }
198 } else if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
199 let transaction_digest = tx.tx().digest();
200 if self.process_via_checkpoint.contains(transaction_digest) {
201 epoch_store
202 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
203 .expect("Should not fail");
204 executed_via_checkpoint += 1;
205 }
206 }
207 }
208
209 let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
210 .iter()
211 .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
212 .collect();
213
214 let keys = sequenced_transactions
215 .iter()
216 .map(|tx| tx.key())
217 .collect::<Vec<_>>();
218
219 if self.execute {
221 for tx in sequenced_transactions {
222 if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
223 {
224 if self.process_via_checkpoint.contains(&transaction_digest) {
226 continue;
227 }
228
229 let executable_tx = match &tx.transaction {
231 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
232 ConsensusTransactionKind::CertifiedTransaction(cert) => {
233 Some(VerifiedExecutableTransaction::new_from_certificate(
234 VerifiedCertificate::new_unchecked(*cert.clone()),
235 ))
236 }
237 ConsensusTransactionKind::UserTransaction(tx) => {
238 Some(VerifiedExecutableTransaction::new_from_consensus(
239 VerifiedTransaction::new_unchecked(*tx.clone()),
240 0,
241 ))
242 }
243 ConsensusTransactionKind::UserTransactionV2(tx) => {
244 Some(VerifiedExecutableTransaction::new_from_consensus(
245 VerifiedTransaction::new_unchecked(tx.tx().clone()),
246 0,
247 ))
248 }
249 _ => None,
250 },
251 SequencedConsensusTransactionKind::System(sys_tx) => {
252 Some(sys_tx.clone())
253 }
254 };
255
256 if let Some(exec_tx) = executable_tx {
257 let versions = epoch_store.assign_shared_object_versions_for_tests(
258 self.state.get_object_cache_reader().as_ref(),
259 &vec![exec_tx.clone()],
260 )?;
261
262 let assigned_version = versions
263 .into_map()
264 .into_iter()
265 .next()
266 .map(|(_, v)| v)
267 .unwrap_or_default();
268
269 self.state.execution_scheduler().enqueue(
270 vec![(
271 Schedulable::Transaction(exec_tx),
272 ExecutionEnv::new().with_assigned_versions(assigned_version),
273 )],
274 epoch_store,
275 );
276 }
277 }
278 }
279 }
280
281 epoch_store.process_notifications(keys.iter());
282
283 assert_eq!(
284 executed_via_checkpoint,
285 self.process_via_checkpoint.len(),
286 "Some transactions were not executed via checkpoint"
287 );
288
289 assert!(
290 !self.mock_block_status_receivers.lock().is_empty(),
291 "No mock submit responses left"
292 );
293
294 let mut consensus_positions = Vec::new();
295 for index in 0..num_transactions {
296 consensus_positions.push(ConsensusPosition {
297 epoch: epoch_store.epoch(),
298 index: index as u16,
299 block: BlockRef::MIN,
300 });
301 }
302
303 Ok((
304 consensus_positions,
305 self.mock_block_status_receivers.lock().remove(0),
306 ))
307 }
308 }
309 let epoch_store = state.epoch_store_for_testing();
310 Arc::new(ConsensusAdapter::new(
312 Arc::new(SubmitDirectly {
313 state: state.clone(),
314 process_via_checkpoint,
315 execute,
316 mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
317 }),
318 state.checkpoint_store.clone(),
319 state.name,
320 Arc::new(ConnectionMonitorStatusForTests {}),
321 100_000,
322 100_000,
323 None,
324 None,
325 metrics,
326 epoch_store.protocol_config().clone(),
327 ))
328}
329
330pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
332 authority: &Arc<AuthorityState>,
333 checkpoint_service: Arc<C>,
334) -> TestConsensusHandlerSetup<C>
335where
336 C: Send + Sync + 'static,
337{
338 let epoch_store = authority.epoch_store_for_testing();
339 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
340 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
341 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
342 let backpressure_manager = BackpressureManager::new_for_tests();
343 let consensus_adapter =
344 make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
345
346 let last_consensus_stats = ExecutionIndicesWithStats {
347 stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
348 consensus_committee.size(),
349 ),
350 ..Default::default()
351 };
352
353 let captured_transactions = Arc::new(Mutex::new(Vec::<(
355 Vec<Schedulable>,
356 AssignedTxAndVersions,
357 SchedulingSource,
358 )>::new()));
359 let captured_tx_clone = captured_transactions.clone();
360
361 let (tx_sender, mut receiver) =
363 mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
364
365 tokio::spawn(async move {
367 while let Some(item) = receiver.recv().await {
368 captured_tx_clone.lock().push(item);
369 }
370 });
371
372 let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
373
374 let consensus_handler = ConsensusHandler::new_for_testing(
375 epoch_store.clone(),
376 checkpoint_service,
377 execution_scheduler_sender,
378 consensus_adapter,
379 authority.get_object_cache_reader().clone(),
380 Arc::new(ArcSwap::default()),
381 consensus_committee,
382 metrics,
383 Arc::new(throughput_calculator),
384 backpressure_manager.subscribe(),
385 authority.traffic_controller.clone(),
386 last_consensus_stats,
387 );
388
389 TestConsensusHandlerSetup {
390 consensus_handler,
391 captured_transactions,
392 }
393}
394
/// Test-only shortcut for
/// [`setup_consensus_handler_for_testing_with_checkpoint_service`] that supplies a
/// `CheckpointServiceNoop` as the checkpoint service.
#[cfg(test)]
pub async fn setup_consensus_handler_for_testing(
    authority: &Arc<AuthorityState>,
) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
    let noop_checkpoint_service = Arc::new(crate::checkpoints::CheckpointServiceNoop {});
    setup_consensus_handler_for_testing_with_checkpoint_service(authority, noop_checkpoint_service)
        .await
}
405}