sui_core/unit_tests/
consensus_test_utils.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Common test utilities for consensus handler testing

6use std::collections::HashSet;
7use std::sync::Arc;
8
9use arc_swap::ArcSwap;
10use consensus_core::BlockStatus;
11use consensus_types::block::BlockRef;
12use parking_lot::Mutex;
13use prometheus::Registry;
14use sui_types::digests::{Digest, TransactionDigest};
15use sui_types::error::SuiResult;
16use sui_types::executable_transaction::VerifiedExecutableTransaction;
17use sui_types::messages_consensus::{
18    AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
19};
20use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
21use sui_types::transaction::{VerifiedCertificate, VerifiedTransaction};
22
23use crate::authority::authority_per_epoch_store::{
24    AuthorityPerEpochStore, ExecutionIndicesWithStatsV2,
25};
26use crate::authority::backpressure::BackpressureManager;
27use crate::authority::shared_object_version_manager::Schedulable;
28use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
29use crate::consensus_adapter::{
30    BlockStatusReceiver, ConnectionMonitorStatusForTests, ConsensusAdapter,
31    ConsensusAdapterMetrics, ConsensusClient,
32};
33use crate::consensus_handler::{
34    ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
35    SequencedConsensusTransactionKind,
36};
37use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
38use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
39use crate::mock_consensus::with_block_status;
40
41pub(crate) type CapturedTransactions = Arc<Mutex<Vec<crate::consensus_handler::SchedulerMessage>>>;
42
43pub struct TestConsensusCommit {
44    pub transactions: Vec<ConsensusTransaction>,
45    pub round: u64,
46    pub timestamp_ms: u64,
47    pub sub_dag_index: u64,
48}
49
50impl TestConsensusCommit {
51    pub fn new(
52        transactions: Vec<ConsensusTransaction>,
53        round: u64,
54        timestamp_ms: u64,
55        sub_dag_index: u64,
56    ) -> Self {
57        Self {
58            transactions,
59            round,
60            timestamp_ms,
61            sub_dag_index,
62        }
63    }
64
65    pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
66        Self {
67            transactions: vec![],
68            round,
69            timestamp_ms,
70            sub_dag_index,
71        }
72    }
73}
74
75impl std::fmt::Display for TestConsensusCommit {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(
78            f,
79            "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
80            self.round, self.timestamp_ms, self.sub_dag_index
81        )
82    }
83}
84
85impl ConsensusCommitAPI for TestConsensusCommit {
86    fn commit_ref(&self) -> consensus_core::CommitRef {
87        consensus_core::CommitRef::default()
88    }
89
90    fn reputation_score_sorted_desc(&self) -> Option<Vec<(AuthorityIndex, u64)>> {
91        None
92    }
93
94    fn leader_round(&self) -> u64 {
95        self.round
96    }
97
98    fn leader_author_index(&self) -> AuthorityIndex {
99        0
100    }
101
102    fn commit_timestamp_ms(&self) -> u64 {
103        self.timestamp_ms
104    }
105
106    fn commit_sub_dag_index(&self) -> u64 {
107        self.sub_dag_index
108    }
109
110    fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
111        let block_ref = BlockRef {
112            author: consensus_config::AuthorityIndex::ZERO,
113            round: self.round as u32,
114            digest: Default::default(),
115        };
116
117        let parsed_txs: Vec<ParsedTransaction> = self
118            .transactions
119            .iter()
120            .map(|tx| ParsedTransaction {
121                transaction: tx.clone(),
122                rejected: false,
123                serialized_len: 0,
124            })
125            .collect();
126
127        vec![(block_ref, parsed_txs)]
128    }
129
130    fn rejected_transactions_digest(&self) -> Digest {
131        Digest::default()
132    }
133
134    fn rejected_transactions_debug_string(&self) -> String {
135        "no rejected transactions from TestConsensusCommit".to_string()
136    }
137}
138
139pub struct TestConsensusHandlerSetup<C> {
140    pub consensus_handler: ConsensusHandler<C>,
141    pub captured_transactions: CapturedTransactions,
142}
143
144pub fn make_consensus_adapter_for_test(
145    state: Arc<AuthorityState>,
146    process_via_checkpoint: HashSet<TransactionDigest>,
147    execute: bool,
148    mock_block_status_receivers: Vec<BlockStatusReceiver>,
149) -> Arc<ConsensusAdapter> {
150    let metrics = ConsensusAdapterMetrics::new_test();
151
152    #[derive(Clone)]
153    struct SubmitDirectly {
154        state: Arc<AuthorityState>,
155        process_via_checkpoint: HashSet<TransactionDigest>,
156        execute: bool,
157        mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
158    }
159
160    #[async_trait::async_trait]
161    impl ConsensusClient for SubmitDirectly {
162        async fn submit(
163            &self,
164            transactions: &[ConsensusTransaction],
165            epoch_store: &Arc<AuthorityPerEpochStore>,
166        ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
167            // If transactions are empty, then we are performing a ping check and will attempt to ping consensus and simulate a transaction submission to consensus.
168            if transactions.is_empty() {
169                return Ok((
170                    vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
171                    with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
172                ));
173            }
174
175            let num_transactions = transactions.len();
176            let mut executed_via_checkpoint = 0;
177
178            // Simple processing - just mark transactions for checkpoint execution if needed
179            for txn in transactions {
180                if let ConsensusTransactionKind::CertifiedTransaction(cert) = &txn.kind {
181                    let transaction_digest = cert.digest();
182                    if self.process_via_checkpoint.contains(transaction_digest) {
183                        epoch_store
184                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
185                            .expect("Should not fail");
186                        executed_via_checkpoint += 1;
187                    }
188                } else if let ConsensusTransactionKind::UserTransaction(tx) = &txn.kind {
189                    let transaction_digest = tx.digest();
190                    if self.process_via_checkpoint.contains(transaction_digest) {
191                        epoch_store
192                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
193                            .expect("Should not fail");
194                        executed_via_checkpoint += 1;
195                    }
196                } else if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
197                    let transaction_digest = tx.tx().digest();
198                    if self.process_via_checkpoint.contains(transaction_digest) {
199                        epoch_store
200                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
201                            .expect("Should not fail");
202                        executed_via_checkpoint += 1;
203                    }
204                }
205            }
206
207            let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
208                .iter()
209                .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
210                .collect();
211
212            let keys = sequenced_transactions
213                .iter()
214                .map(|tx| tx.key())
215                .collect::<Vec<_>>();
216
217            // Only execute transactions if explicitly requested and not via checkpoint
218            if self.execute {
219                for tx in sequenced_transactions {
220                    if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
221                    {
222                        // Skip if already executed via checkpoint
223                        if self.process_via_checkpoint.contains(&transaction_digest) {
224                            continue;
225                        }
226
227                        // Extract executable transaction from consensus transaction
228                        let executable_tx = match &tx.transaction {
229                            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
230                                ConsensusTransactionKind::CertifiedTransaction(cert) => {
231                                    Some(VerifiedExecutableTransaction::new_from_certificate(
232                                        VerifiedCertificate::new_unchecked(*cert.clone()),
233                                    ))
234                                }
235                                ConsensusTransactionKind::UserTransaction(tx) => {
236                                    Some(VerifiedExecutableTransaction::new_from_consensus(
237                                        VerifiedTransaction::new_unchecked(*tx.clone()),
238                                        0,
239                                    ))
240                                }
241                                ConsensusTransactionKind::UserTransactionV2(tx) => {
242                                    Some(VerifiedExecutableTransaction::new_from_consensus(
243                                        VerifiedTransaction::new_unchecked(tx.tx().clone()),
244                                        0,
245                                    ))
246                                }
247                                _ => None,
248                            },
249                            SequencedConsensusTransactionKind::System(sys_tx) => {
250                                Some(sys_tx.clone())
251                            }
252                        };
253
254                        if let Some(exec_tx) = executable_tx {
255                            let versions = epoch_store.assign_shared_object_versions_for_tests(
256                                self.state.get_object_cache_reader().as_ref(),
257                                std::slice::from_ref(&exec_tx),
258                            )?;
259
260                            let assigned_version = versions
261                                .into_map()
262                                .into_iter()
263                                .next()
264                                .map(|(_, v)| v)
265                                .unwrap_or_default();
266
267                            self.state.execution_scheduler().enqueue(
268                                vec![(
269                                    Schedulable::Transaction(exec_tx),
270                                    ExecutionEnv::new().with_assigned_versions(assigned_version),
271                                )],
272                                epoch_store,
273                            );
274                        }
275                    }
276                }
277            }
278
279            epoch_store.process_notifications(keys.iter());
280
281            assert_eq!(
282                executed_via_checkpoint,
283                self.process_via_checkpoint.len(),
284                "Some transactions were not executed via checkpoint"
285            );
286
287            assert!(
288                !self.mock_block_status_receivers.lock().is_empty(),
289                "No mock submit responses left"
290            );
291
292            let mut consensus_positions = Vec::new();
293            for index in 0..num_transactions {
294                consensus_positions.push(ConsensusPosition {
295                    epoch: epoch_store.epoch(),
296                    index: index as u16,
297                    block: BlockRef::MIN,
298                });
299            }
300
301            Ok((
302                consensus_positions,
303                self.mock_block_status_receivers.lock().remove(0),
304            ))
305        }
306    }
307    let epoch_store = state.epoch_store_for_testing();
308    // Make a new consensus adapter instance.
309    Arc::new(ConsensusAdapter::new(
310        Arc::new(SubmitDirectly {
311            state: state.clone(),
312            process_via_checkpoint,
313            execute,
314            mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
315        }),
316        state.checkpoint_store.clone(),
317        state.name,
318        Arc::new(ConnectionMonitorStatusForTests {}),
319        100_000,
320        100_000,
321        None,
322        None,
323        metrics,
324        epoch_store.protocol_config().clone(),
325    ))
326}
327
328/// Creates a ConsensusHandler for testing with a mock ExecutionSchedulerSender that captures transactions
329pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
330    authority: &Arc<AuthorityState>,
331    checkpoint_service: Arc<C>,
332) -> TestConsensusHandlerSetup<C>
333where
334    C: Send + Sync + 'static,
335{
336    let epoch_store = authority.epoch_store_for_testing();
337    let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
338    let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
339    let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
340    let backpressure_manager = BackpressureManager::new_for_tests();
341    let consensus_adapter =
342        make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
343
344    let last_consensus_stats = ExecutionIndicesWithStatsV2 {
345        stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
346            consensus_committee.size(),
347        ),
348        ..Default::default()
349    };
350
351    let captured_transactions: CapturedTransactions = Arc::new(Mutex::new(Vec::new()));
352    let captured_tx_clone = captured_transactions.clone();
353
354    let (tx_sender, mut receiver) =
355        mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
356
357    tokio::spawn(async move {
358        while let Some(item) = receiver.recv().await {
359            captured_tx_clone.lock().push(item);
360        }
361    });
362
363    let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
364
365    let consensus_handler = ConsensusHandler::new_for_testing(
366        epoch_store.clone(),
367        checkpoint_service,
368        execution_scheduler_sender,
369        consensus_adapter,
370        authority.get_object_cache_reader().clone(),
371        Arc::new(ArcSwap::default()),
372        consensus_committee,
373        metrics,
374        Arc::new(throughput_calculator),
375        backpressure_manager.subscribe(),
376        authority.traffic_controller.clone(),
377        last_consensus_stats,
378    );
379
380    TestConsensusHandlerSetup {
381        consensus_handler,
382        captured_transactions,
383    }
384}
385
/// Builds a test [`ConsensusHandler`] that uses a `CheckpointServiceNoop` as its
/// checkpoint service.
///
/// Thin wrapper around [`setup_consensus_handler_for_testing_with_checkpoint_service`].
#[cfg(test)]
pub async fn setup_consensus_handler_for_testing(
    authority: &Arc<AuthorityState>,
) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
    let noop_service = Arc::new(crate::checkpoints::CheckpointServiceNoop {});
    setup_consensus_handler_for_testing_with_checkpoint_service(authority, noop_service).await
}