test_cluster/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::{StreamExt, future::join_all};
5use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
6use mysten_common::fatal;
7use rand::{Rng, distributions::*, rngs::OsRng, seq::SliceRandom};
8use std::net::SocketAddr;
9use std::num::NonZeroUsize;
10use std::path::PathBuf;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13use sui_config::genesis::Genesis;
14use sui_config::node::FundsWithdrawSchedulerType;
15use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
16use sui_config::{Config, ExecutionCacheConfig, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG};
17use sui_config::{NodeConfig, PersistedConfig, SUI_KEYSTORE_FILENAME};
18use sui_core::authority_aggregator::AuthorityAggregator;
19use sui_core::authority_client::NetworkAuthorityClient;
20use sui_json_rpc_types::{SuiTransactionBlockEffectsAPI, TransactionFilter};
21use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
22use sui_node::SuiNodeHandle;
23use sui_protocol_config::{Chain, ProtocolVersion};
24use sui_rpc_api::Client;
25use sui_rpc_api::client::ExecutedTransaction;
26use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
27use sui_sdk::wallet_context::WalletContext;
28use sui_sdk::{SuiClient, SuiClientBuilder};
29use sui_swarm::memory::{Swarm, SwarmBuilder};
30use sui_swarm_config::genesis_config::{
31    AccountConfig, DEFAULT_GAS_AMOUNT, GenesisConfig, ValidatorGenesisConfig,
32};
33use sui_swarm_config::network_config::NetworkConfig;
34use sui_swarm_config::network_config_builder::{
35    FundsWithdrawSchedulerTypeConfig, GlobalStateHashV2EnabledCallback,
36    GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig, SupportedProtocolVersionsCallback,
37};
38use sui_swarm_config::node_config_builder::{FullnodeConfigBuilder, ValidatorConfigBuilder};
39use sui_test_transaction_builder::TestTransactionBuilder;
40use sui_types::authenticator_state::get_authenticator_state;
41use sui_types::base_types::ConciseableName;
42use sui_types::base_types::{AuthorityName, ObjectID, ObjectRef, SuiAddress};
43use sui_types::committee::CommitteeTrait;
44use sui_types::committee::{Committee, EpochId};
45use sui_types::crypto::KeypairTraits;
46use sui_types::crypto::SuiKeyPair;
47use sui_types::digests::{ChainIdentifier, TransactionDigest};
48use sui_types::effects::TransactionEffectsAPI;
49use sui_types::effects::{TransactionEffects, TransactionEvents};
50use sui_types::error::SuiResult;
51use sui_types::messages_grpc::{
52    RawSubmitTxRequest, SubmitTxRequest, SubmitTxResult, SubmitTxType, WaitForEffectsRequest,
53    WaitForEffectsResponse,
54};
55use sui_types::object::Object;
56use sui_types::sui_system_state::SuiSystemState;
57use sui_types::sui_system_state::SuiSystemStateTrait;
58use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
59use sui_types::supported_protocol_versions::SupportedProtocolVersions;
60use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
61use sui_types::transaction::{Transaction, TransactionData, TransactionDataAPI, TransactionKind};
62use tokio::sync::broadcast;
63use tokio::time::{Instant, timeout};
64use tokio::{task::JoinHandle, time::sleep};
65use tonic::IntoRequest;
66use tracing::{error, info};
67
68pub mod addr_balance_test_env;
69
/// Default number of validators for a test cluster (usage is outside this
/// chunk — presumably consumed by the cluster builder; confirm).
const NUM_VALIDATOR: usize = 4;
71
/// Handle to a running fullnode plus the set of client objects used to talk
/// to it. All clients are built from the same `rpc_url`.
pub struct FullNodeHandle {
    pub sui_node: SuiNodeHandle,
    // Legacy SDK client kept for older tests.
    #[deprecated = "use grpc_client"]
    pub sui_client: SuiClient,
    // Legacy raw JSON-RPC client kept for older tests.
    #[deprecated = "use grpc_client"]
    pub rpc_client: HttpClient,
    // Preferred gRPC API client.
    pub grpc_client: Client,
    // Shared, eagerly-connected tonic channel (established with retries
    // during setup; see `connect_channel_with_retry`).
    pub grpc_channel: tonic::transport::Channel,
    // URL all of the clients above were built against.
    pub rpc_url: String,
}
82
impl FullNodeHandle {
    /// Build a handle by constructing every client flavor against the node's
    /// JSON-RPC address and eagerly establishing gRPC connectivity.
    ///
    /// Panics if any client fails to build or the warm-up request fails.
    pub async fn new(sui_node: SuiNodeHandle, json_rpc_address: SocketAddr) -> Self {
        let rpc_url = format!("http://{}", json_rpc_address);
        let rpc_client = HttpClientBuilder::default().build(&rpc_url).unwrap();

        let sui_client = SuiClientBuilder::default().build(&rpc_url).await.unwrap();
        let grpc_client = Client::new(&rpc_url).unwrap();

        // Eagerly connect a shared gRPC channel with retry logic. Tests should
        // create tonic service clients via `XxxServiceClient::new(channel)` instead
        // of `XxxServiceClient::connect(url)` so that connections are established
        // once during cluster setup rather than ad-hoc in each test.
        let grpc_channel = Self::connect_channel_with_retry(&rpc_url).await;

        // Warm up the sui_rpc_api::Client's internal lazy channel. Without this,
        // the connect_lazy() channel may time out on its first use under MSIM's
        // deterministic scheduler on certain seeds.
        grpc_client
            .get_reference_gas_price()
            .await
            .expect("failed to warm up grpc client");

        Self {
            sui_node,
            #[allow(deprecated)]
            sui_client,
            #[allow(deprecated)]
            rpc_client,
            grpc_client,
            grpc_channel,
            rpc_url,
        }
    }

    /// Connect a tonic channel to the given URL with retries.
    ///
    /// In the simulator (MSIM), deterministic scheduling can cause gRPC connection
    /// handshakes to exceed timeouts on certain seeds. This method retries to ensure
    /// test infrastructure setup is robust regardless of seed.
    ///
    /// Use [`TestCluster::grpc_channel()`] when connecting to the test cluster's
    /// fullnode. Use this method directly only when connecting to a custom URL
    /// (e.g. a separately-spawned fullnode with different configuration).
    ///
    /// Panics after `MAX_RETRIES` consecutive failures or timeouts.
    pub async fn connect_channel_with_retry(url: &str) -> tonic::transport::Channel {
        const MAX_RETRIES: u32 = 10;
        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

        let endpoint = tonic::transport::Endpoint::from_shared(url.to_string())
            .expect("invalid grpc url")
            .connect_timeout(CONNECT_TIMEOUT);

        // The outer tokio timeout bounds each attempt in addition to the
        // endpoint's own connect_timeout; failed/timed-out attempts back off
        // 100ms before retrying, and the final attempt panics instead.
        for attempt in 0..MAX_RETRIES {
            match tokio::time::timeout(CONNECT_TIMEOUT, endpoint.connect()).await {
                Ok(Ok(channel)) => return channel,
                Ok(Err(e)) if attempt + 1 < MAX_RETRIES => {
                    info!(
                        "grpc channel connect attempt {} failed: {e}, retrying",
                        attempt + 1
                    );
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
                Ok(Err(e)) => {
                    panic!("grpc channel connect failed after {MAX_RETRIES} attempts: {e}")
                }
                Err(_) if attempt + 1 < MAX_RETRIES => {
                    info!(
                        "grpc channel connect attempt {} timed out, retrying",
                        attempt + 1
                    );
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
                Err(_) => panic!("grpc channel connect timed out after {MAX_RETRIES} attempts"),
            }
        }
        // Every loop iteration either returns or panics on the last attempt.
        unreachable!()
    }
}
160
/// A running test network: a swarm of validators (and optional fullnodes),
/// a wallet pre-funded at genesis, and a handle to a dedicated fullnode
/// that serves as the cluster's RPC entry point.
pub struct TestCluster {
    pub swarm: Swarm,
    pub wallet: WalletContext,
    // Not part of `swarm`; cannot be dropped/killed by swarm operations.
    pub fullnode_handle: FullNodeHandle,
}
166
167impl TestCluster {
168    #[deprecated = "use grpc_client()"]
169    pub fn rpc_client(&self) -> &HttpClient {
170        #[allow(deprecated)]
171        &self.fullnode_handle.rpc_client
172    }
173
174    #[deprecated = "use grpc_client()"]
175    pub fn sui_client(&self) -> &SuiClient {
176        #[allow(deprecated)]
177        &self.fullnode_handle.sui_client
178    }
179
180    pub fn grpc_client(&self) -> Client {
181        self.fullnode_handle.grpc_client.clone()
182    }
183
184    /// Returns a pre-connected gRPC channel to the fullnode.
185    ///
186    /// Use this to create tonic service clients in tests:
187    /// ```ignore
188    /// let mut client = LedgerServiceClient::new(test_cluster.grpc_channel());
189    /// ```
190    ///
191    /// This channel is connected during test cluster setup with retry logic,
192    /// so it is resilient to MSIM scheduling-induced connection timeouts.
193    /// Prefer this over `XxxServiceClient::connect(url)` which has no retry.
194    pub fn grpc_channel(&self) -> tonic::transport::Channel {
195        self.fullnode_handle.grpc_channel.clone()
196    }
197
198    pub fn rpc_url(&self) -> &str {
199        &self.fullnode_handle.rpc_url
200    }
201
    /// Shared view of the cluster's wallet context.
    // NOTE(review): takes `&mut self` although only a shared reference is
    // returned — presumably to force exclusive access to the cluster while
    // the wallet is in use; confirm before relaxing the receiver to `&self`.
    pub fn wallet(&mut self) -> &WalletContext {
        &self.wallet
    }
205
206    pub fn wallet_mut(&mut self) -> &mut WalletContext {
207        &mut self.wallet
208    }
209
210    pub fn get_addresses(&self) -> Vec<SuiAddress> {
211        self.wallet.get_addresses()
212    }
213
214    // Helper function to get the 0th address in WalletContext
215    pub fn get_address_0(&self) -> SuiAddress {
216        self.get_addresses()[0]
217    }
218
219    // Helper function to get the 1st address in WalletContext
220    pub fn get_address_1(&self) -> SuiAddress {
221        self.get_addresses()[1]
222    }
223
224    // Helper function to get the 2nd address in WalletContext
225    pub fn get_address_2(&self) -> SuiAddress {
226        self.get_addresses()[2]
227    }
228
229    pub fn fullnode_config_builder(&self) -> FullnodeConfigBuilder {
230        self.swarm.get_fullnode_config_builder()
231    }
232
233    pub fn committee(&self) -> Arc<Committee> {
234        self.fullnode_handle
235            .sui_node
236            .with(|node| node.state().epoch_store_for_testing().committee().clone())
237    }
238
239    pub fn get_sui_system_state(&self) -> SuiSystemState {
240        self.fullnode_handle.sui_node.with(|node| {
241            node.state()
242                .get_sui_system_state_object_for_testing()
243                .unwrap()
244        })
245    }
246
247    /// Convenience method to start a new fullnode in the test cluster.
248    pub async fn spawn_new_fullnode(&mut self) -> FullNodeHandle {
249        self.start_fullnode_from_config(
250            self.fullnode_config_builder()
251                .build(&mut OsRng, self.swarm.config()),
252        )
253        .await
254    }
255
256    pub async fn start_fullnode_from_config(&mut self, config: NodeConfig) -> FullNodeHandle {
257        let json_rpc_address = config.json_rpc_address;
258        let node = self.swarm.spawn_new_node(config).await;
259        FullNodeHandle::new(node, json_rpc_address).await
260    }
261
262    pub fn all_node_handles(&self) -> Vec<SuiNodeHandle> {
263        self.swarm
264            .all_nodes()
265            .flat_map(|n| n.get_node_handle())
266            .collect()
267    }
268
269    pub fn all_validator_handles(&self) -> Vec<SuiNodeHandle> {
270        self.swarm
271            .validator_nodes()
272            .map(|n| n.get_node_handle().unwrap())
273            .collect()
274    }
275
276    pub fn get_validator_pubkeys(&self) -> Vec<AuthorityName> {
277        self.swarm.active_validators().map(|v| v.name()).collect()
278    }
279
280    pub fn get_genesis(&self) -> Genesis {
281        self.swarm.config().genesis.clone()
282    }
283
284    pub fn stop_node(&self, name: &AuthorityName) {
285        self.swarm.node(name).unwrap().stop();
286    }
287
288    pub async fn stop_all_validators(&self) {
289        info!("Stopping all validators in the cluster");
290        self.swarm.active_validators().for_each(|v| v.stop());
291        tokio::time::sleep(Duration::from_secs(3)).await;
292    }
293
294    pub async fn start_all_validators(&self) {
295        info!("Starting all validators in the cluster");
296        for v in self.swarm.validator_nodes() {
297            if v.is_running() {
298                continue;
299            }
300            v.start().await.unwrap();
301        }
302        tokio::time::sleep(Duration::from_secs(3)).await;
303    }
304
305    pub async fn start_node(&self, name: &AuthorityName) {
306        let node = self.swarm.node(name).unwrap();
307        if node.is_running() {
308            return;
309        }
310        node.start().await.unwrap();
311    }
312
313    pub async fn spawn_new_validator(
314        &mut self,
315        genesis_config: ValidatorGenesisConfig,
316    ) -> SuiNodeHandle {
317        let node_config = ValidatorConfigBuilder::new()
318            .build(genesis_config, self.swarm.config().genesis.clone());
319        self.swarm.spawn_new_node(node_config).await
320    }
321
322    pub fn random_node_restarter(self: &Arc<Self>) -> RandomNodeRestarter {
323        RandomNodeRestarter::new(self.clone())
324    }
325
326    pub async fn get_reference_gas_price(&self) -> u64 {
327        self.grpc_client()
328            .get_reference_gas_price()
329            .await
330            .expect("failed to get reference gas price")
331    }
332
333    pub fn get_chain_identifier(&self) -> ChainIdentifier {
334        ChainIdentifier::from(*self.swarm.config().genesis.checkpoint().digest())
335    }
336
337    pub async fn get_object_from_fullnode_store(&self, object_id: &ObjectID) -> Option<Object> {
338        self.fullnode_handle
339            .sui_node
340            .with_async(|node| async { node.state().get_object(object_id).await })
341            .await
342    }
343
344    pub async fn get_latest_object_ref(&self, object_id: &ObjectID) -> ObjectRef {
345        self.get_object_from_fullnode_store(object_id)
346            .await
347            .unwrap()
348            .compute_object_reference()
349    }
350
351    pub async fn get_object_or_tombstone_from_fullnode_store(
352        &self,
353        object_id: ObjectID,
354    ) -> ObjectRef {
355        self.fullnode_handle
356            .sui_node
357            .state()
358            .get_object_cache_reader()
359            .get_latest_object_ref_or_tombstone(object_id)
360            .unwrap()
361    }
362
363    pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
364        self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
365            .await
366    }
367
368    pub async fn wait_for_run_with_range_shutdown_signal_with_timeout(
369        &self,
370        timeout_dur: Duration,
371    ) -> Option<RunWithRange> {
372        let mut shutdown_channel_rx = self
373            .fullnode_handle
374            .sui_node
375            .with(|node| node.subscribe_to_shutdown_channel());
376
377        timeout(timeout_dur, async move {
378            tokio::select! {
379                msg = shutdown_channel_rx.recv() =>
380                {
381                    match msg {
382                        Ok(Some(run_with_range)) => Some(run_with_range),
383                        Ok(None) => None,
384                        Err(e) => {
385                            error!("failed recv from sui-node shutdown channel: {}", e);
386                            None
387                        },
388                    }
389                },
390            }
391        })
392        .await
393        .expect("Timed out waiting for cluster to hit target epoch and recv shutdown signal from sui-node")
394    }
395
396    pub async fn wait_for_protocol_version(
397        &self,
398        target_protocol_version: ProtocolVersion,
399    ) -> SuiSystemState {
400        self.wait_for_protocol_version_with_timeout(
401            target_protocol_version,
402            Duration::from_secs(60),
403        )
404        .await
405    }
406
407    pub async fn wait_for_protocol_version_with_timeout(
408        &self,
409        target_protocol_version: ProtocolVersion,
410        timeout_dur: Duration,
411    ) -> SuiSystemState {
412        timeout(timeout_dur, async move {
413            loop {
414                let system_state = self.wait_for_epoch(None).await;
415                if system_state.protocol_version() >= target_protocol_version.as_u64() {
416                    return system_state;
417                }
418            }
419        })
420        .await
421        .expect("Timed out waiting for cluster to target protocol version")
422    }
423
    /// Ask 2f+1 validators to close epoch actively, and wait for the entire network to reach the next
    /// epoch. This requires waiting for both the fullnode and all validators to reach the next epoch.
    pub async fn trigger_reconfiguration(&self) {
        info!("Starting reconfiguration");
        let start = Instant::now();

        // Close epoch on 2f+1 validators.
        let cur_committee = self
            .fullnode_handle
            .sui_node
            .with(|node| node.state().clone_committee_for_testing());
        let mut cur_stake = 0;
        for node in self.swarm.active_validators() {
            node.get_node_handle()
                .unwrap()
                .with_async(|node| async {
                    node.close_epoch_for_testing().await.unwrap_or_else(|_| {
                        fatal!(
                            "Failed to close epoch for validator {:?}",
                            node.state().name
                        );
                    });
                    // Accumulate the voting power of validators that have
                    // accepted the close-epoch request.
                    cur_stake += cur_committee.weight(&node.state().name);
                })
                .await;
            // Stop as soon as a quorum (2f+1 by stake) has closed the epoch.
            if cur_stake >= cur_committee.quorum_threshold() {
                break;
            }
        }
        info!("close_epoch complete after {:?}", start.elapsed());

        // First wait for the fullnode to observe the new epoch, then ensure
        // every node (validators included) has reconfigured.
        self.wait_for_epoch(Some(cur_committee.epoch + 1)).await;
        self.wait_for_epoch_all_nodes(cur_committee.epoch + 1).await;

        info!("reconfiguration complete after {:?}", start.elapsed());
    }
460
461    /// To detect whether the network has reached such state, we use the fullnode as the
462    /// source of truth, since a fullnode only does epoch transition when the network has
463    /// done so.
464    /// If target_epoch is specified, wait until the cluster reaches that epoch.
465    /// If target_epoch is None, wait until the cluster reaches the next epoch.
466    /// Note that this function does not guarantee that every node is at the target epoch.
467    pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> SuiSystemState {
468        self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
469            .await
470    }
471
    /// Wait until `handle`'s node reaches `target_epoch` (or, if `None`, the
    /// next epoch change), returning the system state observed at that point.
    /// Panics if `timeout_dur` elapses first, reporting the last seen epoch.
    pub async fn wait_for_epoch_on_node(
        &self,
        handle: &SuiNodeHandle,
        target_epoch: Option<EpochId>,
        timeout_dur: Duration,
    ) -> SuiSystemState {
        // Subscribe before reading the current epoch so a transition landing
        // in between is not missed.
        let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());

        // Tracks the last observed state so the timeout path can report it.
        let mut state = None;
        timeout(timeout_dur, async {
            // Fast path: the node may already be at the target epoch.
            let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
            if Some(epoch) == target_epoch {
                return handle.with(|node| node.state().get_sui_system_state_object_for_testing().unwrap());
            }
            while let Ok(system_state) = epoch_rx.recv().await {
                info!("received epoch {}", system_state.epoch());
                state = Some(system_state.clone());
                match target_epoch {
                    Some(target_epoch) if system_state.epoch() >= target_epoch => {
                        return system_state;
                    }
                    None => {
                        return system_state;
                    }
                    // Not yet at the target epoch; keep listening.
                    _ => (),
                }
            }
            unreachable!("Broken reconfig channel");
        })
        .await
        .unwrap_or_else(|_| {
            error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
            if let Some(state) = state {
                panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
            }
            panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
        })
    }
510
511    pub async fn wait_for_epoch_with_timeout(
512        &self,
513        target_epoch: Option<EpochId>,
514        timeout_dur: Duration,
515    ) -> SuiSystemState {
516        self.wait_for_epoch_on_node(&self.fullnode_handle.sui_node, target_epoch, timeout_dur)
517            .await
518    }
519
    /// Wait (up to 40s) until every node in the swarm — validators and
    /// fullnodes — is at exactly `target_epoch`. For fullnodes this also
    /// waits for their authority aggregator to reconfigure.
    pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
        let handles: Vec<_> = self
            .swarm
            .all_nodes()
            .map(|node| node.get_node_handle().unwrap())
            .collect();
        // Poll every node concurrently; each task loops (1s interval) until
        // its node reaches the target epoch.
        let tasks: Vec<_> = handles
            .iter()
            .map(|handle| {
                handle.with_async(|node| async {
                    let mut retries = 0;
                    loop {
                        let epoch = node.state().epoch_store_for_testing().epoch();
                        if epoch == target_epoch {
                            if let Some(agg) = node.clone_authority_aggregator() {
                                // This is a fullnode, we need to wait for its auth aggregator to reconfigure as well.
                                if agg.committee.epoch() == target_epoch {
                                    break;
                                }
                            } else {
                                // This is a validator, we don't need to check the auth aggregator.
                                break;
                            }
                        }
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        retries += 1;
                        // Log progress every 5 seconds of waiting.
                        if retries % 5 == 0 {
                            tracing::warn!(validator=?node.state().name.concise(), "Waiting for {:?} seconds to reach epoch {:?}. Currently at epoch {:?}", retries, target_epoch, epoch);
                        }
                    }
                })
            })
            .collect();

        timeout(Duration::from_secs(40), join_all(tasks))
            .await
            .expect("timed out waiting for reconfiguration to complete");
    }
558
559    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
560        // fullnode_handle is not part of swarm and cannot be dropped / killed
561        self.fullnode_handle
562            .sui_node
563            .with(|node| node.subscribe_to_epoch_change())
564    }
565
    /// Upgrade the network protocol version, by restarting every validator with a new
    /// supported versions.
    /// Note that we don't restart the fullnode here, and it is assumed that the fullnode supports
    /// the entire version range.
    pub async fn update_validator_supported_versions(
        &self,
        new_supported_versions: SupportedProtocolVersions,
    ) {
        // Restart validators one at a time so the network retains quorum
        // throughout the rollout.
        for authority in self.get_validator_pubkeys() {
            self.stop_node(&authority);
            // Give the node a moment to shut down before mutating its config.
            tokio::time::sleep(Duration::from_millis(1000)).await;
            // NOTE(review): assigning through `config()` implies it yields a
            // mutable (or interior-mutable) view of the node's config —
            // confirm against the Swarm node API.
            self.swarm
                .node(&authority)
                .unwrap()
                .config()
                .supported_protocol_versions = Some(new_supported_versions);
            self.start_node(&authority).await;
            info!("Restarted validator {}", authority);
        }
    }
586
587    /// Wait for all nodes in the network to upgrade to `protocol_version`.
588    pub async fn wait_for_all_nodes_upgrade_to(&self, protocol_version: u64) {
589        for h in self.all_node_handles() {
590            h.with_async(|node| async {
591                while node
592                    .state()
593                    .epoch_store_for_testing()
594                    .epoch_start_state()
595                    .protocol_version()
596                    .as_u64()
597                    != protocol_version
598                {
599                    tokio::time::sleep(Duration::from_secs(1)).await;
600                }
601            })
602            .await;
603        }
604    }
605
    /// Wait (up to 60s) until the authenticator state object (0x7) contains
    /// at least one active JWK, i.e. an `AuthenticatorStateUpdate`
    /// transaction has been executed on the fullnode.
    pub async fn wait_for_authenticator_state_update(&self) {
        timeout(
            Duration::from_secs(60),
            self.fullnode_handle.sui_node.with_async(|node| async move {
                let state = node.state();
                // Subscribe to transactions touching 0x7 before checking the
                // current state, so an update landing in between is not missed.
                let mut txns = state.subscription_handler.subscribe_transactions(
                    TransactionFilter::ChangedObject(ObjectID::from_hex_literal("0x7").unwrap()),
                );

                // Check if the state was already updated before subscribe_transactions was called
                // above (after trigger_reconfiguration completes, the AuthenticatorStateUpdate
                // transaction may have already been committed).
                let has_active_jwks = get_authenticator_state(state.get_object_store())
                    .ok()
                    .flatten()
                    .is_some_and(|state| !state.active_jwks.is_empty());
                if has_active_jwks {
                    return;
                }

                while let Some(tx) = txns.next().await {
                    let digest = *tx.transaction_digest();
                    let tx = state
                        .get_transaction_cache_reader()
                        .get_transaction_block(&digest)
                        .unwrap();
                    match &tx.data().intent_message().value.kind() {
                        // End-of-epoch transactions may also touch 0x7 — skip
                        // them and keep waiting for the update.
                        TransactionKind::EndOfEpochTransaction(_) => (),
                        TransactionKind::AuthenticatorStateUpdate(_) => break,
                        // Any other kind mutating 0x7 is unexpected here.
                        _ => panic!("{:?}", tx),
                    }
                }
            }),
        )
        .await
        .expect("Timed out waiting for authenticator state update");
    }
643
644    /// Return the highest observed protocol version in the test cluster.
645    pub fn highest_protocol_version(&self) -> ProtocolVersion {
646        self.all_node_handles()
647            .into_iter()
648            .map(|h| {
649                h.with(|node| {
650                    node.state()
651                        .epoch_store_for_testing()
652                        .epoch_start_state()
653                        .protocol_version()
654                })
655            })
656            .max()
657            .expect("at least one node must be up to get highest protocol version")
658    }
659
660    pub async fn test_transaction_builder(&self) -> TestTransactionBuilder {
661        let (sender, gas) = self.wallet.get_one_gas_object().await.unwrap().unwrap();
662        self.test_transaction_builder_with_gas_object(sender, gas)
663            .await
664    }
665
666    pub async fn test_transaction_builder_with_sender(
667        &self,
668        sender: SuiAddress,
669    ) -> TestTransactionBuilder {
670        let gas = self
671            .wallet
672            .get_one_gas_object_owned_by_address(sender)
673            .await
674            .unwrap()
675            .unwrap();
676        self.test_transaction_builder_with_gas_object(sender, gas)
677            .await
678    }
679
680    pub async fn test_transaction_builder_with_gas_object(
681        &self,
682        sender: SuiAddress,
683        gas: ObjectRef,
684    ) -> TestTransactionBuilder {
685        let rgp = self.get_reference_gas_price().await;
686        TestTransactionBuilder::new(sender, gas, rgp)
687    }
688
689    pub async fn sign_transaction(&self, tx_data: &TransactionData) -> Transaction {
690        self.wallet.sign_transaction(tx_data).await
691    }
692
693    pub async fn sign_and_execute_transaction(
694        &self,
695        tx_data: &TransactionData,
696    ) -> ExecutedTransaction {
697        let tx = self.wallet.sign_transaction(tx_data).await;
698        self.execute_transaction(tx).await
699    }
700
701    /// Sign and execute the transaction via direct validator submission, bypassing the fullnode.
702    pub async fn sign_and_execute_transaction_directly(
703        &self,
704        tx_data: &TransactionData,
705    ) -> SuiResult<(TransactionDigest, TransactionEffects)> {
706        let mut res = self
707            .sign_and_execute_txns_in_soft_bundle(std::slice::from_ref(tx_data))
708            .await?;
709        assert_eq!(res.len(), 1);
710        Ok(res.pop().unwrap())
711    }
712
713    /// Execute an already-signed transaction via direct validator submission, bypassing the fullnode.
714    pub async fn execute_transaction_directly(
715        &self,
716        tx: &Transaction,
717    ) -> SuiResult<(TransactionDigest, TransactionEffects)> {
718        let mut res = self
719            .execute_signed_txns_in_soft_bundle(std::slice::from_ref(tx))
720            .await?;
721        assert_eq!(res.len(), 1);
722        Ok(res.pop().unwrap())
723    }
724
725    /// Sign and execute multiple transactions in a soft bundle.
726    /// Soft bundles allow submitting multiple transactions together with best-effort
727    /// ordering if they use the same gas price. Transactions in a soft bundle can be
728    /// individually rejected or deferred without affecting other transactions.
729    ///
730    /// NOTE: This is a simplified implementation that processes transactions individually.
731    /// For true soft bundle submission, the test file should use the raw gRPC client directly
732    /// with tonic, as shown in test_soft_bundle_different_gas_payers.
733    pub async fn sign_and_execute_txns_in_soft_bundle(
734        &self,
735        txns: &[TransactionData],
736    ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
737        // Sign all transactions
738        let signed_txs: Vec<Transaction> =
739            futures::future::join_all(txns.iter().map(|tx| self.wallet.sign_transaction(tx))).await;
740
741        self.execute_signed_txns_in_soft_bundle(&signed_txs).await
742    }
743
    /// Submit `signed_txs` to a single randomly-chosen validator as a soft
    /// bundle, return the first rejection as an error if any transaction is
    /// rejected, then wait on the fullnode for executed effects. Returns
    /// `(digest, effects)` pairs in input order.
    pub async fn execute_signed_txns_in_soft_bundle(
        &self,
        signed_txs: &[Transaction],
    ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
        let digests: Vec<_> = signed_txs.iter().map(|tx| *tx.digest()).collect();

        // BCS-encode each transaction into a raw soft-bundle submit request.
        let request = RawSubmitTxRequest {
            transactions: signed_txs
                .iter()
                .map(|tx| bcs::to_bytes(tx).unwrap().into())
                .collect(),
            submit_type: SubmitTxType::SoftBundle.into(),
        };

        let agg = self.authority_aggregator();
        let clients = &agg.authority_clients;
        // Pick a random validator to submit to. NOTE(review): this uses
        // `thread_rng`, which is only deterministic if the simulator (MSIM)
        // intercepts the RNG — confirm before relying on per-seed determinism.
        let index = rand::thread_rng().gen_range(0..clients.len());
        let mut validator_client = clients
            .iter()
            .nth(index)
            .unwrap()
            .1
            .authority_client()
            .get_client_for_testing()
            .unwrap();

        let result = validator_client
            .submit_transaction(request.into_request())
            .await
            .map(tonic::Response::into_inner)?;
        // The validator must return exactly one result per submitted tx.
        assert_eq!(result.results.len(), signed_txs.len());

        // Surface the first rejection, if any, as the bundle's error.
        for raw_result in result.results.iter() {
            let submit_result: sui_types::messages_grpc::SubmitTxResult =
                raw_result.clone().try_into()?;
            if let sui_types::messages_grpc::SubmitTxResult::Rejected { error } = submit_result {
                return Err(error);
            }
        }

        // Block on the fullnode until effects for every digest are executed.
        let effects = self
            .fullnode_handle
            .sui_node
            .with_async(|node| {
                let digests = digests.clone();
                async move {
                    let state = node.state();
                    let transaction_cache_reader = state.get_transaction_cache_reader();
                    transaction_cache_reader
                        .notify_read_executed_effects(
                            "sign_and_execute_txns_in_soft_bundle",
                            &digests,
                        )
                        .await
                }
            })
            .await;

        Ok(digests.into_iter().zip(effects.into_iter()).collect())
    }
805
806    /// Execute signed transactions in a soft bundle and return results for each transaction.
807    /// Unlike `execute_signed_txns_in_soft_bundle`, this method handles conflicting transactions
808    /// where some may be executed and others rejected.
809    ///
810    /// Returns a vector of (digest, WaitForEffectsResponse) for each transaction.
811    pub async fn execute_soft_bundle_with_conflicts(
812        &self,
813        signed_txs: &[Transaction],
814    ) -> SuiResult<Vec<(TransactionDigest, WaitForEffectsResponse)>> {
815        let digests: Vec<_> = signed_txs.iter().map(|tx| *tx.digest()).collect();
816
817        let request = RawSubmitTxRequest {
818            transactions: signed_txs
819                .iter()
820                .map(|tx| bcs::to_bytes(tx).unwrap().into())
821                .collect(),
822            submit_type: SubmitTxType::SoftBundle.into(),
823        };
824
825        let authority_aggregator = self.authority_aggregator();
826        let (_, safe_client) = authority_aggregator
827            .authority_clients
828            .iter()
829            .next()
830            .unwrap();
831        let mut validator_client = safe_client
832            .authority_client()
833            .get_client_for_testing()
834            .unwrap();
835
836        let result = validator_client
837            .submit_transaction(request.into_request())
838            .await
839            .map(tonic::Response::into_inner)?;
840        assert_eq!(result.results.len(), signed_txs.len());
841
842        // Extract consensus positions from submission results
843        let mut consensus_positions = Vec::new();
844        for (i, raw_result) in result.results.iter().enumerate() {
845            let submit_result: SubmitTxResult = raw_result.clone().try_into()?;
846            match submit_result {
847                SubmitTxResult::Submitted { consensus_position } => {
848                    consensus_positions.push(consensus_position);
849                }
850                SubmitTxResult::Executed { .. } => {
851                    panic!(
852                        "Transaction {} was already executed during submission",
853                        i + 1
854                    );
855                }
856                SubmitTxResult::Rejected { error } => {
857                    return Err(error);
858                }
859            }
860        }
861
862        // Wait for effects using consensus positions
863        let wait_futures: Vec<_> = digests
864            .iter()
865            .zip(consensus_positions.iter())
866            .map(|(digest, position)| {
867                let request = WaitForEffectsRequest {
868                    transaction_digest: Some(*digest),
869                    consensus_position: Some(*position),
870                    include_details: false,
871                    ping_type: None,
872                };
873                safe_client.wait_for_effects(request, None)
874            })
875            .collect();
876
877        let responses = futures::future::join_all(wait_futures).await;
878
879        let results: SuiResult<Vec<_>> = digests
880            .into_iter()
881            .zip(responses.into_iter())
882            .map(|(digest, response)| Ok((digest, response?)))
883            .collect();
884
885        results
886    }
887
888    pub async fn wait_for_tx_settlement(&self, digests: &[TransactionDigest]) {
889        self.fullnode_handle
890            .sui_node
891            .with_async(|node| async move {
892                let state = node.state();
893                // wait until the transactions are in checkpoints
894                let checkpoint_seqs = state
895                    .epoch_store_for_testing()
896                    .transactions_executed_in_checkpoint_notify(digests.to_vec())
897                    .await
898                    .unwrap();
899
900                // then wait until the highest of the checkpoints is executed
901                let max_checkpoint_seq = checkpoint_seqs.into_iter().max().unwrap();
902                state
903                    .checkpoint_store
904                    .notify_read_executed_checkpoint(max_checkpoint_seq)
905                    .await;
906            })
907            .await;
908    }
909
    /// Execute a transaction on the network and wait for it to be executed on the rpc fullnode.
    /// Also expects the effects status to be ExecutionStatus::Success.
    /// This function is recommended for transaction execution since it most resembles the
    /// production path.
    pub async fn execute_transaction(&self, tx: Transaction) -> ExecutedTransaction {
        // Thin delegation to the wallet helper, which asserts success.
        self.wallet.execute_transaction_must_succeed(tx).await
    }
917
918    /// Different from `execute_transaction` which returns RPC effects types, this function
919    /// returns raw effects, events and extra objects returned by the validators,
920    /// aggregated manually (without authority aggregator).
921    /// It also does not check whether the transaction is executed successfully.
922    /// In order to keep the fullnode up-to-date so that latter queries can read consistent
923    /// results, it calls execute_transaction_may_fail again which goes through fullnode.
924    /// This is less efficient and verbose, but can be used if more details are needed
925    /// from the execution results, and if the transaction is expected to fail.
926    pub async fn execute_transaction_return_raw_effects(
927        &self,
928        tx: Transaction,
929    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
930        let results = self.submit_and_execute(tx.clone(), None).await?;
931        self.wallet.execute_transaction_may_fail(tx).await.unwrap();
932        Ok(results)
933    }
934
935    pub fn authority_aggregator(&self) -> Arc<AuthorityAggregator<NetworkAuthorityClient>> {
936        self.fullnode_handle
937            .sui_node
938            .with(|node| node.clone_authority_aggregator().unwrap())
939    }
940
941    /// Submit a transaction and wait for it to be executed.
942    /// With MFP, transactions are submitted to consensus and executed by validators.
943    /// Returns the transaction effects and events on success.
944    pub async fn submit_and_execute(
945        &self,
946        tx: Transaction,
947        client_addr: Option<SocketAddr>,
948    ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
949        let agg = self.authority_aggregator();
950        // Pick a validator to submit to using seeded RNG for deterministic simtest selection
951        let clients = &agg.authority_clients;
952        let index = rand::thread_rng().gen_range(0..clients.len());
953        let (_, client) = clients
954            .iter()
955            .nth(index)
956            .ok_or_else(|| anyhow::anyhow!("No authority clients available"))?;
957
958        // Submit the transaction
959        let submit_request = SubmitTxRequest::new_transaction(tx.clone());
960        let submit_response = client
961            .submit_transaction(submit_request, client_addr)
962            .await?;
963
964        // Check if already executed
965        for result in submit_response.results {
966            match result {
967                SubmitTxResult::Executed { details, .. } => {
968                    if let Some(data) = details {
969                        let events = data.events.unwrap_or_default();
970                        return Ok((data.effects, events));
971                    }
972                }
973                SubmitTxResult::Rejected { error } => {
974                    return Err(error.into());
975                }
976                SubmitTxResult::Submitted { .. } => {
977                    // Need to wait for effects
978                }
979            }
980        }
981
982        // Wait for effects
983        let wait_request = WaitForEffectsRequest {
984            transaction_digest: Some(*tx.digest()),
985            consensus_position: None,
986            include_details: true,
987            ping_type: None,
988        };
989
990        let response = client.wait_for_effects(wait_request, client_addr).await?;
991        match response {
992            WaitForEffectsResponse::Executed { details, .. } => {
993                let data = details.ok_or_else(|| anyhow::anyhow!("Expected execution details"))?;
994                let events = data.events.unwrap_or_default();
995                Ok((data.effects, events))
996            }
997            WaitForEffectsResponse::Rejected { error } => Err(error
998                .ok_or_else(|| anyhow::anyhow!("Transaction was rejected"))?
999                .into()),
1000            WaitForEffectsResponse::Expired { .. } => Err(anyhow::anyhow!("Transaction expired")),
1001        }
1002    }
1003
1004    /// This call sends some funds from the seeded address to the funding
1005    /// address for the given amount and returns the gas object ref. This
1006    /// is useful to construct transactions from the funding address.
1007    pub async fn fund_address_and_return_gas(
1008        &self,
1009        rgp: u64,
1010        amount: Option<u64>,
1011        funding_address: SuiAddress,
1012    ) -> ObjectRef {
1013        let context = &self.wallet;
1014        let (sender, gas) = context.get_one_gas_object().await.unwrap().unwrap();
1015        let tx = context
1016            .sign_transaction(
1017                &TestTransactionBuilder::new(sender, gas, rgp)
1018                    .transfer_sui(amount, funding_address)
1019                    .build(),
1020            )
1021            .await;
1022        context.execute_transaction_must_succeed(tx).await;
1023
1024        context
1025            .get_one_gas_object_owned_by_address(funding_address)
1026            .await
1027            .unwrap()
1028            .unwrap()
1029    }
1030
1031    pub async fn transfer_sui_must_exceed(
1032        &self,
1033        sender: SuiAddress,
1034        receiver: SuiAddress,
1035        amount: u64,
1036    ) -> ObjectID {
1037        let tx = self
1038            .test_transaction_builder_with_sender(sender)
1039            .await
1040            .transfer_sui(Some(amount), receiver)
1041            .build();
1042        let effects = self.sign_and_execute_transaction(&tx).await.effects;
1043        assert!(effects.status().is_ok());
1044        effects.created().first().unwrap().0.0
1045    }
1046
1047    #[cfg(msim)]
1048    pub fn set_safe_mode_expected(&self, value: bool) {
1049        for n in self.all_node_handles() {
1050            n.with(|node| node.set_safe_mode_expected(value));
1051        }
1052    }
1053}
1054
/// Background utility that repeatedly kills and restarts randomly chosen
/// validators of a `TestCluster`, to exercise crash-recovery paths in tests.
pub struct RandomNodeRestarter {
    test_cluster: Arc<TestCluster>,

    // How frequently should we kill nodes
    kill_interval: Uniform<Duration>,
    // How long should we wait before restarting them.
    restart_delay: Uniform<Duration>,

    // Handle of the spawned kill/restart loop; aborted when this is dropped.
    task_handle: Mutex<Option<JoinHandle<()>>>,
}
1065
1066impl RandomNodeRestarter {
1067    fn new(test_cluster: Arc<TestCluster>) -> Self {
1068        Self {
1069            test_cluster,
1070            kill_interval: Uniform::new(Duration::from_secs(10), Duration::from_secs(11)),
1071            restart_delay: Uniform::new(Duration::from_secs(1), Duration::from_secs(2)),
1072            task_handle: Default::default(),
1073        }
1074    }
1075
1076    pub fn with_kill_interval_secs(mut self, a: u64, b: u64) -> Self {
1077        self.kill_interval = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1078        self
1079    }
1080
1081    pub fn with_restart_delay_secs(mut self, a: u64, b: u64) -> Self {
1082        self.restart_delay = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1083        self
1084    }
1085
1086    pub fn run(&self) {
1087        let test_cluster = self.test_cluster.clone();
1088        let kill_interval = self.kill_interval;
1089        let restart_delay = self.restart_delay;
1090        let validators = self.test_cluster.get_validator_pubkeys();
1091        let mut task_handle = self.task_handle.lock().unwrap();
1092        assert!(task_handle.is_none());
1093        task_handle.replace(tokio::task::spawn(async move {
1094            loop {
1095                let delay = kill_interval.sample(&mut OsRng);
1096                info!("Sleeping {delay:?} before killing a validator");
1097                sleep(delay).await;
1098
1099                let validator = validators.choose(&mut OsRng).unwrap();
1100                info!("Killing validator {:?}", validator.concise());
1101                test_cluster.stop_node(validator);
1102
1103                let delay = restart_delay.sample(&mut OsRng);
1104                info!("Sleeping {delay:?} before restarting");
1105                sleep(delay).await;
1106                info!("Starting validator {:?}", validator.concise());
1107                test_cluster.start_node(validator).await;
1108            }
1109        }));
1110    }
1111}
1112
1113impl Drop for RandomNodeRestarter {
1114    fn drop(&mut self) {
1115        if let Some(handle) = self.task_handle.lock().unwrap().take() {
1116            handle.abort();
1117        }
1118    }
1119}
1120
/// Builder for a `TestCluster`. Defaults suit most integration tests; use the
/// `with_*` methods to customize genesis, validators, the fullnode, and
/// protocol-version behavior before calling `build()`.
pub struct TestClusterBuilder {
    genesis_config: Option<GenesisConfig>,
    network_config: Option<NetworkConfig>,
    // Extra objects to include at genesis.
    additional_objects: Vec<Object>,
    num_validators: Option<usize>,
    // Explicit per-validator genesis configs; overrides `num_validators`.
    validators: Option<Vec<ValidatorGenesisConfig>>,
    fullnode_rpc_port: Option<u16>,
    enable_fullnode_events: bool,
    disable_fullnode_pruning: bool,
    validator_supported_protocol_versions_config: ProtocolVersionsConfig,
    // Default to validator_supported_protocol_versions_config, but can be overridden.
    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
    db_checkpoint_config_validators: DBCheckpointConfig,
    db_checkpoint_config_fullnodes: DBCheckpointConfig,
    num_unpruned_validators: Option<usize>,
    jwk_fetch_interval: Option<Duration>,
    config_dir: Option<PathBuf>,
    // When true, `build()` skips installing the random JWK injector (msim).
    default_jwks: bool,
    authority_overload_config: Option<AuthorityOverloadConfig>,
    execution_cache_config: Option<ExecutionCacheConfig>,
    data_ingestion_dir: Option<PathBuf>,
    fullnode_run_with_range: Option<RunWithRange>,
    fullnode_policy_config: Option<PolicyConfig>,
    fullnode_fw_config: Option<RemoteFirewallConfig>,

    max_submit_position: Option<usize>,
    submit_delay_step_override_millis: Option<u64>,
    validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,
    validator_funds_withdraw_scheduler_type_config: FundsWithdrawSchedulerTypeConfig,

    rpc_config: Option<sui_config::RpcConfig>,

    chain_override: Option<Chain>,

    execution_time_observer_config: Option<sui_config::node::ExecutionTimeObserverConfig>,

    state_sync_config: Option<sui_config::p2p::StateSyncConfig>,

    #[cfg(msim)]
    inject_synthetic_execution_time: bool,
}
1162
1163impl TestClusterBuilder {
1164    pub fn new() -> Self {
1165        TestClusterBuilder {
1166            genesis_config: None,
1167            network_config: None,
1168            chain_override: None,
1169            additional_objects: vec![],
1170            fullnode_rpc_port: None,
1171            num_validators: None,
1172            validators: None,
1173            enable_fullnode_events: false,
1174            disable_fullnode_pruning: false,
1175            validator_supported_protocol_versions_config: ProtocolVersionsConfig::Default,
1176            fullnode_supported_protocol_versions_config: None,
1177            db_checkpoint_config_validators: DBCheckpointConfig::default(),
1178            db_checkpoint_config_fullnodes: DBCheckpointConfig::default(),
1179            num_unpruned_validators: None,
1180            jwk_fetch_interval: None,
1181            config_dir: None,
1182            default_jwks: false,
1183            authority_overload_config: None,
1184            execution_cache_config: None,
1185            data_ingestion_dir: None,
1186            fullnode_run_with_range: None,
1187            fullnode_policy_config: None,
1188            fullnode_fw_config: None,
1189            max_submit_position: None,
1190            submit_delay_step_override_millis: None,
1191            validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(
1192                true,
1193            ),
1194            validator_funds_withdraw_scheduler_type_config:
1195                FundsWithdrawSchedulerTypeConfig::PerValidator(Arc::new(|idx| {
1196                    if idx % 2 == 0 {
1197                        FundsWithdrawSchedulerType::Eager
1198                    } else {
1199                        FundsWithdrawSchedulerType::Naive
1200                    }
1201                })),
1202            rpc_config: None,
1203            execution_time_observer_config: None,
1204            state_sync_config: None,
1205            #[cfg(msim)]
1206            inject_synthetic_execution_time: false,
1207        }
1208    }
1209
1210    pub fn with_state_sync_config(mut self, config: sui_config::p2p::StateSyncConfig) -> Self {
1211        self.state_sync_config = Some(config);
1212        self
1213    }
1214
1215    pub fn with_execution_time_observer_config(
1216        mut self,
1217        config: sui_config::node::ExecutionTimeObserverConfig,
1218    ) -> Self {
1219        self.execution_time_observer_config = Some(config);
1220        self
1221    }
1222
1223    pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
1224        if let Some(run_with_range) = run_with_range {
1225            self.fullnode_run_with_range = Some(run_with_range);
1226        }
1227        self
1228    }
1229
1230    pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
1231        self.fullnode_policy_config = config;
1232        self
1233    }
1234
1235    pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
1236        self.fullnode_fw_config = config;
1237        self
1238    }
1239
1240    pub fn with_fullnode_rpc_port(mut self, rpc_port: u16) -> Self {
1241        self.fullnode_rpc_port = Some(rpc_port);
1242        self
1243    }
1244
1245    pub fn set_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
1246        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1247        self.genesis_config = Some(genesis_config);
1248        self
1249    }
1250
1251    pub fn set_network_config(mut self, network_config: NetworkConfig) -> Self {
1252        assert!(self.genesis_config.is_none() && self.network_config.is_none());
1253        self.network_config = Some(network_config);
1254        self
1255    }
1256
1257    pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
1258        self.additional_objects.extend(objects);
1259        self
1260    }
1261
1262    /// Set the number of default validators to spawn. Can be overridden by `with_validators`, if
1263    /// you need to provide more specific genesis configs for each validator.
1264    pub fn with_num_validators(mut self, num: usize) -> Self {
1265        self.num_validators = Some(num);
1266        self
1267    }
1268
1269    /// Provide validator genesis configs, overrides the `num_validators` setting.
1270    pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
1271        self.validators = Some(validators);
1272        self
1273    }
1274
1275    pub fn enable_fullnode_events(mut self) -> Self {
1276        self.enable_fullnode_events = true;
1277        self
1278    }
1279
1280    pub fn disable_fullnode_pruning(mut self) -> Self {
1281        self.disable_fullnode_pruning = true;
1282        self
1283    }
1284
1285    pub fn with_enable_db_checkpoints_validators(mut self) -> Self {
1286        self.db_checkpoint_config_validators = DBCheckpointConfig {
1287            perform_db_checkpoints_at_epoch_end: true,
1288            checkpoint_path: None,
1289            object_store_config: None,
1290            perform_index_db_checkpoints_at_epoch_end: None,
1291            prune_and_compact_before_upload: None,
1292        };
1293        self
1294    }
1295
1296    pub fn with_enable_db_checkpoints_fullnodes(mut self) -> Self {
1297        self.db_checkpoint_config_fullnodes = DBCheckpointConfig {
1298            perform_db_checkpoints_at_epoch_end: true,
1299            checkpoint_path: None,
1300            object_store_config: None,
1301            perform_index_db_checkpoints_at_epoch_end: None,
1302            prune_and_compact_before_upload: Some(true),
1303        };
1304        self
1305    }
1306
1307    pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
1308        assert!(
1309            epoch_duration_ms >= 10000,
1310            "Epoch duration must be at least 10s (10000ms) to avoid flaky tests. Got {epoch_duration_ms}ms."
1311        );
1312        self.get_or_init_genesis_config()
1313            .parameters
1314            .epoch_duration_ms = epoch_duration_ms;
1315        self
1316    }
1317
1318    pub fn with_stake_subsidy_start_epoch(mut self, stake_subsidy_start_epoch: u64) -> Self {
1319        self.get_or_init_genesis_config()
1320            .parameters
1321            .stake_subsidy_start_epoch = stake_subsidy_start_epoch;
1322        self
1323    }
1324
1325    pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
1326        self.validator_supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
1327        self
1328    }
1329
1330    pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
1331        self.jwk_fetch_interval = Some(i);
1332        self
1333    }
1334
1335    pub fn with_fullnode_supported_protocol_versions_config(
1336        mut self,
1337        c: SupportedProtocolVersions,
1338    ) -> Self {
1339        self.fullnode_supported_protocol_versions_config = Some(ProtocolVersionsConfig::Global(c));
1340        self
1341    }
1342
1343    pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
1344        self.get_or_init_genesis_config()
1345            .parameters
1346            .protocol_version = v;
1347        self
1348    }
1349
1350    pub fn with_supported_protocol_version_callback(
1351        mut self,
1352        func: SupportedProtocolVersionsCallback,
1353    ) -> Self {
1354        self.validator_supported_protocol_versions_config =
1355            ProtocolVersionsConfig::PerValidator(func);
1356        self
1357    }
1358
1359    pub fn with_global_state_hash_v2_enabled_callback(
1360        mut self,
1361        func: GlobalStateHashV2EnabledCallback,
1362    ) -> Self {
1363        self.validator_global_state_hash_v2_enabled_config =
1364            GlobalStateHashV2EnabledConfig::PerValidator(func);
1365        self
1366    }
1367
1368    pub fn with_validator_candidates(
1369        mut self,
1370        addresses: impl IntoIterator<Item = SuiAddress>,
1371    ) -> Self {
1372        self.get_or_init_genesis_config()
1373            .accounts
1374            .extend(addresses.into_iter().map(|address| AccountConfig {
1375                address: Some(address),
1376                gas_amounts: vec![DEFAULT_GAS_AMOUNT, DEFAULT_GAS_AMOUNT],
1377            }));
1378        self
1379    }
1380
1381    pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
1382        self.num_unpruned_validators = Some(n);
1383        self
1384    }
1385
1386    pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1387        self.get_or_init_genesis_config().accounts = accounts;
1388        self
1389    }
1390
1391    pub fn with_additional_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1392        self.get_or_init_genesis_config().accounts.extend(accounts);
1393        self
1394    }
1395
1396    pub fn with_config_dir(mut self, config_dir: PathBuf) -> Self {
1397        self.config_dir = Some(config_dir);
1398        self
1399    }
1400
1401    pub fn with_default_jwks(mut self) -> Self {
1402        self.default_jwks = true;
1403        self
1404    }
1405
1406    pub fn with_authority_overload_config(mut self, config: AuthorityOverloadConfig) -> Self {
1407        assert!(self.network_config.is_none());
1408        self.authority_overload_config = Some(config);
1409        self
1410    }
1411
1412    pub fn with_execution_cache_config(mut self, config: ExecutionCacheConfig) -> Self {
1413        assert!(self.network_config.is_none());
1414        self.execution_cache_config = Some(config);
1415        self
1416    }
1417
1418    pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
1419        self.data_ingestion_dir = Some(path);
1420        self
1421    }
1422
1423    pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
1424        self.max_submit_position = Some(max_submit_position);
1425        self
1426    }
1427
1428    pub fn with_submit_delay_step_override_millis(
1429        mut self,
1430        submit_delay_step_override_millis: u64,
1431    ) -> Self {
1432        self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
1433        self
1434    }
1435
1436    pub fn with_rpc_config(mut self, config: sui_config::RpcConfig) -> Self {
1437        self.rpc_config = Some(config);
1438        self
1439    }
1440
1441    pub fn with_chain_override(mut self, chain: Chain) -> Self {
1442        self.chain_override = Some(chain);
1443        self
1444    }
1445
1446    #[cfg(msim)]
1447    pub fn with_synthetic_execution_time_injection(mut self) -> Self {
1448        self.inject_synthetic_execution_time = true;
1449        self
1450    }
1451
1452    pub async fn build(mut self) -> TestCluster {
1453        // All test clusters receive a continuous stream of random JWKs.
1454        // If we later use zklogin authenticated transactions in tests we will need to supply
1455        // valid JWKs as well.
1456        #[cfg(msim)]
1457        if !self.default_jwks {
1458            sui_node::set_jwk_injector(Arc::new(|_authority, provider| {
1459                use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
1460                use rand::Rng;
1461
1462                // generate random (and possibly conflicting) id/key pairings.
1463                let id_num = rand::thread_rng().gen_range(1..=4);
1464                let key_num = rand::thread_rng().gen_range(1..=4);
1465
1466                let id = JwkId {
1467                    iss: provider.get_config().iss,
1468                    kid: format!("kid{}", id_num),
1469                };
1470
1471                let jwk = JWK {
1472                    kty: "kty".to_string(),
1473                    e: "e".to_string(),
1474                    n: format!("n{}", key_num),
1475                    alg: "alg".to_string(),
1476                };
1477
1478                Ok(vec![(id, jwk)])
1479            }));
1480        }
1481
1482        let swarm = self.start_swarm().await.unwrap();
1483        let working_dir = swarm.dir();
1484
1485        let fullnode = swarm.fullnodes().next().unwrap();
1486        let json_rpc_address = fullnode.config().json_rpc_address;
1487        let fullnode_handle =
1488            FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await;
1489
1490        let mut wallet_conf: SuiClientConfig =
1491            PersistedConfig::read(&working_dir.join(SUI_CLIENT_CONFIG)).unwrap();
1492        wallet_conf.envs.push(SuiEnv {
1493            alias: "localnet".to_string(),
1494            rpc: fullnode_handle.rpc_url.clone(),
1495            ws: None,
1496            basic_auth: None,
1497            chain_id: None,
1498        });
1499        wallet_conf.active_env = Some("localnet".to_string());
1500
1501        wallet_conf
1502            .persisted(&working_dir.join(SUI_CLIENT_CONFIG))
1503            .save()
1504            .unwrap();
1505
1506        let wallet_conf = swarm.dir().join(SUI_CLIENT_CONFIG);
1507        let wallet = WalletContext::new(&wallet_conf).unwrap();
1508
1509        TestCluster {
1510            swarm,
1511            wallet,
1512            fullnode_handle,
1513        }
1514    }
1515
1516    /// Start a Swarm and set up WalletConfig
1517    async fn start_swarm(&mut self) -> Result<Swarm, anyhow::Error> {
1518        let mut builder: SwarmBuilder = Swarm::builder()
1519            .with_objects(self.additional_objects.clone())
1520            .with_db_checkpoint_config(self.db_checkpoint_config_validators.clone())
1521            .with_supported_protocol_versions_config(
1522                self.validator_supported_protocol_versions_config.clone(),
1523            )
1524            .with_global_state_hash_v2_enabled_config(
1525                self.validator_global_state_hash_v2_enabled_config.clone(),
1526            )
1527            .with_funds_withdraw_scheduler_type_config(
1528                self.validator_funds_withdraw_scheduler_type_config.clone(),
1529            )
1530            .with_fullnode_count(1)
1531            .with_fullnode_supported_protocol_versions_config(
1532                self.fullnode_supported_protocol_versions_config
1533                    .clone()
1534                    .unwrap_or(self.validator_supported_protocol_versions_config.clone()),
1535            )
1536            .with_db_checkpoint_config(self.db_checkpoint_config_fullnodes.clone())
1537            .with_fullnode_run_with_range(self.fullnode_run_with_range)
1538            .with_fullnode_policy_config(self.fullnode_policy_config.clone())
1539            .with_fullnode_fw_config(self.fullnode_fw_config.clone());
1540
1541        if let Some(validators) = self.validators.take() {
1542            builder = builder.with_validators(validators);
1543        } else {
1544            builder = builder.committee_size(
1545                NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDATOR)).unwrap(),
1546            )
1547        };
1548
1549        if let Some(chain) = self.chain_override {
1550            builder = builder.with_chain_override(chain);
1551        }
1552
1553        if let Some(genesis_config) = self.genesis_config.take() {
1554            builder = builder.with_genesis_config(genesis_config);
1555        }
1556
1557        if let Some(network_config) = self.network_config.take() {
1558            builder = builder.with_network_config(network_config);
1559        }
1560
1561        if let Some(authority_overload_config) = self.authority_overload_config.take() {
1562            builder = builder.with_authority_overload_config(authority_overload_config);
1563        }
1564
1565        if let Some(execution_cache_config) = self.execution_cache_config.take() {
1566            builder = builder.with_execution_cache_config(execution_cache_config);
1567        }
1568
1569        if let Some(fullnode_rpc_port) = self.fullnode_rpc_port {
1570            builder = builder.with_fullnode_rpc_port(fullnode_rpc_port);
1571        }
1572
1573        if let Some(rpc_config) = &self.rpc_config {
1574            builder = builder.with_fullnode_rpc_config(rpc_config.clone());
1575        }
1576        if let Some(num_unpruned_validators) = self.num_unpruned_validators {
1577            builder = builder.with_num_unpruned_validators(num_unpruned_validators);
1578        }
1579
1580        if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
1581            builder = builder.with_jwk_fetch_interval(jwk_fetch_interval);
1582        }
1583
1584        if let Some(config_dir) = self.config_dir.take() {
1585            builder = builder.dir(config_dir);
1586        }
1587
1588        if let Some(data_ingestion_dir) = self.data_ingestion_dir.take() {
1589            builder = builder.with_data_ingestion_dir(data_ingestion_dir);
1590        }
1591
1592        if let Some(max_submit_position) = self.max_submit_position {
1593            builder = builder.with_max_submit_position(max_submit_position);
1594        }
1595
1596        if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis {
1597            builder =
1598                builder.with_submit_delay_step_override_millis(submit_delay_step_override_millis);
1599        }
1600
1601        if let Some(state_sync_config) = self.state_sync_config.clone() {
1602            builder = builder.with_state_sync_config(state_sync_config);
1603        }
1604
1605        if self.disable_fullnode_pruning {
1606            builder = builder.with_disable_fullnode_pruning();
1607        }
1608
1609        #[cfg(msim)]
1610        {
1611            if let Some(mut config) = self.execution_time_observer_config.clone() {
1612                if self.inject_synthetic_execution_time {
1613                    config.inject_synthetic_execution_time = Some(true);
1614                }
1615                builder = builder.with_execution_time_observer_config(config);
1616            } else if self.inject_synthetic_execution_time {
1617                use sui_config::node::ExecutionTimeObserverConfig;
1618
1619                let mut config = ExecutionTimeObserverConfig::default();
1620                config.inject_synthetic_execution_time = Some(true);
1621                builder = builder.with_execution_time_observer_config(config);
1622            }
1623        }
1624
1625        let mut swarm = builder.build();
1626        swarm.launch().await?;
1627
1628        let dir = swarm.dir();
1629
1630        let network_path = dir.join(SUI_NETWORK_CONFIG);
1631        let wallet_path = dir.join(SUI_CLIENT_CONFIG);
1632        let keystore_path = dir.join(SUI_KEYSTORE_FILENAME);
1633
1634        swarm.config().save(network_path)?;
1635        let mut keystore = Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?);
1636        for key in &swarm.config().account_keys {
1637            keystore
1638                .import(None, SuiKeyPair::Ed25519(key.copy()))
1639                .await?;
1640        }
1641
1642        let active_address = keystore.addresses().first().cloned();
1643
1644        // Create wallet config with stated authorities port
1645        SuiClientConfig {
1646            keystore: Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?),
1647            external_keys: None,
1648            envs: Default::default(),
1649            active_address,
1650            active_env: Default::default(),
1651        }
1652        .save(wallet_path)?;
1653
1654        // Return network handle
1655        Ok(swarm)
1656    }
1657
1658    fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
1659        if self.genesis_config.is_none() {
1660            self.genesis_config = Some(GenesisConfig::for_local_testing());
1661        }
1662        self.genesis_config.as_mut().unwrap()
1663    }
1664}
1665
1666impl Default for TestClusterBuilder {
1667    fn default() -> Self {
1668        Self::new()
1669    }
1670}