sui_core/unit_tests/
consensus_test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Common test utilities for consensus handler testing
5
6use std::collections::HashSet;
7use std::sync::Arc;
8
9use consensus_core::BlockStatus;
10use consensus_types::block::BlockRef;
11use parking_lot::Mutex;
12use prometheus::Registry;
13use sui_types::digests::{Digest, TransactionDigest};
14use sui_types::error::SuiResult;
15use sui_types::executable_transaction::VerifiedExecutableTransaction;
16use sui_types::messages_consensus::{
17    AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
18};
19use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
20use sui_types::transaction::VerifiedTransaction;
21
22use crate::authority::authority_per_epoch_store::{
23    AuthorityPerEpochStore, ExecutionIndicesWithStatsV2,
24};
25use crate::authority::backpressure::BackpressureManager;
26use crate::authority::shared_object_version_manager::Schedulable;
27use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
28use crate::consensus_adapter::{
29    BlockStatusReceiver, ConsensusAdapter, ConsensusAdapterMetrics, ConsensusClient,
30};
31use crate::consensus_handler::{
32    ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
33    SequencedConsensusTransactionKind,
34};
35use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
36use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
37use crate::mock_consensus::with_block_status;
38
39pub(crate) type CapturedTransactions = Arc<Mutex<Vec<crate::consensus_handler::SchedulerMessage>>>;
40
41pub struct TestConsensusCommit {
42    pub transactions: Vec<ConsensusTransaction>,
43    pub round: u64,
44    pub timestamp_ms: u64,
45    pub sub_dag_index: u64,
46}
47
48impl TestConsensusCommit {
49    pub fn new(
50        transactions: Vec<ConsensusTransaction>,
51        round: u64,
52        timestamp_ms: u64,
53        sub_dag_index: u64,
54    ) -> Self {
55        Self {
56            transactions,
57            round,
58            timestamp_ms,
59            sub_dag_index,
60        }
61    }
62
63    pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
64        Self {
65            transactions: vec![],
66            round,
67            timestamp_ms,
68            sub_dag_index,
69        }
70    }
71}
72
73impl std::fmt::Display for TestConsensusCommit {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(
76            f,
77            "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
78            self.round, self.timestamp_ms, self.sub_dag_index
79        )
80    }
81}
82
83impl ConsensusCommitAPI for TestConsensusCommit {
84    fn commit_ref(&self) -> consensus_core::CommitRef {
85        consensus_core::CommitRef::default()
86    }
87
88    fn leader_round(&self) -> u64 {
89        self.round
90    }
91
92    fn leader_author_index(&self) -> AuthorityIndex {
93        0
94    }
95
96    fn commit_timestamp_ms(&self) -> u64 {
97        self.timestamp_ms
98    }
99
100    fn commit_sub_dag_index(&self) -> u64 {
101        self.sub_dag_index
102    }
103
104    fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
105        let block_ref = BlockRef {
106            author: consensus_config::AuthorityIndex::ZERO,
107            round: self.round as u32,
108            digest: Default::default(),
109        };
110
111        let parsed_txs: Vec<ParsedTransaction> = self
112            .transactions
113            .iter()
114            .map(|tx| ParsedTransaction {
115                transaction: tx.clone(),
116                rejected: false,
117                serialized_len: 0,
118            })
119            .collect();
120
121        vec![(block_ref, parsed_txs)]
122    }
123
124    fn rejected_transactions_digest(&self) -> Digest {
125        Digest::default()
126    }
127
128    fn rejected_transactions_debug_string(&self) -> String {
129        "no rejected transactions from TestConsensusCommit".to_string()
130    }
131}
132
133pub struct TestConsensusHandlerSetup<C> {
134    pub consensus_handler: ConsensusHandler<C>,
135    pub captured_transactions: CapturedTransactions,
136}
137
138/// Makes a consensus adapter with the standard test wiring (limits, metrics), backed by the
139/// given consensus client.
140pub fn make_consensus_adapter_with_client_for_test(
141    state: &Arc<AuthorityState>,
142    client: Arc<dyn ConsensusClient>,
143    max_pending_local_submissions: usize,
144) -> Arc<ConsensusAdapter> {
145    Arc::new(ConsensusAdapter::new(
146        client,
147        state.checkpoint_store.clone(),
148        state.name,
149        100_000,
150        max_pending_local_submissions,
151        ConsensusAdapterMetrics::new_test(),
152        Arc::new(tokio::sync::Notify::new()),
153    ))
154}
155
156pub fn make_consensus_adapter_for_test(
157    state: Arc<AuthorityState>,
158    process_via_checkpoint: HashSet<TransactionDigest>,
159    execute: bool,
160    mock_block_status_receivers: Vec<BlockStatusReceiver>,
161) -> Arc<ConsensusAdapter> {
162    make_consensus_adapter_for_test_with_submit_limit(
163        state,
164        process_via_checkpoint,
165        execute,
166        mock_block_status_receivers,
167        100_000,
168    )
169}
170
171pub fn make_consensus_adapter_for_test_with_submit_limit(
172    state: Arc<AuthorityState>,
173    process_via_checkpoint: HashSet<TransactionDigest>,
174    execute: bool,
175    mock_block_status_receivers: Vec<BlockStatusReceiver>,
176    max_pending_local_submissions: usize,
177) -> Arc<ConsensusAdapter> {
178    #[derive(Clone)]
179    struct SubmitDirectly {
180        state: Arc<AuthorityState>,
181        process_via_checkpoint: HashSet<TransactionDigest>,
182        execute: bool,
183        mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
184    }
185
186    #[async_trait::async_trait]
187    impl ConsensusClient for SubmitDirectly {
188        async fn submit(
189            &self,
190            transactions: &[ConsensusTransaction],
191            epoch_store: &Arc<AuthorityPerEpochStore>,
192        ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
193            // If transactions are empty, then we are performing a ping check and will attempt to ping consensus and simulate a transaction submission to consensus.
194            if transactions.is_empty() {
195                return Ok((
196                    vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
197                    with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
198                ));
199            }
200
201            let num_transactions = transactions.len();
202            let mut executed_via_checkpoint = 0;
203
204            // Simple processing - just mark transactions for checkpoint execution if needed
205            for txn in transactions {
206                if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
207                    let transaction_digest = tx.tx().digest();
208                    if self.process_via_checkpoint.contains(transaction_digest) {
209                        epoch_store
210                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
211                            .expect("Should not fail");
212                        executed_via_checkpoint += 1;
213                    }
214                }
215            }
216
217            let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
218                .iter()
219                .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
220                .collect();
221
222            let keys = sequenced_transactions
223                .iter()
224                .map(|tx| tx.key())
225                .collect::<Vec<_>>();
226
227            // Only execute transactions if explicitly requested and not via checkpoint
228            if self.execute {
229                for tx in sequenced_transactions {
230                    if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
231                    {
232                        // Skip if already executed via checkpoint
233                        if self.process_via_checkpoint.contains(&transaction_digest) {
234                            continue;
235                        }
236
237                        // Extract executable transaction from consensus transaction
238                        let executable_tx = match &tx.transaction {
239                            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
240                                ConsensusTransactionKind::UserTransactionV2(tx) => {
241                                    Some(VerifiedExecutableTransaction::new_from_consensus(
242                                        VerifiedTransaction::new_unchecked(tx.tx().clone()),
243                                        0,
244                                    ))
245                                }
246                                _ => None,
247                            },
248                            SequencedConsensusTransactionKind::System(sys_tx) => {
249                                Some(sys_tx.clone())
250                            }
251                        };
252
253                        if let Some(exec_tx) = executable_tx {
254                            let versions = epoch_store.assign_shared_object_versions_for_tests(
255                                self.state.get_object_cache_reader().as_ref(),
256                                std::slice::from_ref(&exec_tx),
257                            )?;
258
259                            let assigned_version = versions
260                                .into_map()
261                                .into_iter()
262                                .next()
263                                .map(|(_, v)| v)
264                                .unwrap_or_default();
265
266                            self.state.execution_scheduler().enqueue(
267                                vec![(
268                                    Schedulable::Transaction(exec_tx),
269                                    ExecutionEnv::new().with_assigned_versions(assigned_version),
270                                )],
271                                epoch_store,
272                            );
273                        }
274                    }
275                }
276            }
277
278            epoch_store.process_notifications(keys.iter());
279
280            assert_eq!(
281                executed_via_checkpoint,
282                self.process_via_checkpoint.len(),
283                "Some transactions were not executed via checkpoint"
284            );
285
286            assert!(
287                !self.mock_block_status_receivers.lock().is_empty(),
288                "No mock submit responses left"
289            );
290
291            let mut consensus_positions = Vec::new();
292            for index in 0..num_transactions {
293                consensus_positions.push(ConsensusPosition {
294                    epoch: epoch_store.epoch(),
295                    index: index as u16,
296                    block: BlockRef::MIN,
297                });
298            }
299
300            Ok((
301                consensus_positions,
302                self.mock_block_status_receivers.lock().remove(0),
303            ))
304        }
305    }
306    // Make a new consensus adapter instance.
307    let client = Arc::new(SubmitDirectly {
308        state: state.clone(),
309        process_via_checkpoint,
310        execute,
311        mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
312    });
313    make_consensus_adapter_with_client_for_test(&state, client, max_pending_local_submissions)
314}
315
316/// Creates a ConsensusHandler for testing with a mock ExecutionSchedulerSender that captures transactions
317pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
318    authority: &Arc<AuthorityState>,
319    checkpoint_service: Arc<C>,
320) -> TestConsensusHandlerSetup<C>
321where
322    C: Send + Sync + 'static,
323{
324    let epoch_store = authority.epoch_store_for_testing();
325    let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
326    let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
327    let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
328    let backpressure_manager = BackpressureManager::new_for_tests();
329    let consensus_adapter =
330        make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
331
332    let last_consensus_stats = ExecutionIndicesWithStatsV2 {
333        stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
334            consensus_committee.size(),
335        ),
336        ..Default::default()
337    };
338
339    let captured_transactions: CapturedTransactions = Arc::new(Mutex::new(Vec::new()));
340    let captured_tx_clone = captured_transactions.clone();
341
342    let (tx_sender, mut receiver) =
343        mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
344
345    tokio::spawn(async move {
346        while let Some(item) = receiver.recv().await {
347            captured_tx_clone.lock().push(item);
348        }
349    });
350
351    let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
352
353    let consensus_handler = ConsensusHandler::new_for_testing(
354        epoch_store.clone(),
355        checkpoint_service,
356        execution_scheduler_sender,
357        consensus_adapter,
358        authority.get_object_cache_reader().clone(),
359        consensus_committee,
360        metrics,
361        Arc::new(throughput_calculator),
362        backpressure_manager.subscribe(),
363        authority.traffic_controller.clone(),
364        last_consensus_stats,
365    );
366
367    TestConsensusHandlerSetup {
368        consensus_handler,
369        captured_transactions,
370    }
371}
372
373/// Creates a ConsensusHandler for testing with CheckpointServiceNoop
374#[cfg(test)]
375pub async fn setup_consensus_handler_for_testing(
376    authority: &Arc<AuthorityState>,
377) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
378    setup_consensus_handler_for_testing_with_checkpoint_service(
379        authority,
380        Arc::new(crate::checkpoints::CheckpointServiceNoop {}),
381    )
382    .await
383}