sui_core/unit_tests/
consensus_test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Common test utilities for consensus handler testing
5
6use std::collections::HashSet;
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10use consensus_core::BlockStatus;
11use consensus_types::block::BlockRef;
12use parking_lot::Mutex;
13use prometheus::Registry;
14use sui_types::digests::{Digest, TransactionDigest};
15use sui_types::error::SuiResult;
16use sui_types::executable_transaction::VerifiedExecutableTransaction;
17use sui_types::messages_consensus::{
18    AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
19};
20use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
21use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
22
23use crate::authority::authority_per_epoch_store::{
24    AuthorityPerEpochStore, ExecutionIndicesWithStats,
25};
26use crate::authority::backpressure::BackpressureManager;
27use crate::authority::shared_object_version_manager::{AssignedTxAndVersions, Schedulable};
28use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
29use crate::consensus_adapter::{
30    BlockStatusReceiver, ConnectionMonitorStatusForTests, ConsensusAdapter,
31    ConsensusAdapterMetrics, ConsensusClient,
32};
33use crate::consensus_handler::{
34    ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
35    SequencedConsensusTransactionKind,
36};
37use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
38use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
39use crate::execution_scheduler::SchedulingSource;
40use crate::mock_consensus::with_block_status;
41
42pub(crate) type CapturedTransactions =
43    Arc<Mutex<Vec<(Vec<Schedulable>, AssignedTxAndVersions, SchedulingSource)>>>;
44
45pub struct TestConsensusCommit {
46    pub transactions: Vec<ConsensusTransaction>,
47    pub round: u64,
48    pub timestamp_ms: u64,
49    pub sub_dag_index: u64,
50}
51
52impl TestConsensusCommit {
53    pub fn new(
54        transactions: Vec<ConsensusTransaction>,
55        round: u64,
56        timestamp_ms: u64,
57        sub_dag_index: u64,
58    ) -> Self {
59        Self {
60            transactions,
61            round,
62            timestamp_ms,
63            sub_dag_index,
64        }
65    }
66
67    pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
68        Self {
69            transactions: vec![],
70            round,
71            timestamp_ms,
72            sub_dag_index,
73        }
74    }
75}
76
77impl std::fmt::Display for TestConsensusCommit {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(
80            f,
81            "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
82            self.round, self.timestamp_ms, self.sub_dag_index
83        )
84    }
85}
86
87impl ConsensusCommitAPI for TestConsensusCommit {
88    fn commit_ref(&self) -> consensus_core::CommitRef {
89        consensus_core::CommitRef::default()
90    }
91
92    fn reputation_score_sorted_desc(&self) -> Option<Vec<(AuthorityIndex, u64)>> {
93        None
94    }
95
96    fn leader_round(&self) -> u64 {
97        self.round
98    }
99
100    fn leader_author_index(&self) -> AuthorityIndex {
101        0
102    }
103
104    fn commit_timestamp_ms(&self) -> u64 {
105        self.timestamp_ms
106    }
107
108    fn commit_sub_dag_index(&self) -> u64 {
109        self.sub_dag_index
110    }
111
112    fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
113        let block_ref = BlockRef {
114            author: consensus_config::AuthorityIndex::ZERO,
115            round: self.round as u32,
116            digest: Default::default(),
117        };
118
119        let parsed_txs: Vec<ParsedTransaction> = self
120            .transactions
121            .iter()
122            .map(|tx| ParsedTransaction {
123                transaction: tx.clone(),
124                rejected: false,
125                serialized_len: 0,
126            })
127            .collect();
128
129        vec![(block_ref, parsed_txs)]
130    }
131
132    fn rejected_transactions_digest(&self) -> Digest {
133        Digest::default()
134    }
135
136    fn rejected_transactions_debug_string(&self) -> String {
137        "no rejected transactions from TestConsensusCommit".to_string()
138    }
139}
140
141pub struct TestConsensusHandlerSetup<C> {
142    pub consensus_handler: ConsensusHandler<C>,
143    pub captured_transactions: CapturedTransactions,
144}
145
146pub fn make_consensus_adapter_for_test(
147    state: Arc<AuthorityState>,
148    process_via_checkpoint: HashSet<TransactionDigest>,
149    execute: bool,
150    mock_block_status_receivers: Vec<BlockStatusReceiver>,
151) -> Arc<ConsensusAdapter> {
152    let metrics = ConsensusAdapterMetrics::new_test();
153
154    #[derive(Clone)]
155    struct SubmitDirectly {
156        state: Arc<AuthorityState>,
157        process_via_checkpoint: HashSet<TransactionDigest>,
158        execute: bool,
159        mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
160    }
161
162    #[async_trait::async_trait]
163    impl ConsensusClient for SubmitDirectly {
164        async fn submit(
165            &self,
166            transactions: &[ConsensusTransaction],
167            epoch_store: &Arc<AuthorityPerEpochStore>,
168        ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
169            // If transactions are empty, then we are performing a ping check and will attempt to ping consensus and simulate a transaction submission to consensus.
170            if transactions.is_empty() {
171                return Ok((
172                    vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
173                    with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
174                ));
175            }
176
177            let num_transactions = transactions.len();
178            let mut executed_via_checkpoint = 0;
179
180            // Simple processing - just mark transactions for checkpoint execution if needed
181            for txn in transactions {
182                if let ConsensusTransactionKind::CertifiedTransaction(cert) = &txn.kind {
183                    let transaction_digest = cert.digest();
184                    if self.process_via_checkpoint.contains(transaction_digest) {
185                        epoch_store
186                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
187                            .expect("Should not fail");
188                        executed_via_checkpoint += 1;
189                    }
190                } else if let ConsensusTransactionKind::UserTransaction(tx) = &txn.kind {
191                    let transaction_digest = tx.digest();
192                    if self.process_via_checkpoint.contains(transaction_digest) {
193                        epoch_store
194                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
195                            .expect("Should not fail");
196                        executed_via_checkpoint += 1;
197                    }
198                }
199            }
200
201            let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
202                .iter()
203                .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
204                .collect();
205
206            let keys = sequenced_transactions
207                .iter()
208                .map(|tx| tx.key())
209                .collect::<Vec<_>>();
210
211            // Only execute transactions if explicitly requested and not via checkpoint
212            if self.execute {
213                for tx in sequenced_transactions {
214                    if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
215                    {
216                        // Skip if already executed via checkpoint
217                        if self.process_via_checkpoint.contains(&transaction_digest) {
218                            continue;
219                        }
220
221                        // Extract executable transaction from consensus transaction
222                        let executable_tx = match &tx.transaction {
223                            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
224                                ConsensusTransactionKind::CertifiedTransaction(cert) => {
225                                    Some(VerifiedExecutableTransaction::new_from_certificate(
226                                        VerifiedCertificate::new_unchecked(*cert.clone()),
227                                    ))
228                                }
229                                ConsensusTransactionKind::UserTransaction(tx) => {
230                                    Some(VerifiedExecutableTransaction::new_from_consensus(
231                                        VerifiedTransaction::new_unchecked(*tx.clone()),
232                                        0,
233                                    ))
234                                }
235                                _ => None,
236                            },
237                            SequencedConsensusTransactionKind::System(sys_tx) => {
238                                Some(sys_tx.clone())
239                            }
240                        };
241
242                        if let Some(exec_tx) = executable_tx {
243                            let versions = epoch_store.assign_shared_object_versions_for_tests(
244                                self.state.get_object_cache_reader().as_ref(),
245                                &vec![exec_tx.clone()],
246                            )?;
247
248                            let assigned_version = versions
249                                .into_map()
250                                .into_iter()
251                                .next()
252                                .map(|(_, v)| v)
253                                .unwrap_or_default();
254
255                            self.state.execution_scheduler().enqueue(
256                                vec![(
257                                    Schedulable::Transaction(exec_tx),
258                                    ExecutionEnv::new().with_assigned_versions(assigned_version),
259                                )],
260                                epoch_store,
261                            );
262                        }
263                    }
264                }
265            }
266
267            epoch_store.process_notifications(keys.iter());
268
269            assert_eq!(
270                executed_via_checkpoint,
271                self.process_via_checkpoint.len(),
272                "Some transactions were not executed via checkpoint"
273            );
274
275            assert!(
276                !self.mock_block_status_receivers.lock().is_empty(),
277                "No mock submit responses left"
278            );
279
280            let mut consensus_positions = Vec::new();
281            for index in 0..num_transactions {
282                consensus_positions.push(ConsensusPosition {
283                    epoch: epoch_store.epoch(),
284                    index: index as u16,
285                    block: BlockRef::MIN,
286                });
287            }
288
289            Ok((
290                consensus_positions,
291                self.mock_block_status_receivers.lock().remove(0),
292            ))
293        }
294    }
295    let epoch_store = state.epoch_store_for_testing();
296    // Make a new consensus adapter instance.
297    Arc::new(ConsensusAdapter::new(
298        Arc::new(SubmitDirectly {
299            state: state.clone(),
300            process_via_checkpoint,
301            execute,
302            mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
303        }),
304        state.checkpoint_store.clone(),
305        state.name,
306        Arc::new(ConnectionMonitorStatusForTests {}),
307        100_000,
308        100_000,
309        None,
310        None,
311        metrics,
312        epoch_store.protocol_config().clone(),
313    ))
314}
315
316/// Creates a ConsensusHandler for testing with a mock ExecutionSchedulerSender that captures transactions
317pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
318    authority: &Arc<AuthorityState>,
319    checkpoint_service: Arc<C>,
320) -> TestConsensusHandlerSetup<C>
321where
322    C: Send + Sync + 'static,
323{
324    let epoch_store = authority.epoch_store_for_testing();
325    let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
326    let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
327    let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
328    let backpressure_manager = BackpressureManager::new_for_tests();
329    let consensus_adapter =
330        make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
331
332    let last_consensus_stats = ExecutionIndicesWithStats {
333        stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
334            consensus_committee.size(),
335        ),
336        ..Default::default()
337    };
338
339    // Create a test ExecutionSchedulerSender that captures transactions
340    let captured_transactions = Arc::new(Mutex::new(Vec::<(
341        Vec<Schedulable>,
342        AssignedTxAndVersions,
343        SchedulingSource,
344    )>::new()));
345    let captured_tx_clone = captured_transactions.clone();
346
347    // Create a channel to capture sent transactions
348    let (tx_sender, mut receiver) =
349        mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
350
351    // Spawn a task to capture transactions from the channel
352    tokio::spawn(async move {
353        while let Some(item) = receiver.recv().await {
354            captured_tx_clone.lock().push(item);
355        }
356    });
357
358    let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
359
360    let consensus_handler = ConsensusHandler::new_for_testing(
361        epoch_store.clone(),
362        checkpoint_service,
363        execution_scheduler_sender,
364        consensus_adapter,
365        authority.get_object_cache_reader().clone(),
366        Arc::new(ArcSwap::default()),
367        consensus_committee,
368        metrics,
369        Arc::new(throughput_calculator),
370        backpressure_manager.subscribe(),
371        authority.traffic_controller.clone(),
372        last_consensus_stats,
373    );
374
375    TestConsensusHandlerSetup {
376        consensus_handler,
377        captured_transactions,
378    }
379}
380
381/// Creates a ConsensusHandler for testing with CheckpointServiceNoop
382#[cfg(test)]
383pub async fn setup_consensus_handler_for_testing(
384    authority: &Arc<AuthorityState>,
385) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
386    setup_consensus_handler_for_testing_with_checkpoint_service(
387        authority,
388        Arc::new(crate::checkpoints::CheckpointServiceNoop {}),
389    )
390    .await
391}