test_cluster/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::{StreamExt, future::join_all};
5use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
6use mysten_common::fatal;
7use rand::{distributions::*, rngs::OsRng, seq::SliceRandom};
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::num::NonZeroUsize;
11use std::path::PathBuf;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use sui_config::genesis::Genesis;
15use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
16use sui_config::{Config, ExecutionCacheConfig, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG};
17use sui_config::{NodeConfig, PersistedConfig, SUI_KEYSTORE_FILENAME};
18use sui_core::authority_aggregator::AuthorityAggregator;
19use sui_core::authority_client::NetworkAuthorityClient;
20use sui_json_rpc_types::{
21    SuiExecutionStatus, SuiTransactionBlockEffectsAPI, SuiTransactionBlockResponse,
22    TransactionFilter,
23};
24use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
25use sui_node::SuiNodeHandle;
26use sui_protocol_config::{Chain, ProtocolVersion};
27use sui_sdk::apis::QuorumDriverApi;
28use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
29use sui_sdk::wallet_context::WalletContext;
30use sui_sdk::{SuiClient, SuiClientBuilder};
31use sui_swarm::memory::{Swarm, SwarmBuilder};
32use sui_swarm_config::genesis_config::{
33    AccountConfig, DEFAULT_GAS_AMOUNT, GenesisConfig, ValidatorGenesisConfig,
34};
35use sui_swarm_config::network_config::NetworkConfig;
36use sui_swarm_config::network_config_builder::{
37    GlobalStateHashV2EnabledCallback, GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig,
38    SupportedProtocolVersionsCallback,
39};
40use sui_swarm_config::node_config_builder::{FullnodeConfigBuilder, ValidatorConfigBuilder};
41use sui_test_transaction_builder::TestTransactionBuilder;
42use sui_types::base_types::ConciseableName;
43use sui_types::base_types::{AuthorityName, ObjectID, ObjectRef, SuiAddress};
44use sui_types::committee::CommitteeTrait;
45use sui_types::committee::{Committee, EpochId};
46use sui_types::crypto::KeypairTraits;
47use sui_types::crypto::SuiKeyPair;
48use sui_types::digests::{ChainIdentifier, TransactionDigest};
49use sui_types::effects::{TransactionEffects, TransactionEvents};
50use sui_types::error::SuiResult;
51use sui_types::message_envelope::Message;
52use sui_types::messages_grpc::{RawSubmitTxRequest, SubmitTxType};
53use sui_types::object::Object;
54use sui_types::sui_system_state::SuiSystemState;
55use sui_types::sui_system_state::SuiSystemStateTrait;
56use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
57use sui_types::supported_protocol_versions::SupportedProtocolVersions;
58use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
59use sui_types::transaction::{
60    CertifiedTransaction, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
61};
62use tokio::time::{Instant, timeout};
63use tokio::{task::JoinHandle, time::sleep};
64use tonic::IntoRequest;
65use tracing::{error, info};
66
67mod test_indexer_handle;
68
// NOTE(review): presumably the default number of validators in a test cluster; its use is not
// visible in this part of the file -- confirm against the builder.
const NUM_VALIDATOR: usize = 4;
70
/// A fullnode together with the clients used to talk to its JSON-RPC endpoint.
pub struct FullNodeHandle {
    /// Handle to the node itself.
    pub sui_node: SuiNodeHandle,
    /// SDK client built against `rpc_url`.
    pub sui_client: SuiClient,
    /// JSON-RPC HTTP client built against `rpc_url`.
    pub rpc_client: HttpClient,
    /// URL of the form `http://<json_rpc_address>` that both clients were built from.
    pub rpc_url: String,
}
77
78impl FullNodeHandle {
79    pub async fn new(sui_node: SuiNodeHandle, json_rpc_address: SocketAddr) -> Self {
80        let rpc_url = format!("http://{}", json_rpc_address);
81        let rpc_client = HttpClientBuilder::default().build(&rpc_url).unwrap();
82
83        let sui_client = SuiClientBuilder::default().build(&rpc_url).await.unwrap();
84
85        Self {
86            sui_node,
87            sui_client,
88            rpc_client,
89            rpc_url,
90        }
91    }
92}
93
/// An in-process test network: the swarm of nodes, a wallet, and the fullnode used for RPC.
pub struct TestCluster {
    /// The swarm that owns the nodes of the cluster.
    pub swarm: Swarm,
    /// Wallet used to look up gas objects and to sign and execute transactions.
    pub wallet: WalletContext,
    /// The fullnode whose clients are used when no indexer is attached.
    pub fullnode_handle: FullNodeHandle,
    /// Optional indexer; when present, `rpc_client()`, `sui_client()` and `rpc_url()`
    /// return the indexer's values instead of the fullnode's.
    indexer_handle: Option<test_indexer_handle::IndexerHandle>,
}
100
101impl TestCluster {
102    pub fn rpc_client(&self) -> &HttpClient {
103        self.indexer_handle
104            .as_ref()
105            .map(|h| &h.rpc_client)
106            .unwrap_or(&self.fullnode_handle.rpc_client)
107    }
108
109    pub fn sui_client(&self) -> &SuiClient {
110        self.indexer_handle
111            .as_ref()
112            .map(|h| &h.sui_client)
113            .unwrap_or(&self.fullnode_handle.sui_client)
114    }
115
116    pub fn rpc_url(&self) -> &str {
117        self.indexer_handle
118            .as_ref()
119            .map(|h| h.rpc_url.as_str())
120            .unwrap_or(&self.fullnode_handle.rpc_url)
121    }
122
123    pub fn quorum_driver_api(&self) -> &QuorumDriverApi {
124        self.sui_client().quorum_driver_api()
125    }
126
127    pub fn wallet(&mut self) -> &WalletContext {
128        &self.wallet
129    }
130
131    pub fn wallet_mut(&mut self) -> &mut WalletContext {
132        &mut self.wallet
133    }
134
135    pub fn get_addresses(&self) -> Vec<SuiAddress> {
136        self.wallet.get_addresses()
137    }
138
139    // Helper function to get the 0th address in WalletContext
140    pub fn get_address_0(&self) -> SuiAddress {
141        self.get_addresses()[0]
142    }
143
144    // Helper function to get the 1st address in WalletContext
145    pub fn get_address_1(&self) -> SuiAddress {
146        self.get_addresses()[1]
147    }
148
149    // Helper function to get the 2nd address in WalletContext
150    pub fn get_address_2(&self) -> SuiAddress {
151        self.get_addresses()[2]
152    }
153
154    pub fn fullnode_config_builder(&self) -> FullnodeConfigBuilder {
155        self.swarm.get_fullnode_config_builder()
156    }
157
158    pub fn committee(&self) -> Arc<Committee> {
159        self.fullnode_handle
160            .sui_node
161            .with(|node| node.state().epoch_store_for_testing().committee().clone())
162    }
163
164    /// Convenience method to start a new fullnode in the test cluster.
165    pub async fn spawn_new_fullnode(&mut self) -> FullNodeHandle {
166        self.start_fullnode_from_config(
167            self.fullnode_config_builder()
168                .build(&mut OsRng, self.swarm.config()),
169        )
170        .await
171    }
172
173    pub async fn start_fullnode_from_config(&mut self, config: NodeConfig) -> FullNodeHandle {
174        let json_rpc_address = config.json_rpc_address;
175        let node = self.swarm.spawn_new_node(config).await;
176        FullNodeHandle::new(node, json_rpc_address).await
177    }
178
179    pub fn all_node_handles(&self) -> Vec<SuiNodeHandle> {
180        self.swarm
181            .all_nodes()
182            .flat_map(|n| n.get_node_handle())
183            .collect()
184    }
185
186    pub fn all_validator_handles(&self) -> Vec<SuiNodeHandle> {
187        self.swarm
188            .validator_nodes()
189            .map(|n| n.get_node_handle().unwrap())
190            .collect()
191    }
192
193    pub fn get_validator_pubkeys(&self) -> Vec<AuthorityName> {
194        self.swarm.active_validators().map(|v| v.name()).collect()
195    }
196
197    pub fn get_genesis(&self) -> Genesis {
198        self.swarm.config().genesis.clone()
199    }
200
201    pub fn stop_node(&self, name: &AuthorityName) {
202        self.swarm.node(name).unwrap().stop();
203    }
204
205    pub async fn stop_all_validators(&self) {
206        info!("Stopping all validators in the cluster");
207        self.swarm.active_validators().for_each(|v| v.stop());
208        tokio::time::sleep(Duration::from_secs(3)).await;
209    }
210
211    pub async fn start_all_validators(&self) {
212        info!("Starting all validators in the cluster");
213        for v in self.swarm.validator_nodes() {
214            if v.is_running() {
215                continue;
216            }
217            v.start().await.unwrap();
218        }
219        tokio::time::sleep(Duration::from_secs(3)).await;
220    }
221
222    pub async fn start_node(&self, name: &AuthorityName) {
223        let node = self.swarm.node(name).unwrap();
224        if node.is_running() {
225            return;
226        }
227        node.start().await.unwrap();
228    }
229
230    pub async fn spawn_new_validator(
231        &mut self,
232        genesis_config: ValidatorGenesisConfig,
233    ) -> SuiNodeHandle {
234        let node_config = ValidatorConfigBuilder::new()
235            .build(genesis_config, self.swarm.config().genesis.clone());
236        self.swarm.spawn_new_node(node_config).await
237    }
238
239    pub fn random_node_restarter(self: &Arc<Self>) -> RandomNodeRestarter {
240        RandomNodeRestarter::new(self.clone())
241    }
242
243    pub async fn get_reference_gas_price(&self) -> u64 {
244        self.sui_client()
245            .governance_api()
246            .get_reference_gas_price()
247            .await
248            .expect("failed to get reference gas price")
249    }
250
251    pub fn get_chain_identifier(&self) -> ChainIdentifier {
252        ChainIdentifier::from(*self.swarm.config().genesis.checkpoint().digest())
253    }
254
255    pub async fn get_object_from_fullnode_store(&self, object_id: &ObjectID) -> Option<Object> {
256        self.fullnode_handle
257            .sui_node
258            .with_async(|node| async { node.state().get_object(object_id).await })
259            .await
260    }
261
262    pub async fn get_latest_object_ref(&self, object_id: &ObjectID) -> ObjectRef {
263        self.get_object_from_fullnode_store(object_id)
264            .await
265            .unwrap()
266            .compute_object_reference()
267    }
268
269    pub async fn get_object_or_tombstone_from_fullnode_store(
270        &self,
271        object_id: ObjectID,
272    ) -> ObjectRef {
273        self.fullnode_handle
274            .sui_node
275            .state()
276            .get_object_cache_reader()
277            .get_latest_object_ref_or_tombstone(object_id)
278            .unwrap()
279    }
280
281    pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
282        self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
283            .await
284    }
285
286    pub async fn wait_for_run_with_range_shutdown_signal_with_timeout(
287        &self,
288        timeout_dur: Duration,
289    ) -> Option<RunWithRange> {
290        let mut shutdown_channel_rx = self
291            .fullnode_handle
292            .sui_node
293            .with(|node| node.subscribe_to_shutdown_channel());
294
295        timeout(timeout_dur, async move {
296            tokio::select! {
297                msg = shutdown_channel_rx.recv() =>
298                {
299                    match msg {
300                        Ok(Some(run_with_range)) => Some(run_with_range),
301                        Ok(None) => None,
302                        Err(e) => {
303                            error!("failed recv from sui-node shutdown channel: {}", e);
304                            None
305                        },
306                    }
307                },
308            }
309        })
310        .await
311        .expect("Timed out waiting for cluster to hit target epoch and recv shutdown signal from sui-node")
312    }
313
314    pub async fn wait_for_protocol_version(
315        &self,
316        target_protocol_version: ProtocolVersion,
317    ) -> SuiSystemState {
318        self.wait_for_protocol_version_with_timeout(
319            target_protocol_version,
320            Duration::from_secs(60),
321        )
322        .await
323    }
324
325    pub async fn wait_for_protocol_version_with_timeout(
326        &self,
327        target_protocol_version: ProtocolVersion,
328        timeout_dur: Duration,
329    ) -> SuiSystemState {
330        timeout(timeout_dur, async move {
331            loop {
332                let system_state = self.wait_for_epoch(None).await;
333                if system_state.protocol_version() >= target_protocol_version.as_u64() {
334                    return system_state;
335                }
336            }
337        })
338        .await
339        .expect("Timed out waiting for cluster to target protocol version")
340    }
341
342    /// Ask 2f+1 validators to close epoch actively, and wait for the entire network to reach the next
343    /// epoch. This requires waiting for both the fullnode and all validators to reach the next epoch.
344    pub async fn trigger_reconfiguration(&self) {
345        info!("Starting reconfiguration");
346        let start = Instant::now();
347
348        // Close epoch on 2f+1 validators.
349        let cur_committee = self
350            .fullnode_handle
351            .sui_node
352            .with(|node| node.state().clone_committee_for_testing());
353        let mut cur_stake = 0;
354        for node in self.swarm.active_validators() {
355            node.get_node_handle()
356                .unwrap()
357                .with_async(|node| async {
358                    node.close_epoch_for_testing().await.unwrap_or_else(|_| {
359                        fatal!(
360                            "Failed to close epoch for validator {:?}",
361                            node.state().name
362                        );
363                    });
364                    cur_stake += cur_committee.weight(&node.state().name);
365                })
366                .await;
367            if cur_stake >= cur_committee.quorum_threshold() {
368                break;
369            }
370        }
371        info!("close_epoch complete after {:?}", start.elapsed());
372
373        self.wait_for_epoch(Some(cur_committee.epoch + 1)).await;
374        self.wait_for_epoch_all_nodes(cur_committee.epoch + 1).await;
375
376        info!("reconfiguration complete after {:?}", start.elapsed());
377    }
378
379    /// To detect whether the network has reached such state, we use the fullnode as the
380    /// source of truth, since a fullnode only does epoch transition when the network has
381    /// done so.
382    /// If target_epoch is specified, wait until the cluster reaches that epoch.
383    /// If target_epoch is None, wait until the cluster reaches the next epoch.
384    /// Note that this function does not guarantee that every node is at the target epoch.
385    pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> SuiSystemState {
386        self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
387            .await
388    }
389
390    pub async fn wait_for_epoch_on_node(
391        &self,
392        handle: &SuiNodeHandle,
393        target_epoch: Option<EpochId>,
394        timeout_dur: Duration,
395    ) -> SuiSystemState {
396        let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());
397
398        let mut state = None;
399        timeout(timeout_dur, async {
400            let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
401            if Some(epoch) == target_epoch {
402                return handle.with(|node| node.state().get_sui_system_state_object_for_testing().unwrap());
403            }
404            while let Ok(system_state) = epoch_rx.recv().await {
405                info!("received epoch {}", system_state.epoch());
406                state = Some(system_state.clone());
407                match target_epoch {
408                    Some(target_epoch) if system_state.epoch() >= target_epoch => {
409                        return system_state;
410                    }
411                    None => {
412                        return system_state;
413                    }
414                    _ => (),
415                }
416            }
417            unreachable!("Broken reconfig channel");
418        })
419        .await
420        .unwrap_or_else(|_| {
421            error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
422            if let Some(state) = state {
423                panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
424            }
425            panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
426        })
427    }
428
429    pub async fn wait_for_epoch_with_timeout(
430        &self,
431        target_epoch: Option<EpochId>,
432        timeout_dur: Duration,
433    ) -> SuiSystemState {
434        self.wait_for_epoch_on_node(&self.fullnode_handle.sui_node, target_epoch, timeout_dur)
435            .await
436    }
437
438    pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
439        let handles: Vec<_> = self
440            .swarm
441            .all_nodes()
442            .map(|node| node.get_node_handle().unwrap())
443            .collect();
444        let tasks: Vec<_> = handles
445            .iter()
446            .map(|handle| {
447                handle.with_async(|node| async {
448                    let mut retries = 0;
449                    loop {
450                        let epoch = node.state().epoch_store_for_testing().epoch();
451                        if epoch == target_epoch {
452                            if let Some(agg) = node.clone_authority_aggregator() {
453                                // This is a fullnode, we need to wait for its auth aggregator to reconfigure as well.
454                                if agg.committee.epoch() == target_epoch {
455                                    break;
456                                }
457                            } else {
458                                // This is a validator, we don't need to check the auth aggregator.
459                                break;
460                            }
461                        }
462                        tokio::time::sleep(Duration::from_secs(1)).await;
463                        retries += 1;
464                        if retries % 5 == 0 {
465                            tracing::warn!(validator=?node.state().name.concise(), "Waiting for {:?} seconds to reach epoch {:?}. Currently at epoch {:?}", retries, target_epoch, epoch);
466                        }
467                    }
468                })
469            })
470            .collect();
471
472        timeout(Duration::from_secs(40), join_all(tasks))
473            .await
474            .expect("timed out waiting for reconfiguration to complete");
475    }
476
477    /// Upgrade the network protocol version, by restarting every validator with a new
478    /// supported versions.
479    /// Note that we don't restart the fullnode here, and it is assumed that the fulnode supports
480    /// the entire version range.
481    pub async fn update_validator_supported_versions(
482        &self,
483        new_supported_versions: SupportedProtocolVersions,
484    ) {
485        for authority in self.get_validator_pubkeys() {
486            self.stop_node(&authority);
487            tokio::time::sleep(Duration::from_millis(1000)).await;
488            self.swarm
489                .node(&authority)
490                .unwrap()
491                .config()
492                .supported_protocol_versions = Some(new_supported_versions);
493            self.start_node(&authority).await;
494            info!("Restarted validator {}", authority);
495        }
496    }
497
498    /// Wait for all nodes in the network to upgrade to `protocol_version`.
499    pub async fn wait_for_all_nodes_upgrade_to(&self, protocol_version: u64) {
500        for h in self.all_node_handles() {
501            h.with_async(|node| async {
502                while node
503                    .state()
504                    .epoch_store_for_testing()
505                    .epoch_start_state()
506                    .protocol_version()
507                    .as_u64()
508                    != protocol_version
509                {
510                    tokio::time::sleep(Duration::from_secs(1)).await;
511                }
512            })
513            .await;
514        }
515    }
516
517    pub async fn wait_for_authenticator_state_update(&self) {
518        timeout(
519            Duration::from_secs(60),
520            self.fullnode_handle.sui_node.with_async(|node| async move {
521                let mut txns = node.state().subscription_handler.subscribe_transactions(
522                    TransactionFilter::ChangedObject(ObjectID::from_hex_literal("0x7").unwrap()),
523                );
524                let state = node.state();
525
526                while let Some(tx) = txns.next().await {
527                    let digest = *tx.transaction_digest();
528                    let tx = state
529                        .get_transaction_cache_reader()
530                        .get_transaction_block(&digest)
531                        .unwrap();
532                    match &tx.data().intent_message().value.kind() {
533                        TransactionKind::EndOfEpochTransaction(_) => (),
534                        TransactionKind::AuthenticatorStateUpdate(_) => break,
535                        _ => panic!("{:?}", tx),
536                    }
537                }
538            }),
539        )
540        .await
541        .expect("Timed out waiting for authenticator state update");
542    }
543
544    /// Return the highest observed protocol version in the test cluster.
545    pub fn highest_protocol_version(&self) -> ProtocolVersion {
546        self.all_node_handles()
547            .into_iter()
548            .map(|h| {
549                h.with(|node| {
550                    node.state()
551                        .epoch_store_for_testing()
552                        .epoch_start_state()
553                        .protocol_version()
554                })
555            })
556            .max()
557            .expect("at least one node must be up to get highest protocol version")
558    }
559
560    pub async fn test_transaction_builder(&self) -> TestTransactionBuilder {
561        let (sender, gas) = self.wallet.get_one_gas_object().await.unwrap().unwrap();
562        self.test_transaction_builder_with_gas_object(sender, gas)
563            .await
564    }
565
566    pub async fn test_transaction_builder_with_sender(
567        &self,
568        sender: SuiAddress,
569    ) -> TestTransactionBuilder {
570        let gas = self
571            .wallet
572            .get_one_gas_object_owned_by_address(sender)
573            .await
574            .unwrap()
575            .unwrap();
576        self.test_transaction_builder_with_gas_object(sender, gas)
577            .await
578    }
579
580    pub async fn test_transaction_builder_with_gas_object(
581        &self,
582        sender: SuiAddress,
583        gas: ObjectRef,
584    ) -> TestTransactionBuilder {
585        let rgp = self.get_reference_gas_price().await;
586        TestTransactionBuilder::new(sender, gas, rgp)
587    }
588
589    pub async fn sign_transaction(&self, tx_data: &TransactionData) -> Transaction {
590        self.wallet.sign_transaction(tx_data).await
591    }
592
593    pub async fn sign_and_execute_transaction(
594        &self,
595        tx_data: &TransactionData,
596    ) -> SuiTransactionBlockResponse {
597        let tx = self.wallet.sign_transaction(tx_data).await;
598        self.execute_transaction(tx).await
599    }
600
601    /// Sign and execute multiple transactions in a soft bundle.
602    /// Soft bundles allow submitting multiple transactions together with best-effort
603    /// ordering if they use the same gas price. Transactions in a soft bundle can be
604    /// individually rejected or deferred without affecting other transactions.
605    ///
606    /// NOTE: This is a simplified implementation that processes transactions individually.
607    /// For true soft bundle submission, the test file should use the raw gRPC client directly
608    /// with tonic, as shown in test_soft_bundle_different_gas_payers.
609    pub async fn sign_and_execute_txns_in_soft_bundle(
610        &self,
611        txns: &[TransactionData],
612    ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
613        // Sign all transactions
614        let signed_txs: Vec<Transaction> =
615            futures::future::join_all(txns.iter().map(|tx| self.wallet.sign_transaction(tx))).await;
616
617        self.execute_signed_txns_in_soft_bundle(&signed_txs).await
618    }
619
620    pub async fn execute_signed_txns_in_soft_bundle(
621        &self,
622        signed_txs: &[Transaction],
623    ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
624        let digests: Vec<_> = signed_txs.iter().map(|tx| *tx.digest()).collect();
625
626        let request = RawSubmitTxRequest {
627            transactions: signed_txs
628                .iter()
629                .map(|tx| bcs::to_bytes(tx).unwrap().into())
630                .collect(),
631            submit_type: SubmitTxType::SoftBundle.into(),
632        };
633
634        let mut validator_client = self
635            .authority_aggregator()
636            .authority_clients
637            .iter()
638            .next()
639            .unwrap()
640            .1
641            .authority_client()
642            .get_client_for_testing()
643            .unwrap();
644
645        let result = validator_client
646            .submit_transaction(request.into_request())
647            .await
648            .map(tonic::Response::into_inner)?;
649        assert_eq!(result.results.len(), signed_txs.len());
650
651        let effects = self
652            .fullnode_handle
653            .sui_node
654            .with_async(|node| {
655                let digests = digests.clone();
656                async move {
657                    let state = node.state();
658                    let transaction_cache_reader = state.get_transaction_cache_reader();
659                    transaction_cache_reader
660                        .notify_read_executed_effects(
661                            "sign_and_execute_txns_in_soft_bundle",
662                            &digests,
663                        )
664                        .await
665                }
666            })
667            .await;
668
669        Ok(digests.into_iter().zip(effects.into_iter()).collect())
670    }
671
672    pub async fn wait_for_tx_settlement(&self, digests: &[TransactionDigest]) {
673        self.fullnode_handle
674            .sui_node
675            .with_async(|node| async move {
676                let state = node.state();
677                // wait until the transactions are in checkpoints
678                let checkpoint_seqs = state
679                    .epoch_store_for_testing()
680                    .transactions_executed_in_checkpoint_notify(digests.to_vec())
681                    .await
682                    .unwrap();
683
684                // then wait until the highest of the checkpoints is executed
685                let max_checkpoint_seq = checkpoint_seqs.into_iter().max().unwrap();
686                state
687                    .checkpoint_store
688                    .notify_read_executed_checkpoint(max_checkpoint_seq)
689                    .await;
690            })
691            .await;
692    }
693
694    /// Execute a transaction on the network and wait for it to be executed on the rpc fullnode.
695    /// Also expects the effects status to be ExecutionStatus::Success.
696    /// This function is recommended for transaction execution since it most resembles the
697    /// production path.
698    pub async fn execute_transaction(&self, tx: Transaction) -> SuiTransactionBlockResponse {
699        self.wallet.execute_transaction_must_succeed(tx).await
700    }
701
702    /// Different from `execute_transaction` which returns RPC effects types, this function
703    /// returns raw effects, events and extra objects returned by the validators,
704    /// aggregated manually (without authority aggregator).
705    /// It also does not check whether the transaction is executed successfully.
706    /// In order to keep the fullnode up-to-date so that latter queries can read consistent
707    /// results, it calls execute_transaction_may_fail again which goes through fullnode.
708    /// This is less efficient and verbose, but can be used if more details are needed
709    /// from the execution results, and if the transaction is expected to fail.
710    pub async fn execute_transaction_return_raw_effects(
711        &self,
712        tx: Transaction,
713    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
714        let results = self
715            .submit_transaction_to_validators(tx.clone(), &self.get_validator_pubkeys())
716            .await?;
717        self.wallet.execute_transaction_may_fail(tx).await.unwrap();
718        Ok(results)
719    }
720
721    pub fn authority_aggregator(&self) -> Arc<AuthorityAggregator<NetworkAuthorityClient>> {
722        self.fullnode_handle
723            .sui_node
724            .with(|node| node.clone_authority_aggregator().unwrap())
725    }
726
727    pub async fn create_certificate(
728        &self,
729        tx: Transaction,
730        client_addr: Option<SocketAddr>,
731    ) -> anyhow::Result<CertifiedTransaction> {
732        let agg = self.authority_aggregator();
733        Ok(agg
734            .process_transaction(tx, client_addr)
735            .await?
736            .into_cert_for_testing())
737    }
738
739    /// Execute a transaction on specified list of validators, and bypassing authority aggregator.
740    /// This allows us to obtain the return value directly from validators, so that we can access more
741    /// information directly such as the original effects, events and extra objects returned.
742    /// This also allows us to control which validator to send certificates to, which is useful in
743    /// some tests.
744    pub async fn submit_transaction_to_validators(
745        &self,
746        tx: Transaction,
747        pubkeys: &[AuthorityName],
748    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
749        let agg = self.authority_aggregator();
750        let certificate = agg
751            .process_transaction(tx, None)
752            .await?
753            .into_cert_for_testing();
754        let replies = loop {
755            let futures: Vec<_> = agg
756                .authority_clients
757                .iter()
758                .filter_map(|(name, client)| {
759                    if pubkeys.contains(name) {
760                        Some(client)
761                    } else {
762                        None
763                    }
764                })
765                .map(|client| {
766                    let cert = certificate.clone();
767                    async move { client.handle_certificate_v2(cert, None).await }
768                })
769                .collect();
770
771            let replies: Vec<_> = futures::future::join_all(futures)
772                .await
773                .into_iter()
774                .filter(|result| match result {
775                    Err(e) => !e.to_string().contains("deadline has elapsed"),
776                    _ => true,
777                })
778                .collect();
779
780            if !replies.is_empty() {
781                break replies;
782            }
783        };
784        let replies: SuiResult<Vec<_>> = replies.into_iter().collect();
785        let replies = replies?;
786        let mut all_effects = HashMap::new();
787        let mut all_events = HashMap::new();
788        for reply in replies {
789            let effects = reply.signed_effects.into_data();
790            all_effects.insert(effects.digest(), effects);
791            all_events.insert(reply.events.digest(), reply.events);
792            // reply.fastpath_input_objects is unused.
793        }
794        assert_eq!(all_effects.len(), 1);
795        assert_eq!(all_events.len(), 1);
796        Ok((
797            all_effects.into_values().next().unwrap(),
798            all_events.into_values().next().unwrap(),
799        ))
800    }
801
802    /// This call sends some funds from the seeded address to the funding
803    /// address for the given amount and returns the gas object ref. This
804    /// is useful to construct transactions from the funding address.
805    pub async fn fund_address_and_return_gas(
806        &self,
807        rgp: u64,
808        amount: Option<u64>,
809        funding_address: SuiAddress,
810    ) -> ObjectRef {
811        let context = &self.wallet;
812        let (sender, gas) = context.get_one_gas_object().await.unwrap().unwrap();
813        let tx = context
814            .sign_transaction(
815                &TestTransactionBuilder::new(sender, gas, rgp)
816                    .transfer_sui(amount, funding_address)
817                    .build(),
818            )
819            .await;
820        context.execute_transaction_must_succeed(tx).await;
821
822        context
823            .get_one_gas_object_owned_by_address(funding_address)
824            .await
825            .unwrap()
826            .unwrap()
827    }
828
829    pub async fn transfer_sui_must_exceed(
830        &self,
831        sender: SuiAddress,
832        receiver: SuiAddress,
833        amount: u64,
834    ) -> ObjectID {
835        let tx = self
836            .test_transaction_builder_with_sender(sender)
837            .await
838            .transfer_sui(Some(amount), receiver)
839            .build();
840        let effects = self
841            .sign_and_execute_transaction(&tx)
842            .await
843            .effects
844            .unwrap();
845        assert_eq!(&SuiExecutionStatus::Success, effects.status());
846        effects.created().first().unwrap().object_id()
847    }
848
849    #[cfg(msim)]
850    pub fn set_safe_mode_expected(&self, value: bool) {
851        for n in self.all_node_handles() {
852            n.with(|node| node.set_safe_mode_expected(value));
853        }
854    }
855}
856
/// Repeatedly stops a randomly chosen validator of a [`TestCluster`] and starts it again after
/// a delay.
///
/// The background task is spawned by `run` and aborted when this value is dropped.
pub struct RandomNodeRestarter {
    // Cluster whose validators are stopped and restarted.
    test_cluster: Arc<TestCluster>,

    // How frequently should we kill nodes
    kill_interval: Uniform<Duration>,
    // How long should we wait before restarting them.
    restart_delay: Uniform<Duration>,

    // Handle of the task spawned by `run`; stays `None` until `run` has been called.
    task_handle: Mutex<Option<JoinHandle<()>>>,
}
867
868impl RandomNodeRestarter {
869    fn new(test_cluster: Arc<TestCluster>) -> Self {
870        Self {
871            test_cluster,
872            kill_interval: Uniform::new(Duration::from_secs(10), Duration::from_secs(11)),
873            restart_delay: Uniform::new(Duration::from_secs(1), Duration::from_secs(2)),
874            task_handle: Default::default(),
875        }
876    }
877
878    pub fn with_kill_interval_secs(mut self, a: u64, b: u64) -> Self {
879        self.kill_interval = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
880        self
881    }
882
883    pub fn with_restart_delay_secs(mut self, a: u64, b: u64) -> Self {
884        self.restart_delay = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
885        self
886    }
887
888    pub fn run(&self) {
889        let test_cluster = self.test_cluster.clone();
890        let kill_interval = self.kill_interval;
891        let restart_delay = self.restart_delay;
892        let validators = self.test_cluster.get_validator_pubkeys();
893        let mut task_handle = self.task_handle.lock().unwrap();
894        assert!(task_handle.is_none());
895        task_handle.replace(tokio::task::spawn(async move {
896            loop {
897                let delay = kill_interval.sample(&mut OsRng);
898                info!("Sleeping {delay:?} before killing a validator");
899                sleep(delay).await;
900
901                let validator = validators.choose(&mut OsRng).unwrap();
902                info!("Killing validator {:?}", validator.concise());
903                test_cluster.stop_node(validator);
904
905                let delay = restart_delay.sample(&mut OsRng);
906                info!("Sleeping {delay:?} before restarting");
907                sleep(delay).await;
908                info!("Starting validator {:?}", validator.concise());
909                test_cluster.start_node(validator).await;
910            }
911        }));
912    }
913}
914
915impl Drop for RandomNodeRestarter {
916    fn drop(&mut self) {
917        if let Some(handle) = self.task_handle.lock().unwrap().take() {
918            handle.abort();
919        }
920    }
921}
922
/// Builder that collects the settings for a [`TestCluster`] and launches it via `build`.
///
/// Fields left at their `None`/`false` defaults are not forwarded to the swarm builder.
pub struct TestClusterBuilder {
    // At most one of `genesis_config` / `network_config` may be installed through the
    // `set_*` methods (they assert that neither is set yet).
    genesis_config: Option<GenesisConfig>,
    network_config: Option<NetworkConfig>,
    additional_objects: Vec<Object>,
    // Committee size used when `validators` is not provided.
    num_validators: Option<usize>,
    // Explicit validator genesis configs; takes precedence over `num_validators`.
    validators: Option<Vec<ValidatorGenesisConfig>>,
    fullnode_rpc_port: Option<u16>,
    enable_fullnode_events: bool,
    disable_fullnode_pruning: bool,
    validator_supported_protocol_versions_config: ProtocolVersionsConfig,
    // Default to validator_supported_protocol_versions_config, but can be overridden.
    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
    db_checkpoint_config_validators: DBCheckpointConfig,
    db_checkpoint_config_fullnodes: DBCheckpointConfig,
    num_unpruned_validators: Option<usize>,
    jwk_fetch_interval: Option<Duration>,
    config_dir: Option<PathBuf>,
    // When false, `build` installs a random JWK injector in `msim` builds.
    default_jwks: bool,
    authority_overload_config: Option<AuthorityOverloadConfig>,
    execution_cache_config: Option<ExecutionCacheConfig>,
    // If unset while `indexer_backed_rpc` is on, `build` creates a temporary directory.
    data_ingestion_dir: Option<PathBuf>,
    fullnode_run_with_range: Option<RunWithRange>,
    fullnode_policy_config: Option<PolicyConfig>,
    fullnode_fw_config: Option<RemoteFirewallConfig>,

    max_submit_position: Option<usize>,
    submit_delay_step_override_millis: Option<u64>,
    validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,

    // When true, the wallet is pointed at an indexer's RPC URL instead of the fullnode's.
    indexer_backed_rpc: bool,
    rpc_config: Option<sui_config::RpcConfig>,

    chain_override: Option<Chain>,

    execution_time_observer_config: Option<sui_config::node::ExecutionTimeObserverConfig>,

    #[cfg(msim)]
    inject_synthetic_execution_time: bool,
}
962
963impl TestClusterBuilder {
964    pub fn new() -> Self {
965        TestClusterBuilder {
966            genesis_config: None,
967            network_config: None,
968            chain_override: None,
969            additional_objects: vec![],
970            fullnode_rpc_port: None,
971            num_validators: None,
972            validators: None,
973            enable_fullnode_events: false,
974            disable_fullnode_pruning: false,
975            validator_supported_protocol_versions_config: ProtocolVersionsConfig::Default,
976            fullnode_supported_protocol_versions_config: None,
977            db_checkpoint_config_validators: DBCheckpointConfig::default(),
978            db_checkpoint_config_fullnodes: DBCheckpointConfig::default(),
979            num_unpruned_validators: None,
980            jwk_fetch_interval: None,
981            config_dir: None,
982            default_jwks: false,
983            authority_overload_config: None,
984            execution_cache_config: None,
985            data_ingestion_dir: None,
986            fullnode_run_with_range: None,
987            fullnode_policy_config: None,
988            fullnode_fw_config: None,
989            max_submit_position: None,
990            submit_delay_step_override_millis: None,
991            validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(
992                true,
993            ),
994            indexer_backed_rpc: false,
995            rpc_config: None,
996            execution_time_observer_config: None,
997            #[cfg(msim)]
998            inject_synthetic_execution_time: false,
999        }
1000    }
1001
1002    pub fn with_execution_time_observer_config(
1003        mut self,
1004        config: sui_config::node::ExecutionTimeObserverConfig,
1005    ) -> Self {
1006        self.execution_time_observer_config = Some(config);
1007        self
1008    }
1009
1010    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
1011        if let Some(run_with_range) = run_with_range {
1012            self.fullnode_run_with_range = Some(run_with_range);
1013        }
1014        self
1015    }
1016
1017    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
1018        self.fullnode_policy_config = config;
1019        self
1020    }
1021
1022    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
1023        self.fullnode_fw_config = config;
1024        self
1025    }
1026
1027    pub fn with_fullnode_rpc_port(mut self, rpc_port: u16) -> Self {
1028        self.fullnode_rpc_port = Some(rpc_port);
1029        self
1030    }
1031
1032    pub fn set_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
1033        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1034        self.genesis_config = Some(genesis_config);
1035        self
1036    }
1037
1038    pub fn set_network_config(mut self, network_config: NetworkConfig) -> Self {
1039        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1040        self.network_config = Some(network_config);
1041        self
1042    }
1043
1044    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
1045        self.additional_objects.extend(objects);
1046        self
1047    }
1048
1049    /// Set the number of default validators to spawn. Can be overridden by `with_validators`, if
1050    /// you need to provide more specific genesis configs for each validator.
1051    pub fn with_num_validators(mut self, num: usize) -> Self {
1052        self.num_validators = Some(num);
1053        self
1054    }
1055
1056    /// Provide validator genesis configs, overrides the `num_validators` setting.
1057    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
1058        self.validators = Some(validators);
1059        self
1060    }
1061
1062    pub fn enable_fullnode_events(mut self) -> Self {
1063        self.enable_fullnode_events = true;
1064        self
1065    }
1066
1067    pub fn disable_fullnode_pruning(mut self) -> Self {
1068        self.disable_fullnode_pruning = true;
1069        self
1070    }
1071
1072    pub fn with_enable_db_checkpoints_validators(mut self) -> Self {
1073        self.db_checkpoint_config_validators = DBCheckpointConfig {
1074            perform_db_checkpoints_at_epoch_end: true,
1075            checkpoint_path: None,
1076            object_store_config: None,
1077            perform_index_db_checkpoints_at_epoch_end: None,
1078            prune_and_compact_before_upload: None,
1079        };
1080        self
1081    }
1082
1083    pub fn with_enable_db_checkpoints_fullnodes(mut self) -> Self {
1084        self.db_checkpoint_config_fullnodes = DBCheckpointConfig {
1085            perform_db_checkpoints_at_epoch_end: true,
1086            checkpoint_path: None,
1087            object_store_config: None,
1088            perform_index_db_checkpoints_at_epoch_end: None,
1089            prune_and_compact_before_upload: Some(true),
1090        };
1091        self
1092    }
1093
1094    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
1095        self.get_or_init_genesis_config()
1096            .parameters
1097            .epoch_duration_ms = epoch_duration_ms;
1098        self
1099    }
1100
1101    pub fn with_stake_subsidy_start_epoch(mut self, stake_subsidy_start_epoch: u64) -> Self {
1102        self.get_or_init_genesis_config()
1103            .parameters
1104            .stake_subsidy_start_epoch = stake_subsidy_start_epoch;
1105        self
1106    }
1107
1108    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
1109        self.validator_supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
1110        self
1111    }
1112
1113    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
1114        self.jwk_fetch_interval = Some(i);
1115        self
1116    }
1117
1118    pub fn with_fullnode_supported_protocol_versions_config(
1119        mut self,
1120        c: SupportedProtocolVersions,
1121    ) -> Self {
1122        self.fullnode_supported_protocol_versions_config = Some(ProtocolVersionsConfig::Global(c));
1123        self
1124    }
1125
1126    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
1127        self.get_or_init_genesis_config()
1128            .parameters
1129            .protocol_version = v;
1130        self
1131    }
1132
1133    pub fn with_supported_protocol_version_callback(
1134        mut self,
1135        func: SupportedProtocolVersionsCallback,
1136    ) -> Self {
1137        self.validator_supported_protocol_versions_config =
1138            ProtocolVersionsConfig::PerValidator(func);
1139        self
1140    }
1141
1142    pub fn with_global_state_hash_v2_enabled_callback(
1143        mut self,
1144        func: GlobalStateHashV2EnabledCallback,
1145    ) -> Self {
1146        self.validator_global_state_hash_v2_enabled_config =
1147            GlobalStateHashV2EnabledConfig::PerValidator(func);
1148        self
1149    }
1150
1151    pub fn with_validator_candidates(
1152        mut self,
1153        addresses: impl IntoIterator<Item = SuiAddress>,
1154    ) -> Self {
1155        self.get_or_init_genesis_config()
1156            .accounts
1157            .extend(addresses.into_iter().map(|address| AccountConfig {
1158                address: Some(address),
1159                gas_amounts: vec![DEFAULT_GAS_AMOUNT, DEFAULT_GAS_AMOUNT],
1160            }));
1161        self
1162    }
1163
1164    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
1165        self.num_unpruned_validators = Some(n);
1166        self
1167    }
1168
1169    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1170        self.get_or_init_genesis_config().accounts = accounts;
1171        self
1172    }
1173
1174    pub fn with_additional_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1175        self.get_or_init_genesis_config().accounts.extend(accounts);
1176        self
1177    }
1178
1179    pub fn with_config_dir(mut self, config_dir: PathBuf) -> Self {
1180        self.config_dir = Some(config_dir);
1181        self
1182    }
1183
1184    pub fn with_default_jwks(mut self) -> Self {
1185        self.default_jwks = true;
1186        self
1187    }
1188
1189    pub fn with_authority_overload_config(mut self, config: AuthorityOverloadConfig) -> Self {
1190        assert!(self.network_config.is_none());
1191        self.authority_overload_config = Some(config);
1192        self
1193    }
1194
1195    pub fn with_execution_cache_config(mut self, config: ExecutionCacheConfig) -> Self {
1196        assert!(self.network_config.is_none());
1197        self.execution_cache_config = Some(config);
1198        self
1199    }
1200
1201    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
1202        self.data_ingestion_dir = Some(path);
1203        self
1204    }
1205
1206    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
1207        self.max_submit_position = Some(max_submit_position);
1208        self
1209    }
1210
1211    pub fn with_submit_delay_step_override_millis(
1212        mut self,
1213        submit_delay_step_override_millis: u64,
1214    ) -> Self {
1215        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
1216        self
1217    }
1218
1219    pub fn with_indexer_backed_rpc(mut self) -> Self {
1220        self.indexer_backed_rpc = true;
1221        self
1222    }
1223
1224    pub fn with_rpc_config(mut self, config: sui_config::RpcConfig) -> Self {
1225        self.rpc_config = Some(config);
1226        self
1227    }
1228
1229    pub fn with_chain_override(mut self, chain: Chain) -> Self {
1230        self.chain_override = Some(chain);
1231        self
1232    }
1233
1234    #[cfg(msim)]
1235    pub fn with_synthetic_execution_time_injection(mut self) -> Self {
1236        self.inject_synthetic_execution_time = true;
1237        self
1238    }
1239
1240    pub async fn build(mut self) -> TestCluster {
1241        // All test clusters receive a continuous stream of random JWKs.
1242        // If we later use zklogin authenticated transactions in tests we will need to supply
1243        // valid JWKs as well.
1244        #[cfg(msim)]
1245        if !self.default_jwks {
1246            sui_node::set_jwk_injector(Arc::new(|_authority, provider| {
1247                use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
1248                use rand::Rng;
1249
1250                // generate random (and possibly conflicting) id/key pairings.
1251                let id_num = rand::thread_rng().gen_range(1..=4);
1252                let key_num = rand::thread_rng().gen_range(1..=4);
1253
1254                let id = JwkId {
1255                    iss: provider.get_config().iss,
1256                    kid: format!("kid{}", id_num),
1257                };
1258
1259                let jwk = JWK {
1260                    kty: "kty".to_string(),
1261                    e: "e".to_string(),
1262                    n: format!("n{}", key_num),
1263                    alg: "alg".to_string(),
1264                };
1265
1266                Ok(vec![(id, jwk)])
1267            }));
1268        }
1269
1270        let mut temp_data_ingestion_dir = None;
1271        let mut data_ingestion_path = None;
1272
1273        if self.indexer_backed_rpc {
1274            if self.data_ingestion_dir.is_none() {
1275                temp_data_ingestion_dir = Some(mysten_common::tempdir().unwrap());
1276                self.data_ingestion_dir = Some(
1277                    temp_data_ingestion_dir
1278                        .as_ref()
1279                        .unwrap()
1280                        .path()
1281                        .to_path_buf(),
1282                );
1283                assert!(self.data_ingestion_dir.is_some());
1284            }
1285            assert!(self.data_ingestion_dir.is_some());
1286            data_ingestion_path = Some(self.data_ingestion_dir.as_ref().unwrap().to_path_buf());
1287        }
1288
1289        let swarm = self.start_swarm().await.unwrap();
1290        let working_dir = swarm.dir();
1291
1292        let fullnode = swarm.fullnodes().next().unwrap();
1293        let json_rpc_address = fullnode.config().json_rpc_address;
1294        let fullnode_handle =
1295            FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await;
1296
1297        let (rpc_url, indexer_handle) = if self.indexer_backed_rpc {
1298            let handle = test_indexer_handle::IndexerHandle::new(
1299                fullnode_handle.rpc_url.clone(),
1300                temp_data_ingestion_dir,
1301                data_ingestion_path.unwrap(),
1302            )
1303            .await;
1304            (handle.rpc_url.clone(), Some(handle))
1305        } else {
1306            (fullnode_handle.rpc_url.clone(), None)
1307        };
1308
1309        let mut wallet_conf: SuiClientConfig =
1310            PersistedConfig::read(&working_dir.join(SUI_CLIENT_CONFIG)).unwrap();
1311        wallet_conf.envs.push(SuiEnv {
1312            alias: "localnet".to_string(),
1313            rpc: rpc_url,
1314            ws: None,
1315            basic_auth: None,
1316            chain_id: None,
1317        });
1318        wallet_conf.active_env = Some("localnet".to_string());
1319
1320        wallet_conf
1321            .persisted(&working_dir.join(SUI_CLIENT_CONFIG))
1322            .save()
1323            .unwrap();
1324
1325        let wallet_conf = swarm.dir().join(SUI_CLIENT_CONFIG);
1326        let wallet = WalletContext::new(&wallet_conf).unwrap();
1327
1328        TestCluster {
1329            swarm,
1330            wallet,
1331            fullnode_handle,
1332            indexer_handle,
1333        }
1334    }
1335
1336    /// Start a Swarm and set up WalletConfig
1337    async fn start_swarm(&mut self) -> Result<Swarm, anyhow::Error> {
1338        let mut builder: SwarmBuilder = Swarm::builder()
1339            .with_objects(self.additional_objects.clone())
1340            .with_db_checkpoint_config(self.db_checkpoint_config_validators.clone())
1341            .with_supported_protocol_versions_config(
1342                self.validator_supported_protocol_versions_config.clone(),
1343            )
1344            .with_global_state_hash_v2_enabled_config(
1345                self.validator_global_state_hash_v2_enabled_config.clone(),
1346            )
1347            .with_fullnode_count(1)
1348            .with_fullnode_supported_protocol_versions_config(
1349                self.fullnode_supported_protocol_versions_config
1350                    .clone()
1351                    .unwrap_or(self.validator_supported_protocol_versions_config.clone()),
1352            )
1353            .with_db_checkpoint_config(self.db_checkpoint_config_fullnodes.clone())
1354            .with_fullnode_run_with_range(self.fullnode_run_with_range)
1355            .with_fullnode_policy_config(self.fullnode_policy_config.clone())
1356            .with_fullnode_fw_config(self.fullnode_fw_config.clone());
1357
1358        if let Some(validators) = self.validators.take() {
1359            builder = builder.with_validators(validators);
1360        } else {
1361            builder = builder.committee_size(
1362                NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDATOR)).unwrap(),
1363            )
1364        };
1365
1366        if let Some(chain) = self.chain_override {
1367            builder = builder.with_chain_override(chain);
1368        }
1369
1370        if let Some(genesis_config) = self.genesis_config.take() {
1371            builder = builder.with_genesis_config(genesis_config);
1372        }
1373
1374        if let Some(network_config) = self.network_config.take() {
1375            builder = builder.with_network_config(network_config);
1376        }
1377
1378        if let Some(authority_overload_config) = self.authority_overload_config.take() {
1379            builder = builder.with_authority_overload_config(authority_overload_config);
1380        }
1381
1382        if let Some(execution_cache_config) = self.execution_cache_config.take() {
1383            builder = builder.with_execution_cache_config(execution_cache_config);
1384        }
1385
1386        if let Some(fullnode_rpc_port) = self.fullnode_rpc_port {
1387            builder = builder.with_fullnode_rpc_port(fullnode_rpc_port);
1388        }
1389
1390        if let Some(rpc_config) = &self.rpc_config {
1391            builder = builder.with_fullnode_rpc_config(rpc_config.clone());
1392        }
1393        if let Some(num_unpruned_validators) = self.num_unpruned_validators {
1394            builder = builder.with_num_unpruned_validators(num_unpruned_validators);
1395        }
1396
1397        if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
1398            builder = builder.with_jwk_fetch_interval(jwk_fetch_interval);
1399        }
1400
1401        if let Some(config_dir) = self.config_dir.take() {
1402            builder = builder.dir(config_dir);
1403        }
1404
1405        if let Some(data_ingestion_dir) = self.data_ingestion_dir.take() {
1406            builder = builder.with_data_ingestion_dir(data_ingestion_dir);
1407        }
1408
1409        if let Some(max_submit_position) = self.max_submit_position {
1410            builder = builder.with_max_submit_position(max_submit_position);
1411        }
1412
1413        if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis {
1414            builder =
1415                builder.with_submit_delay_step_override_millis(submit_delay_step_override_millis);
1416        }
1417
1418        if self.disable_fullnode_pruning {
1419            builder = builder.with_disable_fullnode_pruning();
1420        }
1421
1422        #[cfg(msim)]
1423        {
1424            if let Some(mut config) = self.execution_time_observer_config.clone() {
1425                if self.inject_synthetic_execution_time {
1426                    config.inject_synthetic_execution_time = Some(true);
1427                }
1428                builder = builder.with_execution_time_observer_config(config);
1429            } else if self.inject_synthetic_execution_time {
1430                use sui_config::node::ExecutionTimeObserverConfig;
1431
1432                let mut config = ExecutionTimeObserverConfig::default();
1433                config.inject_synthetic_execution_time = Some(true);
1434                builder = builder.with_execution_time_observer_config(config);
1435            }
1436        }
1437
1438        let mut swarm = builder.build();
1439        swarm.launch().await?;
1440
1441        let dir = swarm.dir();
1442
1443        let network_path = dir.join(SUI_NETWORK_CONFIG);
1444        let wallet_path = dir.join(SUI_CLIENT_CONFIG);
1445        let keystore_path = dir.join(SUI_KEYSTORE_FILENAME);
1446
1447        swarm.config().save(network_path)?;
1448        let mut keystore = Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?);
1449        for key in &swarm.config().account_keys {
1450            keystore
1451                .import(None, SuiKeyPair::Ed25519(key.copy()))
1452                .await?;
1453        }
1454
1455        let active_address = keystore.addresses().first().cloned();
1456
1457        // Create wallet config with stated authorities port
1458        SuiClientConfig {
1459            keystore: Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?),
1460            external_keys: None,
1461            envs: Default::default(),
1462            active_address,
1463            active_env: Default::default(),
1464        }
1465        .save(wallet_path)?;
1466
1467        // Return network handle
1468        Ok(swarm)
1469    }
1470
1471    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
1472        if self.genesis_config.is_none() {
1473            self.genesis_config = Some(GenesisConfig::for_local_testing());
1474        }
1475        self.genesis_config.as_mut().unwrap()
1476    }
1477}
1478
1479impl Default for TestClusterBuilder {
1480    fn default() -> Self {
1481        Self::new()
1482    }
1483}