sui_core/unit_tests/
consensus_test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Common test utilities for consensus handler testing
5
6use std::collections::HashSet;
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10use consensus_core::BlockStatus;
11use consensus_types::block::BlockRef;
12use parking_lot::Mutex;
13use prometheus::Registry;
14use sui_types::digests::{Digest, TransactionDigest};
15use sui_types::error::SuiResult;
16use sui_types::executable_transaction::VerifiedExecutableTransaction;
17use sui_types::messages_consensus::{
18    AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
19};
20use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
21use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
22
23use crate::authority::authority_per_epoch_store::{
24    AuthorityPerEpochStore, ExecutionIndicesWithStats,
25};
26use crate::authority::backpressure::BackpressureManager;
27use crate::authority::shared_object_version_manager::{AssignedTxAndVersions, Schedulable};
28use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
29use crate::consensus_adapter::{
30    BlockStatusReceiver, ConnectionMonitorStatusForTests, ConsensusAdapter,
31    ConsensusAdapterMetrics, ConsensusClient,
32};
33use crate::consensus_handler::{
34    ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
35    SequencedConsensusTransactionKind,
36};
37use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
38use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
39use crate::execution_scheduler::SchedulingSource;
40use crate::mock_consensus::with_block_status;
41
42pub(crate) type CapturedTransactions =
43    Arc<Mutex<Vec<(Vec<Schedulable>, AssignedTxAndVersions, SchedulingSource)>>>;
44
45pub struct TestConsensusCommit {
46    pub transactions: Vec<ConsensusTransaction>,
47    pub round: u64,
48    pub timestamp_ms: u64,
49    pub sub_dag_index: u64,
50}
51
52impl TestConsensusCommit {
53    pub fn new(
54        transactions: Vec<ConsensusTransaction>,
55        round: u64,
56        timestamp_ms: u64,
57        sub_dag_index: u64,
58    ) -> Self {
59        Self {
60            transactions,
61            round,
62            timestamp_ms,
63            sub_dag_index,
64        }
65    }
66
67    pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
68        Self {
69            transactions: vec![],
70            round,
71            timestamp_ms,
72            sub_dag_index,
73        }
74    }
75}
76
77impl std::fmt::Display for TestConsensusCommit {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(
80            f,
81            "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
82            self.round, self.timestamp_ms, self.sub_dag_index
83        )
84    }
85}
86
87impl ConsensusCommitAPI for TestConsensusCommit {
88    fn commit_ref(&self) -> consensus_core::CommitRef {
89        consensus_core::CommitRef::default()
90    }
91
92    fn reputation_score_sorted_desc(&self) -> Option<Vec<(AuthorityIndex, u64)>> {
93        None
94    }
95
96    fn leader_round(&self) -> u64 {
97        self.round
98    }
99
100    fn leader_author_index(&self) -> AuthorityIndex {
101        0
102    }
103
104    fn commit_timestamp_ms(&self) -> u64 {
105        self.timestamp_ms
106    }
107
108    fn commit_sub_dag_index(&self) -> u64 {
109        self.sub_dag_index
110    }
111
112    fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
113        let block_ref = BlockRef {
114            author: consensus_config::AuthorityIndex::ZERO,
115            round: self.round as u32,
116            digest: Default::default(),
117        };
118
119        let parsed_txs: Vec<ParsedTransaction> = self
120            .transactions
121            .iter()
122            .map(|tx| ParsedTransaction {
123                transaction: tx.clone(),
124                rejected: false,
125                serialized_len: 0,
126            })
127            .collect();
128
129        vec![(block_ref, parsed_txs)]
130    }
131
132    fn rejected_transactions_digest(&self) -> Digest {
133        Digest::default()
134    }
135
136    fn rejected_transactions_debug_string(&self) -> String {
137        "no rejected transactions from TestConsensusCommit".to_string()
138    }
139}
140
141pub struct TestConsensusHandlerSetup<C> {
142    pub consensus_handler: ConsensusHandler<C>,
143    pub captured_transactions: CapturedTransactions,
144}
145
146pub fn make_consensus_adapter_for_test(
147    state: Arc<AuthorityState>,
148    process_via_checkpoint: HashSet<TransactionDigest>,
149    execute: bool,
150    mock_block_status_receivers: Vec<BlockStatusReceiver>,
151) -> Arc<ConsensusAdapter> {
152    let metrics = ConsensusAdapterMetrics::new_test();
153
154    #[derive(Clone)]
155    struct SubmitDirectly {
156        state: Arc<AuthorityState>,
157        process_via_checkpoint: HashSet<TransactionDigest>,
158        execute: bool,
159        mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
160    }
161
162    #[async_trait::async_trait]
163    impl ConsensusClient for SubmitDirectly {
164        async fn submit(
165            &self,
166            transactions: &[ConsensusTransaction],
167            epoch_store: &Arc<AuthorityPerEpochStore>,
168        ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
169            // If transactions are empty, then we are performing a ping check and will attempt to ping consensus and simulate a transaction submission to consensus.
170            if transactions.is_empty() {
171                return Ok((
172                    vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
173                    with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
174                ));
175            }
176
177            let num_transactions = transactions.len();
178            let mut executed_via_checkpoint = 0;
179
180            // Simple processing - just mark transactions for checkpoint execution if needed
181            for txn in transactions {
182                if let ConsensusTransactionKind::CertifiedTransaction(cert) = &txn.kind {
183                    let transaction_digest = cert.digest();
184                    if self.process_via_checkpoint.contains(transaction_digest) {
185                        epoch_store
186                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
187                            .expect("Should not fail");
188                        executed_via_checkpoint += 1;
189                    }
190                } else if let ConsensusTransactionKind::UserTransaction(tx) = &txn.kind {
191                    let transaction_digest = tx.digest();
192                    if self.process_via_checkpoint.contains(transaction_digest) {
193                        epoch_store
194                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
195                            .expect("Should not fail");
196                        executed_via_checkpoint += 1;
197                    }
198                } else if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
199                    let transaction_digest = tx.tx().digest();
200                    if self.process_via_checkpoint.contains(transaction_digest) {
201                        epoch_store
202                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
203                            .expect("Should not fail");
204                        executed_via_checkpoint += 1;
205                    }
206                }
207            }
208
209            let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
210                .iter()
211                .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
212                .collect();
213
214            let keys = sequenced_transactions
215                .iter()
216                .map(|tx| tx.key())
217                .collect::<Vec<_>>();
218
219            // Only execute transactions if explicitly requested and not via checkpoint
220            if self.execute {
221                for tx in sequenced_transactions {
222                    if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
223                    {
224                        // Skip if already executed via checkpoint
225                        if self.process_via_checkpoint.contains(&transaction_digest) {
226                            continue;
227                        }
228
229                        // Extract executable transaction from consensus transaction
230                        let executable_tx = match &tx.transaction {
231                            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
232                                ConsensusTransactionKind::CertifiedTransaction(cert) => {
233                                    Some(VerifiedExecutableTransaction::new_from_certificate(
234                                        VerifiedCertificate::new_unchecked(*cert.clone()),
235                                    ))
236                                }
237                                ConsensusTransactionKind::UserTransaction(tx) => {
238                                    Some(VerifiedExecutableTransaction::new_from_consensus(
239                                        VerifiedTransaction::new_unchecked(*tx.clone()),
240                                        0,
241                                    ))
242                                }
243                                ConsensusTransactionKind::UserTransactionV2(tx) => {
244                                    Some(VerifiedExecutableTransaction::new_from_consensus(
245                                        VerifiedTransaction::new_unchecked(tx.tx().clone()),
246                                        0,
247                                    ))
248                                }
249                                _ => None,
250                            },
251                            SequencedConsensusTransactionKind::System(sys_tx) => {
252                                Some(sys_tx.clone())
253                            }
254                        };
255
256                        if let Some(exec_tx) = executable_tx {
257                            let versions = epoch_store.assign_shared_object_versions_for_tests(
258                                self.state.get_object_cache_reader().as_ref(),
259                                &vec![exec_tx.clone()],
260                            )?;
261
262                            let assigned_version = versions
263                                .into_map()
264                                .into_iter()
265                                .next()
266                                .map(|(_, v)| v)
267                                .unwrap_or_default();
268
269                            self.state.execution_scheduler().enqueue(
270                                vec![(
271                                    Schedulable::Transaction(exec_tx),
272                                    ExecutionEnv::new().with_assigned_versions(assigned_version),
273                                )],
274                                epoch_store,
275                            );
276                        }
277                    }
278                }
279            }
280
281            epoch_store.process_notifications(keys.iter());
282
283            assert_eq!(
284                executed_via_checkpoint,
285                self.process_via_checkpoint.len(),
286                "Some transactions were not executed via checkpoint"
287            );
288
289            assert!(
290                !self.mock_block_status_receivers.lock().is_empty(),
291                "No mock submit responses left"
292            );
293
294            let mut consensus_positions = Vec::new();
295            for index in 0..num_transactions {
296                consensus_positions.push(ConsensusPosition {
297                    epoch: epoch_store.epoch(),
298                    index: index as u16,
299                    block: BlockRef::MIN,
300                });
301            }
302
303            Ok((
304                consensus_positions,
305                self.mock_block_status_receivers.lock().remove(0),
306            ))
307        }
308    }
309    let epoch_store = state.epoch_store_for_testing();
310    // Make a new consensus adapter instance.
311    Arc::new(ConsensusAdapter::new(
312        Arc::new(SubmitDirectly {
313            state: state.clone(),
314            process_via_checkpoint,
315            execute,
316            mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
317        }),
318        state.checkpoint_store.clone(),
319        state.name,
320        Arc::new(ConnectionMonitorStatusForTests {}),
321        100_000,
322        100_000,
323        None,
324        None,
325        metrics,
326        epoch_store.protocol_config().clone(),
327    ))
328}
329
330/// Creates a ConsensusHandler for testing with a mock ExecutionSchedulerSender that captures transactions
331pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
332    authority: &Arc<AuthorityState>,
333    checkpoint_service: Arc<C>,
334) -> TestConsensusHandlerSetup<C>
335where
336    C: Send + Sync + 'static,
337{
338    let epoch_store = authority.epoch_store_for_testing();
339    let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
340    let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
341    let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
342    let backpressure_manager = BackpressureManager::new_for_tests();
343    let consensus_adapter =
344        make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
345
346    let last_consensus_stats = ExecutionIndicesWithStats {
347        stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
348            consensus_committee.size(),
349        ),
350        ..Default::default()
351    };
352
353    // Create a test ExecutionSchedulerSender that captures transactions
354    let captured_transactions = Arc::new(Mutex::new(Vec::<(
355        Vec<Schedulable>,
356        AssignedTxAndVersions,
357        SchedulingSource,
358    )>::new()));
359    let captured_tx_clone = captured_transactions.clone();
360
361    // Create a channel to capture sent transactions
362    let (tx_sender, mut receiver) =
363        mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
364
365    // Spawn a task to capture transactions from the channel
366    tokio::spawn(async move {
367        while let Some(item) = receiver.recv().await {
368            captured_tx_clone.lock().push(item);
369        }
370    });
371
372    let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
373
374    let consensus_handler = ConsensusHandler::new_for_testing(
375        epoch_store.clone(),
376        checkpoint_service,
377        execution_scheduler_sender,
378        consensus_adapter,
379        authority.get_object_cache_reader().clone(),
380        Arc::new(ArcSwap::default()),
381        consensus_committee,
382        metrics,
383        Arc::new(throughput_calculator),
384        backpressure_manager.subscribe(),
385        authority.traffic_controller.clone(),
386        last_consensus_stats,
387    );
388
389    TestConsensusHandlerSetup {
390        consensus_handler,
391        captured_transactions,
392    }
393}
394
395/// Creates a ConsensusHandler for testing with CheckpointServiceNoop
396#[cfg(test)]
397pub async fn setup_consensus_handler_for_testing(
398    authority: &Arc<AuthorityState>,
399) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
400    setup_consensus_handler_for_testing_with_checkpoint_service(
401        authority,
402        Arc::new(crate::checkpoints::CheckpointServiceNoop {}),
403    )
404    .await
405}