sui_core/unit_tests/
consensus_test_utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Common test utilities for consensus handler testing
5
6use std::collections::HashSet;
7use std::sync::Arc;
8
9use consensus_core::BlockStatus;
10use consensus_types::block::BlockRef;
11use parking_lot::Mutex;
12use prometheus::Registry;
13use sui_types::digests::{Digest, TransactionDigest};
14use sui_types::error::SuiResult;
15use sui_types::executable_transaction::VerifiedExecutableTransaction;
16use sui_types::messages_consensus::{
17    AuthorityIndex, ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind,
18};
19use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
20use sui_types::transaction::VerifiedTransaction;
21
22use crate::authority::authority_per_epoch_store::{
23    AuthorityPerEpochStore, ExecutionIndicesWithStatsV2,
24};
25use crate::authority::backpressure::BackpressureManager;
26use crate::authority::shared_object_version_manager::Schedulable;
27use crate::authority::{AuthorityMetrics, AuthorityState, ExecutionEnv};
28use crate::consensus_adapter::{
29    BlockStatusReceiver, ConsensusAdapter, ConsensusAdapterMetrics, ConsensusClient,
30};
31use crate::consensus_handler::{
32    ConsensusHandler, ExecutionSchedulerSender, SequencedConsensusTransaction,
33    SequencedConsensusTransactionKind,
34};
35use crate::consensus_throughput_calculator::ConsensusThroughputCalculator;
36use crate::consensus_types::consensus_output_api::{ConsensusCommitAPI, ParsedTransaction};
37use crate::mock_consensus::with_block_status;
38
39pub(crate) type CapturedTransactions = Arc<Mutex<Vec<crate::consensus_handler::SchedulerMessage>>>;
40
41pub struct TestConsensusCommit {
42    pub transactions: Vec<ConsensusTransaction>,
43    pub round: u64,
44    pub timestamp_ms: u64,
45    pub sub_dag_index: u64,
46}
47
48impl TestConsensusCommit {
49    pub fn new(
50        transactions: Vec<ConsensusTransaction>,
51        round: u64,
52        timestamp_ms: u64,
53        sub_dag_index: u64,
54    ) -> Self {
55        Self {
56            transactions,
57            round,
58            timestamp_ms,
59            sub_dag_index,
60        }
61    }
62
63    pub fn empty(round: u64, timestamp_ms: u64, sub_dag_index: u64) -> Self {
64        Self {
65            transactions: vec![],
66            round,
67            timestamp_ms,
68            sub_dag_index,
69        }
70    }
71}
72
73impl std::fmt::Display for TestConsensusCommit {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(
76            f,
77            "TestConsensusCommitAPI(round={}, timestamp_ms={}, sub_dag_index={})",
78            self.round, self.timestamp_ms, self.sub_dag_index
79        )
80    }
81}
82
83impl ConsensusCommitAPI for TestConsensusCommit {
84    fn commit_ref(&self) -> consensus_core::CommitRef {
85        consensus_core::CommitRef::default()
86    }
87
88    fn leader_round(&self) -> u64 {
89        self.round
90    }
91
92    fn leader_author_index(&self) -> AuthorityIndex {
93        0
94    }
95
96    fn commit_timestamp_ms(&self) -> u64 {
97        self.timestamp_ms
98    }
99
100    fn commit_sub_dag_index(&self) -> u64 {
101        self.sub_dag_index
102    }
103
104    fn transactions(&self) -> Vec<(BlockRef, Vec<ParsedTransaction>)> {
105        let block_ref = BlockRef {
106            author: consensus_config::AuthorityIndex::ZERO,
107            round: self.round as u32,
108            digest: Default::default(),
109        };
110
111        let parsed_txs: Vec<ParsedTransaction> = self
112            .transactions
113            .iter()
114            .map(|tx| ParsedTransaction {
115                transaction: tx.clone(),
116                rejected: false,
117                serialized_len: 0,
118            })
119            .collect();
120
121        vec![(block_ref, parsed_txs)]
122    }
123
124    fn rejected_transactions_digest(&self) -> Digest {
125        Digest::default()
126    }
127
128    fn rejected_transactions_debug_string(&self) -> String {
129        "no rejected transactions from TestConsensusCommit".to_string()
130    }
131}
132
133pub struct TestConsensusHandlerSetup<C> {
134    pub consensus_handler: ConsensusHandler<C>,
135    pub captured_transactions: CapturedTransactions,
136}
137
138pub fn make_consensus_adapter_for_test(
139    state: Arc<AuthorityState>,
140    process_via_checkpoint: HashSet<TransactionDigest>,
141    execute: bool,
142    mock_block_status_receivers: Vec<BlockStatusReceiver>,
143) -> Arc<ConsensusAdapter> {
144    let metrics = ConsensusAdapterMetrics::new_test();
145
146    #[derive(Clone)]
147    struct SubmitDirectly {
148        state: Arc<AuthorityState>,
149        process_via_checkpoint: HashSet<TransactionDigest>,
150        execute: bool,
151        mock_block_status_receivers: Arc<Mutex<Vec<BlockStatusReceiver>>>,
152    }
153
154    #[async_trait::async_trait]
155    impl ConsensusClient for SubmitDirectly {
156        async fn submit(
157            &self,
158            transactions: &[ConsensusTransaction],
159            epoch_store: &Arc<AuthorityPerEpochStore>,
160        ) -> SuiResult<(Vec<ConsensusPosition>, BlockStatusReceiver)> {
161            // If transactions are empty, then we are performing a ping check and will attempt to ping consensus and simulate a transaction submission to consensus.
162            if transactions.is_empty() {
163                return Ok((
164                    vec![ConsensusPosition::ping(epoch_store.epoch(), BlockRef::MIN)],
165                    with_block_status(BlockStatus::Sequenced(BlockRef::MIN)),
166                ));
167            }
168
169            let num_transactions = transactions.len();
170            let mut executed_via_checkpoint = 0;
171
172            // Simple processing - just mark transactions for checkpoint execution if needed
173            for txn in transactions {
174                if let ConsensusTransactionKind::UserTransactionV2(tx) = &txn.kind {
175                    let transaction_digest = tx.tx().digest();
176                    if self.process_via_checkpoint.contains(transaction_digest) {
177                        epoch_store
178                            .insert_finalized_transactions(vec![*transaction_digest].as_slice(), 10)
179                            .expect("Should not fail");
180                        executed_via_checkpoint += 1;
181                    }
182                }
183            }
184
185            let sequenced_transactions: Vec<SequencedConsensusTransaction> = transactions
186                .iter()
187                .map(|txn| SequencedConsensusTransaction::new_test(txn.clone()))
188                .collect();
189
190            let keys = sequenced_transactions
191                .iter()
192                .map(|tx| tx.key())
193                .collect::<Vec<_>>();
194
195            // Only execute transactions if explicitly requested and not via checkpoint
196            if self.execute {
197                for tx in sequenced_transactions {
198                    if let Some(transaction_digest) = tx.transaction.executable_transaction_digest()
199                    {
200                        // Skip if already executed via checkpoint
201                        if self.process_via_checkpoint.contains(&transaction_digest) {
202                            continue;
203                        }
204
205                        // Extract executable transaction from consensus transaction
206                        let executable_tx = match &tx.transaction {
207                            SequencedConsensusTransactionKind::External(ext) => match &ext.kind {
208                                ConsensusTransactionKind::UserTransactionV2(tx) => {
209                                    Some(VerifiedExecutableTransaction::new_from_consensus(
210                                        VerifiedTransaction::new_unchecked(tx.tx().clone()),
211                                        0,
212                                    ))
213                                }
214                                _ => None,
215                            },
216                            SequencedConsensusTransactionKind::System(sys_tx) => {
217                                Some(sys_tx.clone())
218                            }
219                        };
220
221                        if let Some(exec_tx) = executable_tx {
222                            let versions = epoch_store.assign_shared_object_versions_for_tests(
223                                self.state.get_object_cache_reader().as_ref(),
224                                std::slice::from_ref(&exec_tx),
225                            )?;
226
227                            let assigned_version = versions
228                                .into_map()
229                                .into_iter()
230                                .next()
231                                .map(|(_, v)| v)
232                                .unwrap_or_default();
233
234                            self.state.execution_scheduler().enqueue(
235                                vec![(
236                                    Schedulable::Transaction(exec_tx),
237                                    ExecutionEnv::new().with_assigned_versions(assigned_version),
238                                )],
239                                epoch_store,
240                            );
241                        }
242                    }
243                }
244            }
245
246            epoch_store.process_notifications(keys.iter());
247
248            assert_eq!(
249                executed_via_checkpoint,
250                self.process_via_checkpoint.len(),
251                "Some transactions were not executed via checkpoint"
252            );
253
254            assert!(
255                !self.mock_block_status_receivers.lock().is_empty(),
256                "No mock submit responses left"
257            );
258
259            let mut consensus_positions = Vec::new();
260            for index in 0..num_transactions {
261                consensus_positions.push(ConsensusPosition {
262                    epoch: epoch_store.epoch(),
263                    index: index as u16,
264                    block: BlockRef::MIN,
265                });
266            }
267
268            Ok((
269                consensus_positions,
270                self.mock_block_status_receivers.lock().remove(0),
271            ))
272        }
273    }
274    // Make a new consensus adapter instance.
275    Arc::new(ConsensusAdapter::new(
276        Arc::new(SubmitDirectly {
277            state: state.clone(),
278            process_via_checkpoint,
279            execute,
280            mock_block_status_receivers: Arc::new(Mutex::new(mock_block_status_receivers)),
281        }),
282        state.checkpoint_store.clone(),
283        state.name,
284        100_000,
285        100_000,
286        metrics,
287        Arc::new(tokio::sync::Notify::new()),
288    ))
289}
290
291/// Creates a ConsensusHandler for testing with a mock ExecutionSchedulerSender that captures transactions
292pub async fn setup_consensus_handler_for_testing_with_checkpoint_service<C>(
293    authority: &Arc<AuthorityState>,
294    checkpoint_service: Arc<C>,
295) -> TestConsensusHandlerSetup<C>
296where
297    C: Send + Sync + 'static,
298{
299    let epoch_store = authority.epoch_store_for_testing();
300    let consensus_committee = epoch_store.epoch_start_state().get_consensus_committee();
301    let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
302    let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
303    let backpressure_manager = BackpressureManager::new_for_tests();
304    let consensus_adapter =
305        make_consensus_adapter_for_test(authority.clone(), HashSet::new(), false, vec![]);
306
307    let last_consensus_stats = ExecutionIndicesWithStatsV2 {
308        stats: crate::authority::authority_per_epoch_store::ConsensusStats::new(
309            consensus_committee.size(),
310        ),
311        ..Default::default()
312    };
313
314    let captured_transactions: CapturedTransactions = Arc::new(Mutex::new(Vec::new()));
315    let captured_tx_clone = captured_transactions.clone();
316
317    let (tx_sender, mut receiver) =
318        mysten_metrics::monitored_mpsc::unbounded_channel("test_execution_scheduler");
319
320    tokio::spawn(async move {
321        while let Some(item) = receiver.recv().await {
322            captured_tx_clone.lock().push(item);
323        }
324    });
325
326    let execution_scheduler_sender = ExecutionSchedulerSender::new_for_testing(tx_sender);
327
328    let consensus_handler = ConsensusHandler::new_for_testing(
329        epoch_store.clone(),
330        checkpoint_service,
331        execution_scheduler_sender,
332        consensus_adapter,
333        authority.get_object_cache_reader().clone(),
334        consensus_committee,
335        metrics,
336        Arc::new(throughput_calculator),
337        backpressure_manager.subscribe(),
338        authority.traffic_controller.clone(),
339        last_consensus_stats,
340    );
341
342    TestConsensusHandlerSetup {
343        consensus_handler,
344        captured_transactions,
345    }
346}
347
/// Builds a test `ConsensusHandler` that uses `CheckpointServiceNoop` as its
/// checkpoint service.
#[cfg(test)]
pub async fn setup_consensus_handler_for_testing(
    authority: &Arc<AuthorityState>,
) -> TestConsensusHandlerSetup<crate::checkpoints::CheckpointServiceNoop> {
    let noop_checkpoint_service = Arc::new(crate::checkpoints::CheckpointServiceNoop {});
    setup_consensus_handler_for_testing_with_checkpoint_service(authority, noop_checkpoint_service)
        .await
}