test_cluster/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::{StreamExt, future::join_all};
5use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
6use mysten_common::fatal;
7use rand::{distributions::*, rngs::OsRng, seq::SliceRandom};
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::num::NonZeroUsize;
11use std::path::PathBuf;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use sui_config::genesis::Genesis;
15use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
16use sui_config::{Config, ExecutionCacheConfig, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG};
17use sui_config::{NodeConfig, PersistedConfig, SUI_KEYSTORE_FILENAME};
18use sui_core::authority_aggregator::AuthorityAggregator;
19use sui_core::authority_client::NetworkAuthorityClient;
20use sui_json_rpc_types::{
21    SuiExecutionStatus, SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse,
22    TransactionFilter,
23};
24use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
25use sui_node::SuiNodeHandle;
26use sui_protocol_config::{Chain, ProtocolVersion};
27use sui_sdk::apis::QuorumDriverApi;
28use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
29use sui_sdk::wallet_context::WalletContext;
30use sui_sdk::{SuiClient, SuiClientBuilder};
31use sui_swarm::memory::{Swarm, SwarmBuilder};
32use sui_swarm_config::genesis_config::{
33    AccountConfig, DEFAULT_GAS_AMOUNT, GenesisConfig, ValidatorGenesisConfig,
34};
35use sui_swarm_config::network_config::NetworkConfig;
36use sui_swarm_config::network_config_builder::{
37    GlobalStateHashV2EnabledCallback, GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig,
38    SupportedProtocolVersionsCallback,
39};
40use sui_swarm_config::node_config_builder::{FullnodeConfigBuilder, ValidatorConfigBuilder};
41use sui_test_transaction_builder::TestTransactionBuilder;
42use sui_types::base_types::ConciseableName;
43use sui_types::base_types::{AuthorityName, ObjectID, ObjectRef, SuiAddress};
44use sui_types::committee::CommitteeTrait;
45use sui_types::committee::{Committee, EpochId};
46use sui_types::crypto::KeypairTraits;
47use sui_types::crypto::SuiKeyPair;
48use sui_types::digests::{ChainIdentifier, TransactionDigest};
49use sui_types::effects::{TransactionEffects, TransactionEvents};
50use sui_types::error::SuiResult;
51use sui_types::message_envelope::Message;
52use sui_types::messages_grpc::{RawSubmitTxRequest, SubmitTxType};
53use sui_types::object::Object;
54use sui_types::sui_system_state::SuiSystemState;
55use sui_types::sui_system_state::SuiSystemStateTrait;
56use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
57use sui_types::supported_protocol_versions::SupportedProtocolVersions;
58use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
59use sui_types::transaction::{
60    CertifiedTransaction, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
61};
62use tokio::time::{Instant, timeout};
63use tokio::{task::JoinHandle, time::sleep};
64use tonic::IntoRequest;
65use tracing::{error, info};
66
67mod test_indexer_handle;
68
69const NUM_VALIDATOR: usize = 4;
70
71pub struct FullNodeHandle {
72    pub sui_node: SuiNodeHandle,
73    pub sui_client: SuiClient,
74    pub rpc_client: HttpClient,
75    pub rpc_url: String,
76}
77
78impl FullNodeHandle {
79    pub async fn new(sui_node: SuiNodeHandle, json_rpc_address: SocketAddr) -> Self {
80        let rpc_url = format!("http://{}", json_rpc_address);
81        let rpc_client = HttpClientBuilder::default().build(&rpc_url).unwrap();
82
83        let sui_client = SuiClientBuilder::default().build(&rpc_url).await.unwrap();
84
85        Self {
86            sui_node,
87            sui_client,
88            rpc_client,
89            rpc_url,
90        }
91    }
92}
93
94pub struct TestCluster {
95    pub swarm: Swarm,
96    pub wallet: WalletContext,
97    pub fullnode_handle: FullNodeHandle,
98    indexer_handle: Option<test_indexer_handle::IndexerHandle>,
99}
100
101impl TestCluster {
102    pub fn rpc_client(&self) -> &HttpClient {
103        self.indexer_handle
104            .as_ref()
105            .map(|h| &h.rpc_client)
106            .unwrap_or(&self.fullnode_handle.rpc_client)
107    }
108
109    pub fn sui_client(&self) -> &SuiClient {
110        self.indexer_handle
111            .as_ref()
112            .map(|h| &h.sui_client)
113            .unwrap_or(&self.fullnode_handle.sui_client)
114    }
115
116    pub fn rpc_url(&self) -> &str {
117        self.indexer_handle
118            .as_ref()
119            .map(|h| h.rpc_url.as_str())
120            .unwrap_or(&self.fullnode_handle.rpc_url)
121    }
122
123    pub fn quorum_driver_api(&self) -> &QuorumDriverApi {
124        self.sui_client().quorum_driver_api()
125    }
126
127    pub fn wallet(&mut self) -> &WalletContext {
128        &self.wallet
129    }
130
131    pub fn wallet_mut(&mut self) -> &mut WalletContext {
132        &mut self.wallet
133    }
134
135    pub fn get_addresses(&self) -> Vec<SuiAddress> {
136        self.wallet.get_addresses()
137    }
138
139    // Helper function to get the 0th address in WalletContext
140    pub fn get_address_0(&self) -> SuiAddress {
141        self.get_addresses()[0]
142    }
143
144    // Helper function to get the 1st address in WalletContext
145    pub fn get_address_1(&self) -> SuiAddress {
146        self.get_addresses()[1]
147    }
148
149    // Helper function to get the 2nd address in WalletContext
150    pub fn get_address_2(&self) -> SuiAddress {
151        self.get_addresses()[2]
152    }
153
154    pub fn fullnode_config_builder(&self) -> FullnodeConfigBuilder {
155        self.swarm.get_fullnode_config_builder()
156    }
157
158    pub fn committee(&self) -> Arc<Committee> {
159        self.fullnode_handle
160            .sui_node
161            .with(|node| node.state().epoch_store_for_testing().committee().clone())
162    }
163
164    /// Convenience method to start a new fullnode in the test cluster.
165    pub async fn spawn_new_fullnode(&mut self) -> FullNodeHandle {
166        self.start_fullnode_from_config(
167            self.fullnode_config_builder()
168                .build(&mut OsRng, self.swarm.config()),
169        )
170        .await
171    }
172
173    pub async fn start_fullnode_from_config(&mut self, config: NodeConfig) -> FullNodeHandle {
174        let json_rpc_address = config.json_rpc_address;
175        let node = self.swarm.spawn_new_node(config).await;
176        FullNodeHandle::new(node, json_rpc_address).await
177    }
178
179    pub fn all_node_handles(&self) -> Vec<SuiNodeHandle> {
180        self.swarm
181            .all_nodes()
182            .flat_map(|n| n.get_node_handle())
183            .collect()
184    }
185
186    pub fn all_validator_handles(&self) -> Vec<SuiNodeHandle> {
187        self.swarm
188            .validator_nodes()
189            .map(|n| n.get_node_handle().unwrap())
190            .collect()
191    }
192
193    pub fn get_validator_pubkeys(&self) -> Vec<AuthorityName> {
194        self.swarm.active_validators().map(|v| v.name()).collect()
195    }
196
197    pub fn get_genesis(&self) -> Genesis {
198        self.swarm.config().genesis.clone()
199    }
200
201    pub fn stop_node(&self, name: &AuthorityName) {
202        self.swarm.node(name).unwrap().stop();
203    }
204
205    pub async fn stop_all_validators(&self) {
206        info!("Stopping all validators in the cluster");
207        self.swarm.active_validators().for_each(|v| v.stop());
208        tokio::time::sleep(Duration::from_secs(3)).await;
209    }
210
211    pub async fn start_all_validators(&self) {
212        info!("Starting all validators in the cluster");
213        for v in self.swarm.validator_nodes() {
214            if v.is_running() {
215                continue;
216            }
217            v.start().await.unwrap();
218        }
219        tokio::time::sleep(Duration::from_secs(3)).await;
220    }
221
222    pub async fn start_node(&self, name: &AuthorityName) {
223        let node = self.swarm.node(name).unwrap();
224        if node.is_running() {
225            return;
226        }
227        node.start().await.unwrap();
228    }
229
230    pub async fn spawn_new_validator(
231        &mut self,
232        genesis_config: ValidatorGenesisConfig,
233    ) -> SuiNodeHandle {
234        let node_config = ValidatorConfigBuilder::new()
235            .build(genesis_config, self.swarm.config().genesis.clone());
236        self.swarm.spawn_new_node(node_config).await
237    }
238
239    pub fn random_node_restarter(self: &Arc<Self>) -> RandomNodeRestarter {
240        RandomNodeRestarter::new(self.clone())
241    }
242
243    pub async fn get_reference_gas_price(&self) -> u64 {
244        self.sui_client()
245            .governance_api()
246            .get_reference_gas_price()
247            .await
248            .expect("failed to get reference gas price")
249    }
250
251    pub fn get_chain_identifier(&self) -> ChainIdentifier {
252        ChainIdentifier::from(*self.swarm.config().genesis.checkpoint().digest())
253    }
254
255    pub async fn get_object_from_fullnode_store(&self, object_id: &ObjectID) -> Option<Object> {
256        self.fullnode_handle
257            .sui_node
258            .with_async(|node| async { node.state().get_object(object_id).await })
259            .await
260    }
261
262    pub async fn get_latest_object_ref(&self, object_id: &ObjectID) -> ObjectRef {
263        self.get_object_from_fullnode_store(object_id)
264            .await
265            .unwrap()
266            .compute_object_reference()
267    }
268
269    pub async fn get_object_or_tombstone_from_fullnode_store(
270        &self,
271        object_id: ObjectID,
272    ) -> ObjectRef {
273        self.fullnode_handle
274            .sui_node
275            .state()
276            .get_object_cache_reader()
277            .get_latest_object_ref_or_tombstone(object_id)
278            .unwrap()
279    }
280
281    pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
282        self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
283            .await
284    }
285
286    pub async fn wait_for_run_with_range_shutdown_signal_with_timeout(
287        &self,
288        timeout_dur: Duration,
289    ) -> Option<RunWithRange> {
290        let mut shutdown_channel_rx = self
291            .fullnode_handle
292            .sui_node
293            .with(|node| node.subscribe_to_shutdown_channel());
294
295        timeout(timeout_dur, async move {
296            tokio::select! {
297                msg = shutdown_channel_rx.recv() =>
298                {
299                    match msg {
300                        Ok(Some(run_with_range)) => Some(run_with_range),
301                        Ok(None) => None,
302                        Err(e) => {
303                            error!("failed recv from sui-node shutdown channel: {}", e);
304                            None
305                        },
306                    }
307                },
308            }
309        })
310        .await
311        .expect("Timed out waiting for cluster to hit target epoch and recv shutdown signal from sui-node")
312    }
313
314    pub async fn wait_for_protocol_version(
315        &self,
316        target_protocol_version: ProtocolVersion,
317    ) -> SuiSystemState {
318        self.wait_for_protocol_version_with_timeout(
319            target_protocol_version,
320            Duration::from_secs(60),
321        )
322        .await
323    }
324
325    pub async fn wait_for_protocol_version_with_timeout(
326        &self,
327        target_protocol_version: ProtocolVersion,
328        timeout_dur: Duration,
329    ) -> SuiSystemState {
330        timeout(timeout_dur, async move {
331            loop {
332                let system_state = self.wait_for_epoch(None).await;
333                if system_state.protocol_version() >= target_protocol_version.as_u64() {
334                    return system_state;
335                }
336            }
337        })
338        .await
339        .expect("Timed out waiting for cluster to target protocol version")
340    }
341
342    /// Ask 2f+1 validators to close epoch actively, and wait for the entire network to reach the next
343    /// epoch. This requires waiting for both the fullnode and all validators to reach the next epoch.
344    pub async fn trigger_reconfiguration(&self) {
345        info!("Starting reconfiguration");
346        let start = Instant::now();
347
348        // Close epoch on 2f+1 validators.
349        let cur_committee = self
350            .fullnode_handle
351            .sui_node
352            .with(|node| node.state().clone_committee_for_testing());
353        let mut cur_stake = 0;
354        for node in self.swarm.active_validators() {
355            node.get_node_handle()
356                .unwrap()
357                .with_async(|node| async {
358                    node.close_epoch_for_testing().await.unwrap_or_else(|_| {
359                        fatal!(
360                            "Failed to close epoch for validator {:?}",
361                            node.state().name
362                        );
363                    });
364                    cur_stake += cur_committee.weight(&node.state().name);
365                })
366                .await;
367            if cur_stake >= cur_committee.quorum_threshold() {
368                break;
369            }
370        }
371        info!("close_epoch complete after {:?}", start.elapsed());
372
373        self.wait_for_epoch(Some(cur_committee.epoch + 1)).await;
374        self.wait_for_epoch_all_nodes(cur_committee.epoch + 1).await;
375
376        info!("reconfiguration complete after {:?}", start.elapsed());
377    }
378
379    /// To detect whether the network has reached such state, we use the fullnode as the
380    /// source of truth, since a fullnode only does epoch transition when the network has
381    /// done so.
382    /// If target_epoch is specified, wait until the cluster reaches that epoch.
383    /// If target_epoch is None, wait until the cluster reaches the next epoch.
384    /// Note that this function does not guarantee that every node is at the target epoch.
385    pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> SuiSystemState {
386        self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
387            .await
388    }
389
390    pub async fn wait_for_epoch_on_node(
391        &self,
392        handle: &SuiNodeHandle,
393        target_epoch: Option<EpochId>,
394        timeout_dur: Duration,
395    ) -> SuiSystemState {
396        let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());
397
398        let mut state = None;
399        timeout(timeout_dur, async {
400            let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
401            if Some(epoch) == target_epoch {
402                return handle.with(|node| node.state().get_sui_system_state_object_for_testing().unwrap());
403            }
404            while let Ok(system_state) = epoch_rx.recv().await {
405                info!("received epoch {}", system_state.epoch());
406                state = Some(system_state.clone());
407                match target_epoch {
408                    Some(target_epoch) if system_state.epoch() >= target_epoch => {
409                        return system_state;
410                    }
411                    None => {
412                        return system_state;
413                    }
414                    _ => (),
415                }
416            }
417            unreachable!("Broken reconfig channel");
418        })
419        .await
420        .unwrap_or_else(|_| {
421            error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
422            if let Some(state) = state {
423                panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
424            }
425            panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
426        })
427    }
428
429    pub async fn wait_for_epoch_with_timeout(
430        &self,
431        target_epoch: Option<EpochId>,
432        timeout_dur: Duration,
433    ) -> SuiSystemState {
434        self.wait_for_epoch_on_node(&self.fullnode_handle.sui_node, target_epoch, timeout_dur)
435            .await
436    }
437
438    pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
439        let handles: Vec<_> = self
440            .swarm
441            .all_nodes()
442            .map(|node| node.get_node_handle().unwrap())
443            .collect();
444        let tasks: Vec<_> = handles
445            .iter()
446            .map(|handle| {
447                handle.with_async(|node| async {
448                    let mut retries = 0;
449                    loop {
450                        let epoch = node.state().epoch_store_for_testing().epoch();
451                        if epoch == target_epoch {
452                            if let Some(agg) = node.clone_authority_aggregator() {
453                                // This is a fullnode, we need to wait for its auth aggregator to reconfigure as well.
454                                if agg.committee.epoch() == target_epoch {
455                                    break;
456                                }
457                            } else {
458                                // This is a validator, we don't need to check the auth aggregator.
459                                break;
460                            }
461                        }
462                        tokio::time::sleep(Duration::from_secs(1)).await;
463                        retries += 1;
464                        if retries % 5 == 0 {
465                            tracing::warn!(validator=?node.state().name.concise(), "Waiting for {:?} seconds to reach epoch {:?}. Currently at epoch {:?}", retries, target_epoch, epoch);
466                        }
467                    }
468                })
469            })
470            .collect();
471
472        timeout(Duration::from_secs(40), join_all(tasks))
473            .await
474            .expect("timed out waiting for reconfiguration to complete");
475    }
476
477    /// Upgrade the network protocol version, by restarting every validator with a new
478    /// supported versions.
479    /// Note that we don't restart the fullnode here, and it is assumed that the fulnode supports
480    /// the entire version range.
481    pub async fn update_validator_supported_versions(
482        &self,
483        new_supported_versions: SupportedProtocolVersions,
484    ) {
485        for authority in self.get_validator_pubkeys() {
486            self.stop_node(&authority);
487            tokio::time::sleep(Duration::from_millis(1000)).await;
488            self.swarm
489                .node(&authority)
490                .unwrap()
491                .config()
492                .supported_protocol_versions = Some(new_supported_versions);
493            self.start_node(&authority).await;
494            info!("Restarted validator {}", authority);
495        }
496    }
497
498    /// Wait for all nodes in the network to upgrade to `protocol_version`.
499    pub async fn wait_for_all_nodes_upgrade_to(&self, protocol_version: u64) {
500        for h in self.all_node_handles() {
501            h.with_async(|node| async {
502                while node
503                    .state()
504                    .epoch_store_for_testing()
505                    .epoch_start_state()
506                    .protocol_version()
507                    .as_u64()
508                    != protocol_version
509                {
510                    tokio::time::sleep(Duration::from_secs(1)).await;
511                }
512            })
513            .await;
514        }
515    }
516
517    pub async fn wait_for_authenticator_state_update(&self) {
518        timeout(
519            Duration::from_secs(60),
520            self.fullnode_handle.sui_node.with_async(|node| async move {
521                let mut txns = node.state().subscription_handler.subscribe_transactions(
522                    TransactionFilter::ChangedObject(ObjectID::from_hex_literal("0x7").unwrap()),
523                );
524                let state = node.state();
525
526                while let Some(tx) = txns.next().await {
527                    let digest = *tx.transaction_digest();
528                    let tx = state
529                        .get_transaction_cache_reader()
530                        .get_transaction_block(&digest)
531                        .unwrap();
532                    match &tx.data().intent_message().value.kind() {
533                        TransactionKind::EndOfEpochTransaction(_) => (),
534                        TransactionKind::AuthenticatorStateUpdate(_) => break,
535                        _ => panic!("{:?}", tx),
536                    }
537                }
538            }),
539        )
540        .await
541        .expect("Timed out waiting for authenticator state update");
542    }
543
544    /// Return the highest observed protocol version in the test cluster.
545    pub fn highest_protocol_version(&self) -> ProtocolVersion {
546        self.all_node_handles()
547            .into_iter()
548            .map(|h| {
549                h.with(|node| {
550                    node.state()
551                        .epoch_store_for_testing()
552                        .epoch_start_state()
553                        .protocol_version()
554                })
555            })
556            .max()
557            .expect("at least one node must be up to get highest protocol version")
558    }
559
560    pub async fn test_transaction_builder(&self) -> TestTransactionBuilder {
561        let (sender, gas) = self.wallet.get_one_gas_object().await.unwrap().unwrap();
562        self.test_transaction_builder_with_gas_object(sender, gas)
563            .await
564    }
565
566    pub async fn test_transaction_builder_with_sender(
567        &self,
568        sender: SuiAddress,
569    ) -> TestTransactionBuilder {
570        let gas = self
571            .wallet
572            .get_one_gas_object_owned_by_address(sender)
573            .await
574            .unwrap()
575            .unwrap();
576        self.test_transaction_builder_with_gas_object(sender, gas)
577            .await
578    }
579
580    pub async fn test_transaction_builder_with_gas_object(
581        &self,
582        sender: SuiAddress,
583        gas: ObjectRef,
584    ) -> TestTransactionBuilder {
585        let rgp = self.get_reference_gas_price().await;
586        TestTransactionBuilder::new(sender, gas, rgp)
587    }
588
589    pub async fn sign_transaction(&self, tx_data: &TransactionData) -> Transaction {
590        self.wallet.sign_transaction(tx_data).await
591    }
592
593    pub async fn sign_and_execute_transaction(
594        &self,
595        tx_data: &TransactionData,
596    ) -> SuiTransactionBlockResponse {
597        let tx = self.wallet.sign_transaction(tx_data).await;
598        self.execute_transaction(tx).await
599    }
600
601    /// Sign and execute the transaction via direct validator submission, bypassing the fullnode.
602    pub async fn sign_and_execute_transaction_directly(
603        &self,
604        tx_data: &TransactionData,
605    ) -> SuiResult<(TransactionDigest, TransactionEffects)> {
606        let mut res = self
607            .sign_and_execute_txns_in_soft_bundle(std::slice::from_ref(tx_data))
608            .await?;
609        assert_eq!(res.len(), 1);
610        Ok(res.pop().unwrap())
611    }
612
613    /// Sign and execute multiple transactions in a soft bundle.
614    /// Soft bundles allow submitting multiple transactions together with best-effort
615    /// ordering if they use the same gas price. Transactions in a soft bundle can be
616    /// individually rejected or deferred without affecting other transactions.
617    ///
618    /// NOTE: This is a simplified implementation that processes transactions individually.
619    /// For true soft bundle submission, the test file should use the raw gRPC client directly
620    /// with tonic, as shown in test_soft_bundle_different_gas_payers.
621    pub async fn sign_and_execute_txns_in_soft_bundle(
622        &self,
623        txns: &[TransactionData],
624    ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
625        // Sign all transactions
626        let signed_txs: Vec<Transaction> =
627            futures::future::join_all(txns.iter().map(|tx| self.wallet.sign_transaction(tx))).await;
628
629        self.execute_signed_txns_in_soft_bundle(&signed_txs).await
630    }
631
632    pub async fn execute_signed_txns_in_soft_bundle(
633        &self,
634        signed_txs: &[Transaction],
635    ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
636        let digests: Vec<_> = signed_txs.iter().map(|tx| *tx.digest()).collect();
637
638        let request = RawSubmitTxRequest {
639            transactions: signed_txs
640                .iter()
641                .map(|tx| bcs::to_bytes(tx).unwrap().into())
642                .collect(),
643            submit_type: SubmitTxType::SoftBundle.into(),
644        };
645
646        let mut validator_client = self
647            .authority_aggregator()
648            .authority_clients
649            .iter()
650            .next()
651            .unwrap()
652            .1
653            .authority_client()
654            .get_client_for_testing()
655            .unwrap();
656
657        let result = validator_client
658            .submit_transaction(request.into_request())
659            .await
660            .map(tonic::Response::into_inner)?;
661        assert_eq!(result.results.len(), signed_txs.len());
662
663        for raw_result in result.results.iter() {
664            let submit_result: sui_types::messages_grpc::SubmitTxResult =
665                raw_result.clone().try_into()?;
666            if let sui_types::messages_grpc::SubmitTxResult::Rejected { error } = submit_result {
667                return Err(error);
668            }
669        }
670
671        let effects = self
672            .fullnode_handle
673            .sui_node
674            .with_async(|node| {
675                let digests = digests.clone();
676                async move {
677                    let state = node.state();
678                    let transaction_cache_reader = state.get_transaction_cache_reader();
679                    transaction_cache_reader
680                        .notify_read_executed_effects(
681                            "sign_and_execute_txns_in_soft_bundle",
682                            &digests,
683                        )
684                        .await
685                }
686            })
687            .await;
688
689        Ok(digests.into_iter().zip(effects.into_iter()).collect())
690    }
691
692    pub async fn wait_for_tx_settlement(&self, digests: &[TransactionDigest]) {
693        self.fullnode_handle
694            .sui_node
695            .with_async(|node| async move {
696                let state = node.state();
697                // wait until the transactions are in checkpoints
698                let checkpoint_seqs = state
699                    .epoch_store_for_testing()
700                    .transactions_executed_in_checkpoint_notify(digests.to_vec())
701                    .await
702                    .unwrap();
703
704                // then wait until the highest of the checkpoints is executed
705                let max_checkpoint_seq = checkpoint_seqs.into_iter().max().unwrap();
706                state
707                    .checkpoint_store
708                    .notify_read_executed_checkpoint(max_checkpoint_seq)
709                    .await;
710            })
711            .await;
712    }
713
714    /// Execute a transaction on the network and wait for it to be executed on the rpc fullnode.
715    /// Also expects the effects status to be ExecutionStatus::Success.
716    /// This function is recommended for transaction execution since it most resembles the
717    /// production path.
718    pub async fn execute_transaction(&self, tx: Transaction) -> SuiTransactionBlockResponse {
719        self.wallet.execute_transaction_must_succeed(tx).await
720    }
721
722    /// Different from `execute_transaction` which returns RPC effects types, this function
723    /// returns raw effects, events and extra objects returned by the validators,
724    /// aggregated manually (without authority aggregator).
725    /// It also does not check whether the transaction is executed successfully.
726    /// In order to keep the fullnode up-to-date so that latter queries can read consistent
727    /// results, it calls execute_transaction_may_fail again which goes through fullnode.
728    /// This is less efficient and verbose, but can be used if more details are needed
729    /// from the execution results, and if the transaction is expected to fail.
730    pub async fn execute_transaction_return_raw_effects(
731        &self,
732        tx: Transaction,
733    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
734        let results = self
735            .submit_transaction_to_validators(tx.clone(), &self.get_validator_pubkeys())
736            .await?;
737        self.wallet.execute_transaction_may_fail(tx).await.unwrap();
738        Ok(results)
739    }
740
741    pub fn authority_aggregator(&self) -> Arc<AuthorityAggregator<NetworkAuthorityClient>> {
742        self.fullnode_handle
743            .sui_node
744            .with(|node| node.clone_authority_aggregator().unwrap())
745    }
746
747    pub async fn create_certificate(
748        &self,
749        tx: Transaction,
750        client_addr: Option<SocketAddr>,
751    ) -> anyhow::Result<CertifiedTransaction> {
752        let agg = self.authority_aggregator();
753        Ok(agg
754            .process_transaction(tx, client_addr)
755            .await?
756            .into_cert_for_testing())
757    }
758
759    /// Execute a transaction on specified list of validators, and bypassing authority aggregator.
760    /// This allows us to obtain the return value directly from validators, so that we can access more
761    /// information directly such as the original effects, events and extra objects returned.
762    /// This also allows us to control which validator to send certificates to, which is useful in
763    /// some tests.
764    pub async fn submit_transaction_to_validators(
765        &self,
766        tx: Transaction,
767        pubkeys: &[AuthorityName],
768    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
769        let agg = self.authority_aggregator();
770        let certificate = agg
771            .process_transaction(tx, None)
772            .await?
773            .into_cert_for_testing();
774        let replies = loop {
775            let futures: Vec<_> = agg
776                .authority_clients
777                .iter()
778                .filter_map(|(name, client)| {
779                    if pubkeys.contains(name) {
780                        Some(client)
781                    } else {
782                        None
783                    }
784                })
785                .map(|client| {
786                    let cert = certificate.clone();
787                    async move { client.handle_certificate_v2(cert, None).await }
788                })
789                .collect();
790
791            let replies: Vec<_> = futures::future::join_all(futures)
792                .await
793                .into_iter()
794                .filter(|result| match result {
795                    Err(e) => !e.to_string().contains("deadline has elapsed"),
796                    _ => true,
797                })
798                .collect();
799
800            if !replies.is_empty() {
801                break replies;
802            }
803        };
804        let replies: SuiResult<Vec<_>> = replies.into_iter().collect();
805        let replies = replies?;
806        let mut all_effects = HashMap::new();
807        let mut all_events = HashMap::new();
808        for reply in replies {
809            let effects = reply.signed_effects.into_data();
810            all_effects.insert(effects.digest(), effects);
811            all_events.insert(reply.events.digest(), reply.events);
812            // reply.fastpath_input_objects is unused.
813        }
814        assert_eq!(all_effects.len(), 1);
815        assert_eq!(all_events.len(), 1);
816        Ok((
817            all_effects.into_values().next().unwrap(),
818            all_events.into_values().next().unwrap(),
819        ))
820    }
821
822    /// This call sends some funds from the seeded address to the funding
823    /// address for the given amount and returns the gas object ref. This
824    /// is useful to construct transactions from the funding address.
825    pub async fn fund_address_and_return_gas(
826        &self,
827        rgp: u64,
828        amount: Option<u64>,
829        funding_address: SuiAddress,
830    ) -> ObjectRef {
831        let context = &self.wallet;
832        let (sender, gas) = context.get_one_gas_object().await.unwrap().unwrap();
833        let tx = context
834            .sign_transaction(
835                &TestTransactionBuilder::new(sender, gas, rgp)
836                    .transfer_sui(amount, funding_address)
837                    .build(),
838            )
839            .await;
840        context.execute_transaction_must_succeed(tx).await;
841
842        context
843            .get_one_gas_object_owned_by_address(funding_address)
844            .await
845            .unwrap()
846            .unwrap()
847    }
848
849    pub async fn transfer_sui_must_exceed(
850        &self,
851        sender: SuiAddress,
852        receiver: SuiAddress,
853        amount: u64,
854    ) -> ObjectID {
855        let tx = self
856            .test_transaction_builder_with_sender(sender)
857            .await
858            .transfer_sui(Some(amount), receiver)
859            .build();
860        let effects = self
861            .sign_and_execute_transaction(&tx)
862            .await
863            .effects
864            .unwrap();
865        assert_eq!(&SuiExecutionStatus::Success, effects.status());
866        effects.created().first().unwrap().object_id()
867    }
868
869    #[cfg(msim)]
870    pub fn set_safe_mode_expected(&self, value: bool) {
871        for n in self.all_node_handles() {
872            n.with(|node| node.set_safe_mode_expected(value));
873        }
874    }
875}
876
877pub struct RandomNodeRestarter {
878    test_cluster: Arc<TestCluster>,
879
880    // How frequently should we kill nodes
881    kill_interval: Uniform<Duration>,
882    // How long should we wait before restarting them.
883    restart_delay: Uniform<Duration>,
884
885    task_handle: Mutex<Option<JoinHandle<()>>>,
886}
887
888impl RandomNodeRestarter {
889    fn new(test_cluster: Arc<TestCluster>) -> Self {
890        Self {
891            test_cluster,
892            kill_interval: Uniform::new(Duration::from_secs(10), Duration::from_secs(11)),
893            restart_delay: Uniform::new(Duration::from_secs(1), Duration::from_secs(2)),
894            task_handle: Default::default(),
895        }
896    }
897
898    pub fn with_kill_interval_secs(mut self, a: u64, b: u64) -> Self {
899        self.kill_interval = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
900        self
901    }
902
903    pub fn with_restart_delay_secs(mut self, a: u64, b: u64) -> Self {
904        self.restart_delay = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
905        self
906    }
907
908    pub fn run(&self) {
909        let test_cluster = self.test_cluster.clone();
910        let kill_interval = self.kill_interval;
911        let restart_delay = self.restart_delay;
912        let validators = self.test_cluster.get_validator_pubkeys();
913        let mut task_handle = self.task_handle.lock().unwrap();
914        assert!(task_handle.is_none());
915        task_handle.replace(tokio::task::spawn(async move {
916            loop {
917                let delay = kill_interval.sample(&mut OsRng);
918                info!("Sleeping {delay:?} before killing a validator");
919                sleep(delay).await;
920
921                let validator = validators.choose(&mut OsRng).unwrap();
922                info!("Killing validator {:?}", validator.concise());
923                test_cluster.stop_node(validator);
924
925                let delay = restart_delay.sample(&mut OsRng);
926                info!("Sleeping {delay:?} before restarting");
927                sleep(delay).await;
928                info!("Starting validator {:?}", validator.concise());
929                test_cluster.start_node(validator).await;
930            }
931        }));
932    }
933}
934
935impl Drop for RandomNodeRestarter {
936    fn drop(&mut self) {
937        if let Some(handle) = self.task_handle.lock().unwrap().take() {
938            handle.abort();
939        }
940    }
941}
942
943pub struct TestClusterBuilder {
944    genesis_config: Option<GenesisConfig>,
945    network_config: Option<NetworkConfig>,
946    additional_objects: Vec<Object>,
947    num_validators: Option<usize>,
948    validators: Option<Vec<ValidatorGenesisConfig>>,
949    fullnode_rpc_port: Option<u16>,
950    enable_fullnode_events: bool,
951    disable_fullnode_pruning: bool,
952    validator_supported_protocol_versions_config: ProtocolVersionsConfig,
953    // Default to validator_supported_protocol_versions_config, but can be overridden.
954    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
955    db_checkpoint_config_validators: DBCheckpointConfig,
956    db_checkpoint_config_fullnodes: DBCheckpointConfig,
957    num_unpruned_validators: Option<usize>,
958    jwk_fetch_interval: Option<Duration>,
959    config_dir: Option<PathBuf>,
960    default_jwks: bool,
961    authority_overload_config: Option<AuthorityOverloadConfig>,
962    execution_cache_config: Option<ExecutionCacheConfig>,
963    data_ingestion_dir: Option<PathBuf>,
964    fullnode_run_with_range: Option<RunWithRange>,
965    fullnode_policy_config: Option<PolicyConfig>,
966    fullnode_fw_config: Option<RemoteFirewallConfig>,
967
968    max_submit_position: Option<usize>,
969    submit_delay_step_override_millis: Option<u64>,
970    validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,
971
972    indexer_backed_rpc: bool,
973    rpc_config: Option<sui_config::RpcConfig>,
974
975    chain_override: Option<Chain>,
976
977    execution_time_observer_config: Option<sui_config::node::ExecutionTimeObserverConfig>,
978
979    state_sync_config: Option<sui_config::p2p::StateSyncConfig>,
980
981    #[cfg(msim)]
982    inject_synthetic_execution_time: bool,
983}
984
985impl TestClusterBuilder {
986    pub fn new() -> Self {
987        TestClusterBuilder {
988            genesis_config: None,
989            network_config: None,
990            chain_override: None,
991            additional_objects: vec![],
992            fullnode_rpc_port: None,
993            num_validators: None,
994            validators: None,
995            enable_fullnode_events: false,
996            disable_fullnode_pruning: false,
997            validator_supported_protocol_versions_config: ProtocolVersionsConfig::Default,
998            fullnode_supported_protocol_versions_config: None,
999            db_checkpoint_config_validators: DBCheckpointConfig::default(),
1000            db_checkpoint_config_fullnodes: DBCheckpointConfig::default(),
1001            num_unpruned_validators: None,
1002            jwk_fetch_interval: None,
1003            config_dir: None,
1004            default_jwks: false,
1005            authority_overload_config: None,
1006            execution_cache_config: None,
1007            data_ingestion_dir: None,
1008            fullnode_run_with_range: None,
1009            fullnode_policy_config: None,
1010            fullnode_fw_config: None,
1011            max_submit_position: None,
1012            submit_delay_step_override_millis: None,
1013            validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(
1014                true,
1015            ),
1016            indexer_backed_rpc: false,
1017            rpc_config: None,
1018            execution_time_observer_config: None,
1019            state_sync_config: None,
1020            #[cfg(msim)]
1021            inject_synthetic_execution_time: false,
1022        }
1023    }
1024
1025    pub fn with_state_sync_config(mut self, config: sui_config::p2p::StateSyncConfig) -> Self {
1026        self.state_sync_config = Some(config);
1027        self
1028    }
1029
1030    pub fn with_execution_time_observer_config(
1031        mut self,
1032        config: sui_config::node::ExecutionTimeObserverConfig,
1033    ) -> Self {
1034        self.execution_time_observer_config = Some(config);
1035        self
1036    }
1037
1038    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
1039        if let Some(run_with_range) = run_with_range {
1040            self.fullnode_run_with_range = Some(run_with_range);
1041        }
1042        self
1043    }
1044
1045    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
1046        self.fullnode_policy_config = config;
1047        self
1048    }
1049
1050    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
1051        self.fullnode_fw_config = config;
1052        self
1053    }
1054
1055    pub fn with_fullnode_rpc_port(mut self, rpc_port: u16) -> Self {
1056        self.fullnode_rpc_port = Some(rpc_port);
1057        self
1058    }
1059
1060    pub fn set_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
1061        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1062        self.genesis_config = Some(genesis_config);
1063        self
1064    }
1065
1066    pub fn set_network_config(mut self, network_config: NetworkConfig) -> Self {
1067        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1068        self.network_config = Some(network_config);
1069        self
1070    }
1071
1072    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
1073        self.additional_objects.extend(objects);
1074        self
1075    }
1076
1077    /// Set the number of default validators to spawn. Can be overridden by `with_validators`, if
1078    /// you need to provide more specific genesis configs for each validator.
1079    pub fn with_num_validators(mut self, num: usize) -> Self {
1080        self.num_validators = Some(num);
1081        self
1082    }
1083
1084    /// Provide validator genesis configs, overrides the `num_validators` setting.
1085    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
1086        self.validators = Some(validators);
1087        self
1088    }
1089
1090    pub fn enable_fullnode_events(mut self) -> Self {
1091        self.enable_fullnode_events = true;
1092        self
1093    }
1094
1095    pub fn disable_fullnode_pruning(mut self) -> Self {
1096        self.disable_fullnode_pruning = true;
1097        self
1098    }
1099
1100    pub fn with_enable_db_checkpoints_validators(mut self) -> Self {
1101        self.db_checkpoint_config_validators = DBCheckpointConfig {
1102            perform_db_checkpoints_at_epoch_end: true,
1103            checkpoint_path: None,
1104            object_store_config: None,
1105            perform_index_db_checkpoints_at_epoch_end: None,
1106            prune_and_compact_before_upload: None,
1107        };
1108        self
1109    }
1110
1111    pub fn with_enable_db_checkpoints_fullnodes(mut self) -> Self {
1112        self.db_checkpoint_config_fullnodes = DBCheckpointConfig {
1113            perform_db_checkpoints_at_epoch_end: true,
1114            checkpoint_path: None,
1115            object_store_config: None,
1116            perform_index_db_checkpoints_at_epoch_end: None,
1117            prune_and_compact_before_upload: Some(true),
1118        };
1119        self
1120    }
1121
1122    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
1123        self.get_or_init_genesis_config()
1124            .parameters
1125            .epoch_duration_ms = epoch_duration_ms;
1126        self
1127    }
1128
1129    pub fn with_stake_subsidy_start_epoch(mut self, stake_subsidy_start_epoch: u64) -> Self {
1130        self.get_or_init_genesis_config()
1131            .parameters
1132            .stake_subsidy_start_epoch = stake_subsidy_start_epoch;
1133        self
1134    }
1135
1136    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
1137        self.validator_supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
1138        self
1139    }
1140
1141    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
1142        self.jwk_fetch_interval = Some(i);
1143        self
1144    }
1145
1146    pub fn with_fullnode_supported_protocol_versions_config(
1147        mut self,
1148        c: SupportedProtocolVersions,
1149    ) -> Self {
1150        self.fullnode_supported_protocol_versions_config = Some(ProtocolVersionsConfig::Global(c));
1151        self
1152    }
1153
1154    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
1155        self.get_or_init_genesis_config()
1156            .parameters
1157            .protocol_version = v;
1158        self
1159    }
1160
1161    pub fn with_supported_protocol_version_callback(
1162        mut self,
1163        func: SupportedProtocolVersionsCallback,
1164    ) -> Self {
1165        self.validator_supported_protocol_versions_config =
1166            ProtocolVersionsConfig::PerValidator(func);
1167        self
1168    }
1169
1170    pub fn with_global_state_hash_v2_enabled_callback(
1171        mut self,
1172        func: GlobalStateHashV2EnabledCallback,
1173    ) -> Self {
1174        self.validator_global_state_hash_v2_enabled_config =
1175            GlobalStateHashV2EnabledConfig::PerValidator(func);
1176        self
1177    }
1178
1179    pub fn with_validator_candidates(
1180        mut self,
1181        addresses: impl IntoIterator<Item = SuiAddress>,
1182    ) -> Self {
1183        self.get_or_init_genesis_config()
1184            .accounts
1185            .extend(addresses.into_iter().map(|address| AccountConfig {
1186                address: Some(address),
1187                gas_amounts: vec![DEFAULT_GAS_AMOUNT, DEFAULT_GAS_AMOUNT],
1188            }));
1189        self
1190    }
1191
1192    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
1193        self.num_unpruned_validators = Some(n);
1194        self
1195    }
1196
1197    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1198        self.get_or_init_genesis_config().accounts = accounts;
1199        self
1200    }
1201
1202    pub fn with_additional_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1203        self.get_or_init_genesis_config().accounts.extend(accounts);
1204        self
1205    }
1206
1207    pub fn with_config_dir(mut self, config_dir: PathBuf) -> Self {
1208        self.config_dir = Some(config_dir);
1209        self
1210    }
1211
1212    pub fn with_default_jwks(mut self) -> Self {
1213        self.default_jwks = true;
1214        self
1215    }
1216
1217    pub fn with_authority_overload_config(mut self, config: AuthorityOverloadConfig) -> Self {
1218        assert!(self.network_config.is_none());
1219        self.authority_overload_config = Some(config);
1220        self
1221    }
1222
1223    pub fn with_execution_cache_config(mut self, config: ExecutionCacheConfig) -> Self {
1224        assert!(self.network_config.is_none());
1225        self.execution_cache_config = Some(config);
1226        self
1227    }
1228
1229    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
1230        self.data_ingestion_dir = Some(path);
1231        self
1232    }
1233
1234    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
1235        self.max_submit_position = Some(max_submit_position);
1236        self
1237    }
1238
1239    pub fn with_submit_delay_step_override_millis(
1240        mut self,
1241        submit_delay_step_override_millis: u64,
1242    ) -> Self {
1243        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
1244        self
1245    }
1246
1247    pub fn with_indexer_backed_rpc(mut self) -> Self {
1248        self.indexer_backed_rpc = true;
1249        self
1250    }
1251
1252    pub fn with_rpc_config(mut self, config: sui_config::RpcConfig) -> Self {
1253        self.rpc_config = Some(config);
1254        self
1255    }
1256
1257    pub fn with_chain_override(mut self, chain: Chain) -> Self {
1258        self.chain_override = Some(chain);
1259        self
1260    }
1261
1262    #[cfg(msim)]
1263    pub fn with_synthetic_execution_time_injection(mut self) -> Self {
1264        self.inject_synthetic_execution_time = true;
1265        self
1266    }
1267
1268    pub async fn build(mut self) -> TestCluster {
1269        // All test clusters receive a continuous stream of random JWKs.
1270        // If we later use zklogin authenticated transactions in tests we will need to supply
1271        // valid JWKs as well.
1272        #[cfg(msim)]
1273        if !self.default_jwks {
1274            sui_node::set_jwk_injector(Arc::new(|_authority, provider| {
1275                use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
1276                use rand::Rng;
1277
1278                // generate random (and possibly conflicting) id/key pairings.
1279                let id_num = rand::thread_rng().gen_range(1..=4);
1280                let key_num = rand::thread_rng().gen_range(1..=4);
1281
1282                let id = JwkId {
1283                    iss: provider.get_config().iss,
1284                    kid: format!("kid{}", id_num),
1285                };
1286
1287                let jwk = JWK {
1288                    kty: "kty".to_string(),
1289                    e: "e".to_string(),
1290                    n: format!("n{}", key_num),
1291                    alg: "alg".to_string(),
1292                };
1293
1294                Ok(vec![(id, jwk)])
1295            }));
1296        }
1297
1298        let mut temp_data_ingestion_dir = None;
1299        let mut data_ingestion_path = None;
1300
1301        if self.indexer_backed_rpc {
1302            if self.data_ingestion_dir.is_none() {
1303                temp_data_ingestion_dir = Some(mysten_common::tempdir().unwrap());
1304                self.data_ingestion_dir = Some(
1305                    temp_data_ingestion_dir
1306                        .as_ref()
1307                        .unwrap()
1308                        .path()
1309                        .to_path_buf(),
1310                );
1311                assert!(self.data_ingestion_dir.is_some());
1312            }
1313            assert!(self.data_ingestion_dir.is_some());
1314            data_ingestion_path = Some(self.data_ingestion_dir.as_ref().unwrap().to_path_buf());
1315        }
1316
1317        let swarm = self.start_swarm().await.unwrap();
1318        let working_dir = swarm.dir();
1319
1320        let fullnode = swarm.fullnodes().next().unwrap();
1321        let json_rpc_address = fullnode.config().json_rpc_address;
1322        let fullnode_handle =
1323            FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await;
1324
1325        let (rpc_url, indexer_handle) = if self.indexer_backed_rpc {
1326            let handle = test_indexer_handle::IndexerHandle::new(
1327                fullnode_handle.rpc_url.clone(),
1328                temp_data_ingestion_dir,
1329                data_ingestion_path.unwrap(),
1330            )
1331            .await;
1332            (handle.rpc_url.clone(), Some(handle))
1333        } else {
1334            (fullnode_handle.rpc_url.clone(), None)
1335        };
1336
1337        let mut wallet_conf: SuiClientConfig =
1338            PersistedConfig::read(&working_dir.join(SUI_CLIENT_CONFIG)).unwrap();
1339        wallet_conf.envs.push(SuiEnv {
1340            alias: "localnet".to_string(),
1341            rpc: rpc_url,
1342            ws: None,
1343            basic_auth: None,
1344            chain_id: None,
1345        });
1346        wallet_conf.active_env = Some("localnet".to_string());
1347
1348        wallet_conf
1349            .persisted(&working_dir.join(SUI_CLIENT_CONFIG))
1350            .save()
1351            .unwrap();
1352
1353        let wallet_conf = swarm.dir().join(SUI_CLIENT_CONFIG);
1354        let wallet = WalletContext::new(&wallet_conf).unwrap();
1355
1356        TestCluster {
1357            swarm,
1358            wallet,
1359            fullnode_handle,
1360            indexer_handle,
1361        }
1362    }
1363
1364    /// Start a Swarm and set up WalletConfig
1365    async fn start_swarm(&mut self) -> Result<Swarm, anyhow::Error> {
1366        let mut builder: SwarmBuilder = Swarm::builder()
1367            .with_objects(self.additional_objects.clone())
1368            .with_db_checkpoint_config(self.db_checkpoint_config_validators.clone())
1369            .with_supported_protocol_versions_config(
1370                self.validator_supported_protocol_versions_config.clone(),
1371            )
1372            .with_global_state_hash_v2_enabled_config(
1373                self.validator_global_state_hash_v2_enabled_config.clone(),
1374            )
1375            .with_fullnode_count(1)
1376            .with_fullnode_supported_protocol_versions_config(
1377                self.fullnode_supported_protocol_versions_config
1378                    .clone()
1379                    .unwrap_or(self.validator_supported_protocol_versions_config.clone()),
1380            )
1381            .with_db_checkpoint_config(self.db_checkpoint_config_fullnodes.clone())
1382            .with_fullnode_run_with_range(self.fullnode_run_with_range)
1383            .with_fullnode_policy_config(self.fullnode_policy_config.clone())
1384            .with_fullnode_fw_config(self.fullnode_fw_config.clone());
1385
1386        if let Some(validators) = self.validators.take() {
1387            builder = builder.with_validators(validators);
1388        } else {
1389            builder = builder.committee_size(
1390                NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDATOR)).unwrap(),
1391            )
1392        };
1393
1394        if let Some(chain) = self.chain_override {
1395            builder = builder.with_chain_override(chain);
1396        }
1397
1398        if let Some(genesis_config) = self.genesis_config.take() {
1399            builder = builder.with_genesis_config(genesis_config);
1400        }
1401
1402        if let Some(network_config) = self.network_config.take() {
1403            builder = builder.with_network_config(network_config);
1404        }
1405
1406        if let Some(authority_overload_config) = self.authority_overload_config.take() {
1407            builder = builder.with_authority_overload_config(authority_overload_config);
1408        }
1409
1410        if let Some(execution_cache_config) = self.execution_cache_config.take() {
1411            builder = builder.with_execution_cache_config(execution_cache_config);
1412        }
1413
1414        if let Some(fullnode_rpc_port) = self.fullnode_rpc_port {
1415            builder = builder.with_fullnode_rpc_port(fullnode_rpc_port);
1416        }
1417
1418        if let Some(rpc_config) = &self.rpc_config {
1419            builder = builder.with_fullnode_rpc_config(rpc_config.clone());
1420        }
1421        if let Some(num_unpruned_validators) = self.num_unpruned_validators {
1422            builder = builder.with_num_unpruned_validators(num_unpruned_validators);
1423        }
1424
1425        if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
1426            builder = builder.with_jwk_fetch_interval(jwk_fetch_interval);
1427        }
1428
1429        if let Some(config_dir) = self.config_dir.take() {
1430            builder = builder.dir(config_dir);
1431        }
1432
1433        if let Some(data_ingestion_dir) = self.data_ingestion_dir.take() {
1434            builder = builder.with_data_ingestion_dir(data_ingestion_dir);
1435        }
1436
1437        if let Some(max_submit_position) = self.max_submit_position {
1438            builder = builder.with_max_submit_position(max_submit_position);
1439        }
1440
1441        if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis {
1442            builder =
1443                builder.with_submit_delay_step_override_millis(submit_delay_step_override_millis);
1444        }
1445
1446        if let Some(state_sync_config) = self.state_sync_config.clone() {
1447            builder = builder.with_state_sync_config(state_sync_config);
1448        }
1449
1450        if self.disable_fullnode_pruning {
1451            builder = builder.with_disable_fullnode_pruning();
1452        }
1453
1454        #[cfg(msim)]
1455        {
1456            if let Some(mut config) = self.execution_time_observer_config.clone() {
1457                if self.inject_synthetic_execution_time {
1458                    config.inject_synthetic_execution_time = Some(true);
1459                }
1460                builder = builder.with_execution_time_observer_config(config);
1461            } else if self.inject_synthetic_execution_time {
1462                use sui_config::node::ExecutionTimeObserverConfig;
1463
1464                let mut config = ExecutionTimeObserverConfig::default();
1465                config.inject_synthetic_execution_time = Some(true);
1466                builder = builder.with_execution_time_observer_config(config);
1467            }
1468        }
1469
1470        let mut swarm = builder.build();
1471        swarm.launch().await?;
1472
1473        let dir = swarm.dir();
1474
1475        let network_path = dir.join(SUI_NETWORK_CONFIG);
1476        let wallet_path = dir.join(SUI_CLIENT_CONFIG);
1477        let keystore_path = dir.join(SUI_KEYSTORE_FILENAME);
1478
1479        swarm.config().save(network_path)?;
1480        let mut keystore = Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?);
1481        for key in &swarm.config().account_keys {
1482            keystore
1483                .import(None, SuiKeyPair::Ed25519(key.copy()))
1484                .await?;
1485        }
1486
1487        let active_address = keystore.addresses().first().cloned();
1488
1489        // Create wallet config with stated authorities port
1490        SuiClientConfig {
1491            keystore: Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?),
1492            external_keys: None,
1493            envs: Default::default(),
1494            active_address,
1495            active_env: Default::default(),
1496        }
1497        .save(wallet_path)?;
1498
1499        // Return network handle
1500        Ok(swarm)
1501    }
1502
1503    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
1504        if self.genesis_config.is_none() {
1505            self.genesis_config = Some(GenesisConfig::for_local_testing());
1506        }
1507        self.genesis_config.as_mut().unwrap()
1508    }
1509}
1510
1511impl Default for TestClusterBuilder {
1512    fn default() -> Self {
1513        Self::new()
1514    }
1515}