1use std::collections::HashSet;
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10use consensus_core::BlockStatus;
11use consensus_types::block::BlockRef;
12use parking_lot::Mutex;
13use prometheus::Registry;
14use sui_types::digests::{Digest, TransactionDigest};
15use sui_types::error::SuiResult;
16use sui_types::executable_transaction::VerifiedExecutableTransaction;
17use sui_types::messages_consensus::{
18 AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
19};
20use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
21use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
22
23use crate::authority::authority_per_epoch_store::{
24 AuthorityPerEpochStore, ExecutionIndicesWithStats,
25};
26use crate::authority::backpressure::BackpressureManager;
27use crate::authority::shared_object_version_manager::{AssignedTxAndVersions, Schedulable};
28use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
29use crate::consensus_adapter::{
30 BlockStatusReceiver, ConnectionMonitorStatusForTests, ConsensusAdapter,
31 ConsensusAdapterMetrics, ConsensusClient,
32};
33use crate::consensus_handler::{
34 ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
35 SequencedConsensusTransactionKind,
36};
37use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
38use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
39use crate::execution_scheduler::SchedulingSource;
40use crate::mock_consensus::with_block_status;
41
42pub(crate) type CapturedTransactions =
43 Arc<Mutex<Vec<(Vec<Schedulable>, AssignedTxAndVersions, SchedulingSource)>>>;
44
45pub struct TestConsensusCommit {
46 pub transactions: Vec<ConsensusTransaction>,
47 pub round: u64,
48 pub timestamp_ms: u64,
49 pub sub_dag_index: u64,
50}
51
52impl TestConsensusCommit {
53 pub fn new(
54 transactions: Vec<ConsensusTransaction>,
55 round: u64,
56 timestamp_ms: u64,
57 sub_dag_index: u64,
58 ) -> Self {
59 Self {
60 transactions,
61 round,
62 timestamp_ms,
63 sub_dag_index,
64 }
65 }
66
67 pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
68 Self {
69 transactions: vec![],
70 round,
71 timestamp_ms,
72 sub_dag_index,
73 }
74 }
75}
76
77impl std::fmt::Display for TestConsensusCommit {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
82 self.round, self.timestamp_ms, self.sub_dag_index
83 )
84 }
85}
86
87impl ConsensusCommitAPI for TestConsensusCommit {
88 fn commit_ref(&self) -> consensus_core::CommitRef {
89 consensus_core::CommitRef::default()
90 }
91
92 fn reputation_score_sorted_desc(&self) -> Option<Vec<(AuthorityIndex, u64)>> {
93 None
94 }
95
96 fn leader_round(&self) -> u64 {
97 self.round
98 }
99
100 fn leader_author_index(&self) -> AuthorityIndex {
101 0
102 }
103
104 fn commit_timestamp_ms(&self) -> u64 {
105 self.timestamp_ms
106 }
107
108 fn commit_sub_dag_index(&self) -> u64 {
109 self.sub_dag_index
110 }
111
112 fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
113 let block_ref = BlockRef {
114 author: consensus_config::AuthorityIndex::ZERO,
115 round: self.round as u32,
116 digest: Default::default(),
117 };
118
119 let parsed_txs: Vec<ParsedTransaction> = self
120 .transactions
121 .iter()
122 .map(|tx| ParsedTransaction {
123 transaction: tx.clone(),
124 rejected: false,
125 serialized_len: 0,
126 })
127 .collect();
128
129 vec![(block_ref, parsed_txs)]
130 }
131
132 fn rejected_transactions_digest(&self) -> Digest {
133 Digest::default()
134 }
135
136 fn rejected_transactions_debug_string(&self) -> String {
137 "no rejected transactions from TestConsensusCommit".to_string()
138 }
139}
140
141pub struct TestConsensusHandlerSetup<C> {
142 pub consensus_handler: ConsensusHandler<C>,
143 pub captured_transactions: CapturedTransactions,
144}
145
146pub fn make_consensus_adapter_for_test(
147 state: Arc<AuthorityState>,
148 process_via_checkpoint: HashSet<TransactionDigest>,
149 execute: bool,
150 mock_block_status_receivers: Vec<BlockStatusReceiver>,
151) -> Arc<ConsensusAdapter> {
152 let metrics = ConsensusAdapterMetrics::new_test();
153
154 #[derive(Clone)]
155 struct SubmitDirectly {
156 state: Arc<AuthorityState>,
157 process_via_checkpoint: HashSet<TransactionDigest>,
158 execute: bool,
159 mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
160 }
161
162 #[async_trait::async_trait]
163 impl ConsensusClient for SubmitDirectly {
164 async fn submit(
165 &self,
166 transactions: &[ConsensusTransaction],
167 epoch_store: &Arc<AuthorityPerEpochStore>,
168 ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
169 if transactions.is_empty() {
171 return Ok((
172 vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
173 with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
174 ));
175 }
176
177 let num_transactions = transactions.len();
178 let mut executed_via_checkpoint = 0;
179
180 for txn in transactions {
182 if let ConsensusTransactionKind::CertifiedTransaction(cert) = &txn.kind {
183 let transaction_digest = cert.digest();
184 if self.process_via_checkpoint.contains(transaction_digest) {
185 epoch_store
186 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
187 .expect("Should not fail");
188 executed_via_checkpoint += 1;
189 }
190 } else if let ConsensusTransactionKind::UserTransaction(tx) = &txn.kind {
191 let transaction_digest = tx.digest();
192 if self.process_via_checkpoint.contains(transaction_digest) {
193 epoch_store
194 .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
195 .expect("Should not fail");
196 executed_via_checkpoint += 1;
197 }
198 }
199 }
200
201 let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
202 .iter()
203 .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
204 .collect();
205
206 let keys = sequenced_transactions
207 .iter()
208 .map(|tx| tx.key())
209 .collect::<Vec<_>>();
210
211 if self.execute {
213 for tx in sequenced_transactions {
214 if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
215 {
216 if self.process_via_checkpoint.contains(&transaction_digest) {
218 continue;
219 }
220
221 let executable_tx = match &tx.transaction {
223 SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
224 ConsensusTransactionKind::CertifiedTransaction(cert) => {
225 Some(VerifiedExecutableTransaction::new_from_certificate(
226 VerifiedCertificate::new_unchecked(*cert.clone()),
227 ))
228 }
229 ConsensusTransactionKind::UserTransaction(tx) => {
230 Some(VerifiedExecutableTransaction::new_from_consensus(
231 VerifiedTransaction::new_unchecked(*tx.clone()),
232 0,
233 ))
234 }
235 _ => None,
236 },
237 SequencedConsensusTransactionKind::System(sys_tx) => {
238 Some(sys_tx.clone())
239 }
240 };
241
242 if let Some(exec_tx) = executable_tx {
243 let versions = epoch_store.assign_shared_object_versions_for_tests(
244 self.state.get_object_cache_reader().as_ref(),
245 &vec![exec_tx.clone()],
246 )?;
247
248 let assigned_version = versions
249 .into_map()
250 .into_iter()
251 .next()
252 .map(|(_, v)| v)
253 .unwrap_or_default();
254
255 self.state.execution_scheduler().enqueue(
256 vec![(
257 Schedulable::Transaction(exec_tx),
258 ExecutionEnv::new().with_assigned_versions(assigned_version),
259 )],
260 epoch_store,
261 );
262 }
263 }
264 }
265 }
266
267 epoch_store.process_notifications(keys.iter());
268
269 assert_eq!(
270 executed_via_checkpoint,
271 self.process_via_checkpoint.len(),
272 "Some transactions were not executed via checkpoint"
273 );
274
275 assert!(
276 !self.mock_block_status_receivers.lock().is_empty(),
277 "No mock submit responses left"
278 );
279
280 let mut consensus_positions = Vec::new();
281 for index in 0..num_transactions {
282 consensus_positions.push(ConsensusPosition {
283 epoch: epoch_store.epoch(),
284 index: index as u16,
285 block: BlockRef::MIN,
286 });
287 }
288
289 Ok((
290 consensus_positions,
291 self.mock_block_status_receivers.lock().remove(0),
292 ))
293 }
294 }
295 let epoch_store = state.epoch_store_for_testing();
296 Arc::new(ConsensusAdapter::new(
298 Arc::new(SubmitDirectly {
299 state: state.clone(),
300 process_via_checkpoint,
301 execute,
302 mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
303 }),
304 state.checkpoint_store.clone(),
305 state.name,
306 Arc::new(ConnectionMonitorStatusForTests {}),
307 100_000,
308 100_000,
309 None,
310 None,
311 metrics,
312 epoch_store.protocol_config().clone(),
313 ))
314}
315
316pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
318 authority: &Arc<AuthorityState>,
319 checkpoint_service: Arc<C>,
320) -> TestConsensusHandlerSetup<C>
321where
322 C: Send + Sync + 'static,
323{
324 let epoch_store = authority.epoch_store_for_testing();
325 let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
326 let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
327 let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
328 let backpressure_manager = BackpressureManager::new_for_tests();
329 let consensus_adapter =
330 make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
331
332 let last_consensus_stats = ExecutionIndicesWithStats {
333 stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
334 consensus_committee.size(),
335 ),
336 ..Default::default()
337 };
338
339 let captured_transactions = Arc::new(Mutex::new(Vec::<(
341 Vec<Schedulable>,
342 AssignedTxAndVersions,
343 SchedulingSource,
344 )>::new()));
345 let captured_tx_clone = captured_transactions.clone();
346
347 let (tx_sender, mut receiver) =
349 mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
350
351 tokio::spawn(async move {
353 while let Some(item) = receiver.recv().await {
354 captured_tx_clone.lock().push(item);
355 }
356 });
357
358 let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
359
360 let consensus_handler = ConsensusHandler::new_for_testing(
361 epoch_store.clone(),
362 checkpoint_service,
363 execution_scheduler_sender,
364 consensus_adapter,
365 authority.get_object_cache_reader().clone(),
366 Arc::new(ArcSwap::default()),
367 consensus_committee,
368 metrics,
369 Arc::new(throughput_calculator),
370 backpressure_manager.subscribe(),
371 authority.traffic_controller.clone(),
372 last_consensus_stats,
373 );
374
375 TestConsensusHandlerSetup {
376 consensus_handler,
377 captured_transactions,
378 }
379}
380
/// Convenience wrapper around
/// [`setup_consensus_handler_for_testing_with_checkpoint_service`] that plugs in a
/// `CheckpointServiceNoop` as the checkpoint service.
#[cfg(test)]
pub async fn setup_consensus_handler_for_testing(
    authority: &Arc<AuthorityState>,
) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
    let noop_service = Arc::new(crate::checkpoints::CheckpointServiceNoop {});
    setup_consensus_handler_for_testing_with_checkpoint_service(authority, noop_service).await
}