1use futures::{StreamExt, future::join_all};
5use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
6use mysten_common::fatal;
7use rand::{Rng, distributions::*, rngs::OsRng, seq::SliceRandom};
8use std::net::SocketAddr;
9use std::num::NonZeroUsize;
10use std::path::PathBuf;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13use sui_config::genesis::Genesis;
14use sui_config::node::FundsWithdrawSchedulerType;
15use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
16use sui_config::{Config, ExecutionCacheConfig, SUI_CLIENT_CONFIG, SUI_NETWORK_CONFIG};
17use sui_config::{NodeConfig, PersistedConfig, SUI_KEYSTORE_FILENAME};
18use sui_core::authority_aggregator::AuthorityAggregator;
19use sui_core::authority_client::NetworkAuthorityClient;
20use sui_json_rpc_types::{SuiTransactionBlockEffectsAPI, TransactionFilter};
21use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
22use sui_node::SuiNodeHandle;
23use sui_protocol_config::{Chain, ProtocolVersion};
24use sui_rpc_api::Client;
25use sui_rpc_api::client::ExecutedTransaction;
26use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
27use sui_sdk::wallet_context::WalletContext;
28use sui_sdk::{SuiClient, SuiClientBuilder};
29use sui_swarm::memory::{Swarm, SwarmBuilder};
30use sui_swarm_config::genesis_config::{
31 AccountConfig, DEFAULT_GAS_AMOUNT, GenesisConfig, ValidatorGenesisConfig,
32};
33use sui_swarm_config::network_config::NetworkConfig;
34use sui_swarm_config::network_config_builder::{
35 FundsWithdrawSchedulerTypeConfig, GlobalStateHashV2EnabledCallback,
36 GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig, SupportedProtocolVersionsCallback,
37};
38use sui_swarm_config::node_config_builder::{FullnodeConfigBuilder, ValidatorConfigBuilder};
39use sui_test_transaction_builder::TestTransactionBuilder;
40use sui_types::authenticator_state::get_authenticator_state;
41use sui_types::base_types::ConciseableName;
42use sui_types::base_types::{AuthorityName, ObjectID, ObjectRef, SuiAddress};
43use sui_types::committee::CommitteeTrait;
44use sui_types::committee::{Committee, EpochId};
45use sui_types::crypto::KeypairTraits;
46use sui_types::crypto::SuiKeyPair;
47use sui_types::digests::{ChainIdentifier, TransactionDigest};
48use sui_types::effects::TransactionEffectsAPI;
49use sui_types::effects::{TransactionEffects, TransactionEvents};
50use sui_types::error::SuiResult;
51use sui_types::messages_grpc::{
52 RawSubmitTxRequest, SubmitTxRequest, SubmitTxResult, SubmitTxType, WaitForEffectsRequest,
53 WaitForEffectsResponse,
54};
55use sui_types::object::Object;
56use sui_types::sui_system_state::SuiSystemState;
57use sui_types::sui_system_state::SuiSystemStateTrait;
58use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
59use sui_types::supported_protocol_versions::SupportedProtocolVersions;
60use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
61use sui_types::transaction::{Transaction, TransactionData, TransactionDataAPI, TransactionKind};
62use tokio::sync::broadcast;
63use tokio::time::{Instant, timeout};
64use tokio::{task::JoinHandle, time::sleep};
65use tonic::IntoRequest;
66use tracing::{error, info};
67
68pub mod addr_balance_test_env;
69
// NOTE(review): not referenced in the visible part of this file; presumably the
// default validator count used when a builder is not given one — confirm.
const NUM_VALIDATOR: usize = 4;
71
/// Bundles a running full node with the clients used to talk to it.
///
/// Every client is created by [`FullNodeHandle::new`] against the same `rpc_url`.
pub struct FullNodeHandle {
    /// Handle to the underlying node.
    pub sui_node: SuiNodeHandle,
    /// SDK client built from `rpc_url`.
    #[deprecated = "use grpc_client"]
    pub sui_client: SuiClient,
    /// `jsonrpsee` HTTP client built from `rpc_url`.
    #[deprecated = "use grpc_client"]
    pub rpc_client: HttpClient,
    /// gRPC client built from `rpc_url`.
    pub grpc_client: Client,
    /// Tonic channel connected to `rpc_url`.
    pub grpc_channel: tonic::transport::Channel,
    /// `http://<json_rpc_address>` of the node.
    pub rpc_url: String,
}
82
83impl FullNodeHandle {
84 pub async fn new(sui_node: SuiNodeHandle, json_rpc_address: SocketAddr) -> Self {
85 let rpc_url = format!("http://{}", json_rpc_address);
86 let rpc_client = HttpClientBuilder::default().build(&rpc_url).unwrap();
87
88 let sui_client = SuiClientBuilder::default().build(&rpc_url).await.unwrap();
89 let grpc_client = Client::new(&rpc_url).unwrap();
90
91 let grpc_channel = Self::connect_channel_with_retry(&rpc_url).await;
96
97 grpc_client
101 .get_reference_gas_price()
102 .await
103 .expect("failed to warm up grpc client");
104
105 Self {
106 sui_node,
107 #[allow(deprecated)]
108 sui_client,
109 #[allow(deprecated)]
110 rpc_client,
111 grpc_client,
112 grpc_channel,
113 rpc_url,
114 }
115 }
116
117 pub async fn connect_channel_with_retry(url: &str) -> tonic::transport::Channel {
127 const MAX_RETRIES: u32 = 10;
128 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
129
130 let endpoint = tonic::transport::Endpoint::from_shared(url.to_string())
131 .expect("invalid grpc url")
132 .connect_timeout(CONNECT_TIMEOUT);
133
134 for attempt in 0..MAX_RETRIES {
135 match tokio::time::timeout(CONNECT_TIMEOUT, endpoint.connect()).await {
136 Ok(Ok(channel)) => return channel,
137 Ok(Err(e)) if attempt + 1 < MAX_RETRIES => {
138 info!(
139 "grpc channel connect attempt {} failed: {e}, retrying",
140 attempt + 1
141 );
142 tokio::time::sleep(Duration::from_millis(100)).await;
143 }
144 Ok(Err(e)) => {
145 panic!("grpc channel connect failed after {MAX_RETRIES} attempts: {e}")
146 }
147 Err(_) if attempt + 1 < MAX_RETRIES => {
148 info!(
149 "grpc channel connect attempt {} timed out, retrying",
150 attempt + 1
151 );
152 tokio::time::sleep(Duration::from_millis(100)).await;
153 }
154 Err(_) => panic!("grpc channel connect timed out after {MAX_RETRIES} attempts"),
155 }
156 }
157 unreachable!()
158 }
159}
160
/// An in-process cluster together with a wallet and a full node handle.
pub struct TestCluster {
    /// The swarm that owns the cluster's nodes.
    pub swarm: Swarm,
    /// Wallet used by the helper methods to pick gas, sign and execute transactions.
    pub wallet: WalletContext,
    /// The full node the helper methods query and subscribe to.
    pub fullnode_handle: FullNodeHandle,
}
166
167impl TestCluster {
168 #[deprecated = "use grpc_client()"]
169 pub fn rpc_client(&self) -> &HttpClient {
170 #[allow(deprecated)]
171 &self.fullnode_handle.rpc_client
172 }
173
174 #[deprecated = "use grpc_client()"]
175 pub fn sui_client(&self) -> &SuiClient {
176 #[allow(deprecated)]
177 &self.fullnode_handle.sui_client
178 }
179
180 pub fn grpc_client(&self) -> Client {
181 self.fullnode_handle.grpc_client.clone()
182 }
183
184 pub fn grpc_channel(&self) -> tonic::transport::Channel {
195 self.fullnode_handle.grpc_channel.clone()
196 }
197
198 pub fn rpc_url(&self) -> &str {
199 &self.fullnode_handle.rpc_url
200 }
201
202 pub fn wallet(&mut self) -> &WalletContext {
203 &self.wallet
204 }
205
206 pub fn wallet_mut(&mut self) -> &mut WalletContext {
207 &mut self.wallet
208 }
209
210 pub fn get_addresses(&self) -> Vec<SuiAddress> {
211 self.wallet.get_addresses()
212 }
213
214 pub fn get_address_0(&self) -> SuiAddress {
216 self.get_addresses()[0]
217 }
218
219 pub fn get_address_1(&self) -> SuiAddress {
221 self.get_addresses()[1]
222 }
223
224 pub fn get_address_2(&self) -> SuiAddress {
226 self.get_addresses()[2]
227 }
228
229 pub fn fullnode_config_builder(&self) -> FullnodeConfigBuilder {
230 self.swarm.get_fullnode_config_builder()
231 }
232
233 pub fn committee(&self) -> Arc<Committee> {
234 self.fullnode_handle
235 .sui_node
236 .with(|node| node.state().epoch_store_for_testing().committee().clone())
237 }
238
239 pub fn get_sui_system_state(&self) -> SuiSystemState {
240 self.fullnode_handle.sui_node.with(|node| {
241 node.state()
242 .get_sui_system_state_object_for_testing()
243 .unwrap()
244 })
245 }
246
247 pub async fn spawn_new_fullnode(&mut self) -> FullNodeHandle {
249 self.start_fullnode_from_config(
250 self.fullnode_config_builder()
251 .build(&mut OsRng, self.swarm.config()),
252 )
253 .await
254 }
255
256 pub async fn start_fullnode_from_config(&mut self, config: NodeConfig) -> FullNodeHandle {
257 let json_rpc_address = config.json_rpc_address;
258 let node = self.swarm.spawn_new_node(config).await;
259 FullNodeHandle::new(node, json_rpc_address).await
260 }
261
262 pub fn all_node_handles(&self) -> Vec<SuiNodeHandle> {
263 self.swarm
264 .all_nodes()
265 .flat_map(|n| n.get_node_handle())
266 .collect()
267 }
268
269 pub fn all_validator_handles(&self) -> Vec<SuiNodeHandle> {
270 self.swarm
271 .validator_nodes()
272 .map(|n| n.get_node_handle().unwrap())
273 .collect()
274 }
275
276 pub fn get_validator_pubkeys(&self) -> Vec<AuthorityName> {
277 self.swarm.active_validators().map(|v| v.name()).collect()
278 }
279
280 pub fn get_genesis(&self) -> Genesis {
281 self.swarm.config().genesis.clone()
282 }
283
284 pub fn stop_node(&self, name: &AuthorityName) {
285 self.swarm.node(name).unwrap().stop();
286 }
287
288 pub async fn stop_all_validators(&self) {
289 info!("Stopping all validators in the cluster");
290 self.swarm.active_validators().for_each(|v| v.stop());
291 tokio::time::sleep(Duration::from_secs(3)).await;
292 }
293
294 pub async fn start_all_validators(&self) {
295 info!("Starting all validators in the cluster");
296 for v in self.swarm.validator_nodes() {
297 if v.is_running() {
298 continue;
299 }
300 v.start().await.unwrap();
301 }
302 tokio::time::sleep(Duration::from_secs(3)).await;
303 }
304
305 pub async fn start_node(&self, name: &AuthorityName) {
306 let node = self.swarm.node(name).unwrap();
307 if node.is_running() {
308 return;
309 }
310 node.start().await.unwrap();
311 }
312
313 pub async fn spawn_new_validator(
314 &mut self,
315 genesis_config: ValidatorGenesisConfig,
316 ) -> SuiNodeHandle {
317 let node_config = ValidatorConfigBuilder::new()
318 .build(genesis_config, self.swarm.config().genesis.clone());
319 self.swarm.spawn_new_node(node_config).await
320 }
321
322 pub fn random_node_restarter(self: &Arc<Self>) -> RandomNodeRestarter {
323 RandomNodeRestarter::new(self.clone())
324 }
325
326 pub async fn get_reference_gas_price(&self) -> u64 {
327 self.grpc_client()
328 .get_reference_gas_price()
329 .await
330 .expect("failed to get reference gas price")
331 }
332
333 pub fn get_chain_identifier(&self) -> ChainIdentifier {
334 ChainIdentifier::from(*self.swarm.config().genesis.checkpoint().digest())
335 }
336
337 pub async fn get_object_from_fullnode_store(&self, object_id: &ObjectID) -> Option<Object> {
338 self.fullnode_handle
339 .sui_node
340 .with_async(|node| async { node.state().get_object(object_id).await })
341 .await
342 }
343
344 pub async fn get_latest_object_ref(&self, object_id: &ObjectID) -> ObjectRef {
345 self.get_object_from_fullnode_store(object_id)
346 .await
347 .unwrap()
348 .compute_object_reference()
349 }
350
351 pub async fn get_object_or_tombstone_from_fullnode_store(
352 &self,
353 object_id: ObjectID,
354 ) -> ObjectRef {
355 self.fullnode_handle
356 .sui_node
357 .state()
358 .get_object_cache_reader()
359 .get_latest_object_ref_or_tombstone(object_id)
360 .unwrap()
361 }
362
363 pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
364 self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
365 .await
366 }
367
368 pub async fn wait_for_run_with_range_shutdown_signal_with_timeout(
369 &self,
370 timeout_dur: Duration,
371 ) -> Option<RunWithRange> {
372 let mut shutdown_channel_rx = self
373 .fullnode_handle
374 .sui_node
375 .with(|node| node.subscribe_to_shutdown_channel());
376
377 timeout(timeout_dur, async move {
378 tokio::select! {
379 msg = shutdown_channel_rx.recv() =>
380 {
381 match msg {
382 Ok(Some(run_with_range)) => Some(run_with_range),
383 Ok(None) => None,
384 Err(e) => {
385 error!("failed recv from sui-node shutdown channel: {}", e);
386 None
387 },
388 }
389 },
390 }
391 })
392 .await
393 .expect("Timed out waiting for cluster to hit target epoch and recv shutdown signal from sui-node")
394 }
395
396 pub async fn wait_for_protocol_version(
397 &self,
398 target_protocol_version: ProtocolVersion,
399 ) -> SuiSystemState {
400 self.wait_for_protocol_version_with_timeout(
401 target_protocol_version,
402 Duration::from_secs(60),
403 )
404 .await
405 }
406
407 pub async fn wait_for_protocol_version_with_timeout(
408 &self,
409 target_protocol_version: ProtocolVersion,
410 timeout_dur: Duration,
411 ) -> SuiSystemState {
412 timeout(timeout_dur, async move {
413 loop {
414 let system_state = self.wait_for_epoch(None).await;
415 if system_state.protocol_version() >= target_protocol_version.as_u64() {
416 return system_state;
417 }
418 }
419 })
420 .await
421 .expect("Timed out waiting for cluster to target protocol version")
422 }
423
424 pub async fn trigger_reconfiguration(&self) {
427 info!("Starting reconfiguration");
428 let start = Instant::now();
429
430 let cur_committee = self
432 .fullnode_handle
433 .sui_node
434 .with(|node| node.state().clone_committee_for_testing());
435 let mut cur_stake = 0;
436 for node in self.swarm.active_validators() {
437 node.get_node_handle()
438 .unwrap()
439 .with_async(|node| async {
440 node.close_epoch_for_testing().await.unwrap_or_else(|_| {
441 fatal!(
442 "Failed to close epoch for validator {:?}",
443 node.state().name
444 );
445 });
446 cur_stake += cur_committee.weight(&node.state().name);
447 })
448 .await;
449 if cur_stake >= cur_committee.quorum_threshold() {
450 break;
451 }
452 }
453 info!("close_epoch complete after {:?}", start.elapsed());
454
455 self.wait_for_epoch(Some(cur_committee.epoch + 1)).await;
456 self.wait_for_epoch_all_nodes(cur_committee.epoch + 1).await;
457
458 info!("reconfiguration complete after {:?}", start.elapsed());
459 }
460
461 pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> SuiSystemState {
468 self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
469 .await
470 }
471
472 pub async fn wait_for_epoch_on_node(
473 &self,
474 handle: &SuiNodeHandle,
475 target_epoch: Option<EpochId>,
476 timeout_dur: Duration,
477 ) -> SuiSystemState {
478 let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());
479
480 let mut state = None;
481 timeout(timeout_dur, async {
482 let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
483 if Some(epoch) == target_epoch {
484 return handle.with(|node| node.state().get_sui_system_state_object_for_testing().unwrap());
485 }
486 while let Ok(system_state) = epoch_rx.recv().await {
487 info!("received epoch {}", system_state.epoch());
488 state = Some(system_state.clone());
489 match target_epoch {
490 Some(target_epoch) if system_state.epoch() >= target_epoch => {
491 return system_state;
492 }
493 None => {
494 return system_state;
495 }
496 _ => (),
497 }
498 }
499 unreachable!("Broken reconfig channel");
500 })
501 .await
502 .unwrap_or_else(|_| {
503 error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
504 if let Some(state) = state {
505 panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
506 }
507 panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
508 })
509 }
510
511 pub async fn wait_for_epoch_with_timeout(
512 &self,
513 target_epoch: Option<EpochId>,
514 timeout_dur: Duration,
515 ) -> SuiSystemState {
516 self.wait_for_epoch_on_node(&self.fullnode_handle.sui_node, target_epoch, timeout_dur)
517 .await
518 }
519
520 pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
521 let handles: Vec<_> = self
522 .swarm
523 .all_nodes()
524 .map(|node| node.get_node_handle().unwrap())
525 .collect();
526 let tasks: Vec<_> = handles
527 .iter()
528 .map(|handle| {
529 handle.with_async(|node| async {
530 let mut retries = 0;
531 loop {
532 let epoch = node.state().epoch_store_for_testing().epoch();
533 if epoch == target_epoch {
534 if let Some(agg) = node.clone_authority_aggregator() {
535 if agg.committee.epoch() == target_epoch {
537 break;
538 }
539 } else {
540 break;
542 }
543 }
544 tokio::time::sleep(Duration::from_secs(1)).await;
545 retries += 1;
546 if retries % 5 == 0 {
547 tracing::warn!(validator=?node.state().name.concise(), "Waiting for {:?} seconds to reach epoch {:?}. Currently at epoch {:?}", retries, target_epoch, epoch);
548 }
549 }
550 })
551 })
552 .collect();
553
554 timeout(Duration::from_secs(40), join_all(tasks))
555 .await
556 .expect("timed out waiting for reconfiguration to complete");
557 }
558
559 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<SuiSystemState> {
560 self.fullnode_handle
562 .sui_node
563 .with(|node| node.subscribe_to_epoch_change())
564 }
565
566 pub async fn update_validator_supported_versions(
571 &self,
572 new_supported_versions: SupportedProtocolVersions,
573 ) {
574 for authority in self.get_validator_pubkeys() {
575 self.stop_node(&authority);
576 tokio::time::sleep(Duration::from_millis(1000)).await;
577 self.swarm
578 .node(&authority)
579 .unwrap()
580 .config()
581 .supported_protocol_versions = Some(new_supported_versions);
582 self.start_node(&authority).await;
583 info!("Restarted validator {}", authority);
584 }
585 }
586
587 pub async fn wait_for_all_nodes_upgrade_to(&self, protocol_version: u64) {
589 for h in self.all_node_handles() {
590 h.with_async(|node| async {
591 while node
592 .state()
593 .epoch_store_for_testing()
594 .epoch_start_state()
595 .protocol_version()
596 .as_u64()
597 != protocol_version
598 {
599 tokio::time::sleep(Duration::from_secs(1)).await;
600 }
601 })
602 .await;
603 }
604 }
605
606 pub async fn wait_for_authenticator_state_update(&self) {
607 timeout(
608 Duration::from_secs(60),
609 self.fullnode_handle.sui_node.with_async(|node| async move {
610 let state = node.state();
611 let mut txns = state.subscription_handler.subscribe_transactions(
612 TransactionFilter::ChangedObject(ObjectID::from_hex_literal("0x7").unwrap()),
613 );
614
615 let has_active_jwks = get_authenticator_state(state.get_object_store())
619 .ok()
620 .flatten()
621 .is_some_and(|state| !state.active_jwks.is_empty());
622 if has_active_jwks {
623 return;
624 }
625
626 while let Some(tx) = txns.next().await {
627 let digest = *tx.transaction_digest();
628 let tx = state
629 .get_transaction_cache_reader()
630 .get_transaction_block(&digest)
631 .unwrap();
632 match &tx.data().intent_message().value.kind() {
633 TransactionKind::EndOfEpochTransaction(_) => (),
634 TransactionKind::AuthenticatorStateUpdate(_) => break,
635 _ => panic!("{:?}", tx),
636 }
637 }
638 }),
639 )
640 .await
641 .expect("Timed out waiting for authenticator state update");
642 }
643
644 pub fn highest_protocol_version(&self) -> ProtocolVersion {
646 self.all_node_handles()
647 .into_iter()
648 .map(|h| {
649 h.with(|node| {
650 node.state()
651 .epoch_store_for_testing()
652 .epoch_start_state()
653 .protocol_version()
654 })
655 })
656 .max()
657 .expect("at least one node must be up to get highest protocol version")
658 }
659
660 pub async fn test_transaction_builder(&self) -> TestTransactionBuilder {
661 let (sender, gas) = self.wallet.get_one_gas_object().await.unwrap().unwrap();
662 self.test_transaction_builder_with_gas_object(sender, gas)
663 .await
664 }
665
666 pub async fn test_transaction_builder_with_sender(
667 &self,
668 sender: SuiAddress,
669 ) -> TestTransactionBuilder {
670 let gas = self
671 .wallet
672 .get_one_gas_object_owned_by_address(sender)
673 .await
674 .unwrap()
675 .unwrap();
676 self.test_transaction_builder_with_gas_object(sender, gas)
677 .await
678 }
679
680 pub async fn test_transaction_builder_with_gas_object(
681 &self,
682 sender: SuiAddress,
683 gas: ObjectRef,
684 ) -> TestTransactionBuilder {
685 let rgp = self.get_reference_gas_price().await;
686 TestTransactionBuilder::new(sender, gas, rgp)
687 }
688
689 pub async fn sign_transaction(&self, tx_data: &TransactionData) -> Transaction {
690 self.wallet.sign_transaction(tx_data).await
691 }
692
693 pub async fn sign_and_execute_transaction(
694 &self,
695 tx_data: &TransactionData,
696 ) -> ExecutedTransaction {
697 let tx = self.wallet.sign_transaction(tx_data).await;
698 self.execute_transaction(tx).await
699 }
700
701 pub async fn sign_and_execute_transaction_directly(
703 &self,
704 tx_data: &TransactionData,
705 ) -> SuiResult<(TransactionDigest, TransactionEffects)> {
706 let mut res = self
707 .sign_and_execute_txns_in_soft_bundle(std::slice::from_ref(tx_data))
708 .await?;
709 assert_eq!(res.len(), 1);
710 Ok(res.pop().unwrap())
711 }
712
713 pub async fn execute_transaction_directly(
715 &self,
716 tx: &Transaction,
717 ) -> SuiResult<(TransactionDigest, TransactionEffects)> {
718 let mut res = self
719 .execute_signed_txns_in_soft_bundle(std::slice::from_ref(tx))
720 .await?;
721 assert_eq!(res.len(), 1);
722 Ok(res.pop().unwrap())
723 }
724
725 pub async fn sign_and_execute_txns_in_soft_bundle(
734 &self,
735 txns: &[TransactionData],
736 ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
737 let signed_txs: Vec<Transaction> =
739 futures::future::join_all(txns.iter().map(|tx| self.wallet.sign_transaction(tx))).await;
740
741 self.execute_signed_txns_in_soft_bundle(&signed_txs).await
742 }
743
744 pub async fn execute_signed_txns_in_soft_bundle(
745 &self,
746 signed_txs: &[Transaction],
747 ) -> SuiResult<Vec<(TransactionDigest, TransactionEffects)>> {
748 let digests: Vec<_> = signed_txs.iter().map(|tx| *tx.digest()).collect();
749
750 let request = RawSubmitTxRequest {
751 transactions: signed_txs
752 .iter()
753 .map(|tx| bcs::to_bytes(tx).unwrap().into())
754 .collect(),
755 submit_type: SubmitTxType::SoftBundle.into(),
756 };
757
758 let agg = self.authority_aggregator();
759 let clients = &agg.authority_clients;
760 let index = rand::thread_rng().gen_range(0..clients.len());
762 let mut validator_client = clients
763 .iter()
764 .nth(index)
765 .unwrap()
766 .1
767 .authority_client()
768 .get_client_for_testing()
769 .unwrap();
770
771 let result = validator_client
772 .submit_transaction(request.into_request())
773 .await
774 .map(tonic::Response::into_inner)?;
775 assert_eq!(result.results.len(), signed_txs.len());
776
777 for raw_result in result.results.iter() {
778 let submit_result: sui_types::messages_grpc::SubmitTxResult =
779 raw_result.clone().try_into()?;
780 if let sui_types::messages_grpc::SubmitTxResult::Rejected { error } = submit_result {
781 return Err(error);
782 }
783 }
784
785 let effects = self
786 .fullnode_handle
787 .sui_node
788 .with_async(|node| {
789 let digests = digests.clone();
790 async move {
791 let state = node.state();
792 let transaction_cache_reader = state.get_transaction_cache_reader();
793 transaction_cache_reader
794 .notify_read_executed_effects(
795 "sign_and_execute_txns_in_soft_bundle",
796 &digests,
797 )
798 .await
799 }
800 })
801 .await;
802
803 Ok(digests.into_iter().zip(effects.into_iter()).collect())
804 }
805
806 pub async fn execute_soft_bundle_with_conflicts(
812 &self,
813 signed_txs: &[Transaction],
814 ) -> SuiResult<Vec<(TransactionDigest, WaitForEffectsResponse)>> {
815 let digests: Vec<_> = signed_txs.iter().map(|tx| *tx.digest()).collect();
816
817 let request = RawSubmitTxRequest {
818 transactions: signed_txs
819 .iter()
820 .map(|tx| bcs::to_bytes(tx).unwrap().into())
821 .collect(),
822 submit_type: SubmitTxType::SoftBundle.into(),
823 };
824
825 let authority_aggregator = self.authority_aggregator();
826 let (_, safe_client) = authority_aggregator
827 .authority_clients
828 .iter()
829 .next()
830 .unwrap();
831 let mut validator_client = safe_client
832 .authority_client()
833 .get_client_for_testing()
834 .unwrap();
835
836 let result = validator_client
837 .submit_transaction(request.into_request())
838 .await
839 .map(tonic::Response::into_inner)?;
840 assert_eq!(result.results.len(), signed_txs.len());
841
842 let mut consensus_positions = Vec::new();
844 for (i, raw_result) in result.results.iter().enumerate() {
845 let submit_result: SubmitTxResult = raw_result.clone().try_into()?;
846 match submit_result {
847 SubmitTxResult::Submitted { consensus_position } => {
848 consensus_positions.push(consensus_position);
849 }
850 SubmitTxResult::Executed { .. } => {
851 panic!(
852 "Transaction {} was already executed during submission",
853 i + 1
854 );
855 }
856 SubmitTxResult::Rejected { error } => {
857 return Err(error);
858 }
859 }
860 }
861
862 let wait_futures: Vec<_> = digests
864 .iter()
865 .zip(consensus_positions.iter())
866 .map(|(digest, position)| {
867 let request = WaitForEffectsRequest {
868 transaction_digest: Some(*digest),
869 consensus_position: Some(*position),
870 include_details: false,
871 ping_type: None,
872 };
873 safe_client.wait_for_effects(request, None)
874 })
875 .collect();
876
877 let responses = futures::future::join_all(wait_futures).await;
878
879 let results: SuiResult<Vec<_>> = digests
880 .into_iter()
881 .zip(responses.into_iter())
882 .map(|(digest, response)| Ok((digest, response?)))
883 .collect();
884
885 results
886 }
887
888 pub async fn wait_for_tx_settlement(&self, digests: &[TransactionDigest]) {
889 self.fullnode_handle
890 .sui_node
891 .with_async(|node| async move {
892 let state = node.state();
893 let checkpoint_seqs = state
895 .epoch_store_for_testing()
896 .transactions_executed_in_checkpoint_notify(digests.to_vec())
897 .await
898 .unwrap();
899
900 let max_checkpoint_seq = checkpoint_seqs.into_iter().max().unwrap();
902 state
903 .checkpoint_store
904 .notify_read_executed_checkpoint(max_checkpoint_seq)
905 .await;
906 })
907 .await;
908 }
909
910 pub async fn execute_transaction(&self, tx: Transaction) -> ExecutedTransaction {
915 self.wallet.execute_transaction_must_succeed(tx).await
916 }
917
918 pub async fn execute_transaction_return_raw_effects(
927 &self,
928 tx: Transaction,
929 ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
930 let results = self.submit_and_execute(tx.clone(), None).await?;
931 self.wallet.execute_transaction_may_fail(tx).await.unwrap();
932 Ok(results)
933 }
934
935 pub fn authority_aggregator(&self) -> Arc<AuthorityAggregator<NetworkAuthorityClient>> {
936 self.fullnode_handle
937 .sui_node
938 .with(|node| node.clone_authority_aggregator().unwrap())
939 }
940
941 pub async fn submit_and_execute(
945 &self,
946 tx: Transaction,
947 client_addr: Option<SocketAddr>,
948 ) -> anyhow::Result<(TransactionEffects, TransactionEvents)> {
949 let agg = self.authority_aggregator();
950 let clients = &agg.authority_clients;
952 let index = rand::thread_rng().gen_range(0..clients.len());
953 let (_, client) = clients
954 .iter()
955 .nth(index)
956 .ok_or_else(|| anyhow::anyhow!("No authority clients available"))?;
957
958 let submit_request = SubmitTxRequest::new_transaction(tx.clone());
960 let submit_response = client
961 .submit_transaction(submit_request, client_addr)
962 .await?;
963
964 for result in submit_response.results {
966 match result {
967 SubmitTxResult::Executed { details, .. } => {
968 if let Some(data) = details {
969 let events = data.events.unwrap_or_default();
970 return Ok((data.effects, events));
971 }
972 }
973 SubmitTxResult::Rejected { error } => {
974 return Err(error.into());
975 }
976 SubmitTxResult::Submitted { .. } => {
977 }
979 }
980 }
981
982 let wait_request = WaitForEffectsRequest {
984 transaction_digest: Some(*tx.digest()),
985 consensus_position: None,
986 include_details: true,
987 ping_type: None,
988 };
989
990 let response = client.wait_for_effects(wait_request, client_addr).await?;
991 match response {
992 WaitForEffectsResponse::Executed { details, .. } => {
993 let data = details.ok_or_else(|| anyhow::anyhow!("Expected execution details"))?;
994 let events = data.events.unwrap_or_default();
995 Ok((data.effects, events))
996 }
997 WaitForEffectsResponse::Rejected { error } => Err(error
998 .ok_or_else(|| anyhow::anyhow!("Transaction was rejected"))?
999 .into()),
1000 WaitForEffectsResponse::Expired { .. } => Err(anyhow::anyhow!("Transaction expired")),
1001 }
1002 }
1003
1004 pub async fn fund_address_and_return_gas(
1008 &self,
1009 rgp: u64,
1010 amount: Option<u64>,
1011 funding_address: SuiAddress,
1012 ) -> ObjectRef {
1013 let context = &self.wallet;
1014 let (sender, gas) = context.get_one_gas_object().await.unwrap().unwrap();
1015 let tx = context
1016 .sign_transaction(
1017 &TestTransactionBuilder::new(sender, gas, rgp)
1018 .transfer_sui(amount, funding_address)
1019 .build(),
1020 )
1021 .await;
1022 context.execute_transaction_must_succeed(tx).await;
1023
1024 context
1025 .get_one_gas_object_owned_by_address(funding_address)
1026 .await
1027 .unwrap()
1028 .unwrap()
1029 }
1030
1031 pub async fn transfer_sui_must_exceed(
1032 &self,
1033 sender: SuiAddress,
1034 receiver: SuiAddress,
1035 amount: u64,
1036 ) -> ObjectID {
1037 let tx = self
1038 .test_transaction_builder_with_sender(sender)
1039 .await
1040 .transfer_sui(Some(amount), receiver)
1041 .build();
1042 let effects = self.sign_and_execute_transaction(&tx).await.effects;
1043 assert!(effects.status().is_ok());
1044 effects.created().first().unwrap().0.0
1045 }
1046
/// Forwards `value` to `set_safe_mode_expected` on every node that has a handle.
#[cfg(msim)]
pub fn set_safe_mode_expected(&self, value: bool) {
    self.all_node_handles().into_iter().for_each(|handle| {
        handle.with(|node| node.set_safe_mode_expected(value));
    });
}
1053}
1054
/// Background task driver that repeatedly stops and restarts a random validator.
///
/// The task is started by `run` and aborted when this value is dropped.
pub struct RandomNodeRestarter {
    /// Cluster whose validators are stopped and restarted.
    test_cluster: Arc<TestCluster>,

    /// Distribution the sleep before each kill is sampled from.
    kill_interval: Uniform<Duration>,
    /// Distribution the sleep between a kill and the restart is sampled from.
    restart_delay: Uniform<Duration>,

    /// Handle of the spawned task; `None` until `run` is called.
    task_handle: Mutex<Option<JoinHandle<()>>>,
}
1065
1066impl RandomNodeRestarter {
1067 fn new(test_cluster: Arc<TestCluster>) -> Self {
1068 Self {
1069 test_cluster,
1070 kill_interval: Uniform::new(Duration::from_secs(10), Duration::from_secs(11)),
1071 restart_delay: Uniform::new(Duration::from_secs(1), Duration::from_secs(2)),
1072 task_handle: Default::default(),
1073 }
1074 }
1075
1076 pub fn with_kill_interval_secs(mut self, a: u64, b: u64) -> Self {
1077 self.kill_interval = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1078 self
1079 }
1080
1081 pub fn with_restart_delay_secs(mut self, a: u64, b: u64) -> Self {
1082 self.restart_delay = Uniform::new(Duration::from_secs(a), Duration::from_secs(b));
1083 self
1084 }
1085
1086 pub fn run(&self) {
1087 let test_cluster = self.test_cluster.clone();
1088 let kill_interval = self.kill_interval;
1089 let restart_delay = self.restart_delay;
1090 let validators = self.test_cluster.get_validator_pubkeys();
1091 let mut task_handle = self.task_handle.lock().unwrap();
1092 assert!(task_handle.is_none());
1093 task_handle.replace(tokio::task::spawn(async move {
1094 loop {
1095 let delay = kill_interval.sample(&mut OsRng);
1096 info!("Sleeping {delay:?} before killing a validator");
1097 sleep(delay).await;
1098
1099 let validator = validators.choose(&mut OsRng).unwrap();
1100 info!("Killing validator {:?}", validator.concise());
1101 test_cluster.stop_node(validator);
1102
1103 let delay = restart_delay.sample(&mut OsRng);
1104 info!("Sleeping {delay:?} before restarting");
1105 sleep(delay).await;
1106 info!("Starting validator {:?}", validator.concise());
1107 test_cluster.start_node(validator).await;
1108 }
1109 }));
1110 }
1111}
1112
1113impl Drop for RandomNodeRestarter {
1114 fn drop(&mut self) {
1115 if let Some(handle) = self.task_handle.lock().unwrap().take() {
1116 handle.abort();
1117 }
1118 }
1119}
1120
/// Collected options for constructing a test cluster.
///
/// NOTE(review): the methods that set and consume these fields are outside the
/// visible part of this file; the field notes below are read off names and
/// types only and should be confirmed against those methods.
pub struct TestClusterBuilder {
    genesis_config: Option<GenesisConfig>,
    network_config: Option<NetworkConfig>,
    additional_objects: Vec<Object>,
    num_validators: Option<usize>,
    validators: Option<Vec<ValidatorGenesisConfig>>,
    fullnode_rpc_port: Option<u16>,
    enable_fullnode_events: bool,
    disable_fullnode_pruning: bool,
    validator_supported_protocol_versions_config: ProtocolVersionsConfig,
    // `None` presumably means "fall back to some default for full nodes" — TODO confirm.
    fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
    db_checkpoint_config_validators: DBCheckpointConfig,
    db_checkpoint_config_fullnodes: DBCheckpointConfig,
    num_unpruned_validators: Option<usize>,
    jwk_fetch_interval: Option<Duration>,
    config_dir: Option<PathBuf>,
    default_jwks: bool,
    authority_overload_config: Option<AuthorityOverloadConfig>,
    execution_cache_config: Option<ExecutionCacheConfig>,
    data_ingestion_dir: Option<PathBuf>,
    fullnode_run_with_range: Option<RunWithRange>,
    fullnode_policy_config: Option<PolicyConfig>,
    fullnode_fw_config: Option<RemoteFirewallConfig>,

    max_submit_position: Option<usize>,
    submit_delay_step_override_millis: Option<u64>,
    validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,
    validator_funds_withdraw_scheduler_type_config: FundsWithdrawSchedulerTypeConfig,

    rpc_config: Option<sui_config::RpcConfig>,

    chain_override: Option<Chain>,

    execution_time_observer_config: Option<sui_config::node::ExecutionTimeObserverConfig>,

    state_sync_config: Option<sui_config::p2p::StateSyncConfig>,

    // Only present in `msim` builds.
    #[cfg(msim)]
    inject_synthetic_execution_time: bool,
}
1162
1163impl TestClusterBuilder {
1164 pub fn new() -> Self {
1165 TestClusterBuilder {
1166 genesis_config: None,
1167 network_config: None,
1168 chain_override: None,
1169 additional_objects: vec![],
1170 fullnode_rpc_port: None,
1171 num_validators: None,
1172 validators: None,
1173 enable_fullnode_events: false,
1174 disable_fullnode_pruning: false,
1175 validator_supported_protocol_versions_config: ProtocolVersionsConfig::Default,
1176 fullnode_supported_protocol_versions_config: None,
1177 db_checkpoint_config_validators: DBCheckpointConfig::default(),
1178 db_checkpoint_config_fullnodes: DBCheckpointConfig::default(),
1179 num_unpruned_validators: None,
1180 jwk_fetch_interval: None,
1181 config_dir: None,
1182 default_jwks: false,
1183 authority_overload_config: None,
1184 execution_cache_config: None,
1185 data_ingestion_dir: None,
1186 fullnode_run_with_range: None,
1187 fullnode_policy_config: None,
1188 fullnode_fw_config: None,
1189 max_submit_position: None,
1190 submit_delay_step_override_millis: None,
1191 validator_global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(
1192 true,
1193 ),
1194 validator_funds_withdraw_scheduler_type_config:
1195 FundsWithdrawSchedulerTypeConfig::PerValidator(Arc::new(|idx| {
1196 if idx % 2 == 0 {
1197 FundsWithdrawSchedulerType::Eager
1198 } else {
1199 FundsWithdrawSchedulerType::Naive
1200 }
1201 })),
1202 rpc_config: None,
1203 execution_time_observer_config: None,
1204 state_sync_config: None,
1205 #[cfg(msim)]
1206 inject_synthetic_execution_time: false,
1207 }
1208 }
1209
1210 pub fn with_state_sync_config(mut self, config: sui_config::p2p::StateSyncConfig) -> Self {
1211 self.state_sync_config = Some(config);
1212 self
1213 }
1214
1215 pub fn with_execution_time_observer_config(
1216 mut self,
1217 config: sui_config::node::ExecutionTimeObserverConfig,
1218 ) -> Self {
1219 self.execution_time_observer_config = Some(config);
1220 self
1221 }
1222
1223 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
1224 if let Some(run_with_range) = run_with_range {
1225 self.fullnode_run_with_range = Some(run_with_range);
1226 }
1227 self
1228 }
1229
1230 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
1231 self.fullnode_policy_config = config;
1232 self
1233 }
1234
1235 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
1236 self.fullnode_fw_config = config;
1237 self
1238 }
1239
1240 pub fn with_fullnode_rpc_port(mut self, rpc_port: u16) -> Self {
1241 self.fullnode_rpc_port = Some(rpc_port);
1242 self
1243 }
1244
1245 pub fn set_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
1246 assert!(self.genesis_config.is_none() && self.network_config.is_none());
1247 self.genesis_config = Some(genesis_config);
1248 self
1249 }
1250
1251 pub fn set_network_config(mut self, network_config: NetworkConfig) -> Self {
1252 assert!(self.genesis_config.is_none() && self.network_config.is_none());
1253 self.network_config = Some(network_config);
1254 self
1255 }
1256
1257 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
1258 self.additional_objects.extend(objects);
1259 self
1260 }
1261
1262 pub fn with_num_validators(mut self, num: usize) -> Self {
1265 self.num_validators = Some(num);
1266 self
1267 }
1268
1269 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
1271 self.validators = Some(validators);
1272 self
1273 }
1274
1275 pub fn enable_fullnode_events(mut self) -> Self {
1276 self.enable_fullnode_events = true;
1277 self
1278 }
1279
1280 pub fn disable_fullnode_pruning(mut self) -> Self {
1281 self.disable_fullnode_pruning = true;
1282 self
1283 }
1284
1285 pub fn with_enable_db_checkpoints_validators(mut self) -> Self {
1286 self.db_checkpoint_config_validators = DBCheckpointConfig {
1287 perform_db_checkpoints_at_epoch_end: true,
1288 checkpoint_path: None,
1289 object_store_config: None,
1290 perform_index_db_checkpoints_at_epoch_end: None,
1291 prune_and_compact_before_upload: None,
1292 };
1293 self
1294 }
1295
1296 pub fn with_enable_db_checkpoints_fullnodes(mut self) -> Self {
1297 self.db_checkpoint_config_fullnodes = DBCheckpointConfig {
1298 perform_db_checkpoints_at_epoch_end: true,
1299 checkpoint_path: None,
1300 object_store_config: None,
1301 perform_index_db_checkpoints_at_epoch_end: None,
1302 prune_and_compact_before_upload: Some(true),
1303 };
1304 self
1305 }
1306
1307 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
1308 assert!(
1309 epoch_duration_ms >= 10000,
1310 "Epoch duration must be at least 10s (10000ms) to avoid flaky tests. Got {epoch_duration_ms}ms."
1311 );
1312 self.get_or_init_genesis_config()
1313 .parameters
1314 .epoch_duration_ms = epoch_duration_ms;
1315 self
1316 }
1317
1318 pub fn with_stake_subsidy_start_epoch(mut self, stake_subsidy_start_epoch: u64) -> Self {
1319 self.get_or_init_genesis_config()
1320 .parameters
1321 .stake_subsidy_start_epoch = stake_subsidy_start_epoch;
1322 self
1323 }
1324
1325 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
1326 self.validator_supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
1327 self
1328 }
1329
1330 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
1331 self.jwk_fetch_interval = Some(i);
1332 self
1333 }
1334
1335 pub fn with_fullnode_supported_protocol_versions_config(
1336 mut self,
1337 c: SupportedProtocolVersions,
1338 ) -> Self {
1339 self.fullnode_supported_protocol_versions_config = Some(ProtocolVersionsConfig::Global(c));
1340 self
1341 }
1342
1343 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
1344 self.get_or_init_genesis_config()
1345 .parameters
1346 .protocol_version = v;
1347 self
1348 }
1349
1350 pub fn with_supported_protocol_version_callback(
1351 mut self,
1352 func: SupportedProtocolVersionsCallback,
1353 ) -> Self {
1354 self.validator_supported_protocol_versions_config =
1355 ProtocolVersionsConfig::PerValidator(func);
1356 self
1357 }
1358
1359 pub fn with_global_state_hash_v2_enabled_callback(
1360 mut self,
1361 func: GlobalStateHashV2EnabledCallback,
1362 ) -> Self {
1363 self.validator_global_state_hash_v2_enabled_config =
1364 GlobalStateHashV2EnabledConfig::PerValidator(func);
1365 self
1366 }
1367
1368 pub fn with_validator_candidates(
1369 mut self,
1370 addresses: impl IntoIterator<Item = SuiAddress>,
1371 ) -> Self {
1372 self.get_or_init_genesis_config()
1373 .accounts
1374 .extend(addresses.into_iter().map(|address| AccountConfig {
1375 address: Some(address),
1376 gas_amounts: vec![DEFAULT_GAS_AMOUNT, DEFAULT_GAS_AMOUNT],
1377 }));
1378 self
1379 }
1380
1381 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
1382 self.num_unpruned_validators = Some(n);
1383 self
1384 }
1385
1386 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1387 self.get_or_init_genesis_config().accounts = accounts;
1388 self
1389 }
1390
1391 pub fn with_additional_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
1392 self.get_or_init_genesis_config().accounts.extend(accounts);
1393 self
1394 }
1395
1396 pub fn with_config_dir(mut self, config_dir: PathBuf) -> Self {
1397 self.config_dir = Some(config_dir);
1398 self
1399 }
1400
1401 pub fn with_default_jwks(mut self) -> Self {
1402 self.default_jwks = true;
1403 self
1404 }
1405
1406 pub fn with_authority_overload_config(mut self, config: AuthorityOverloadConfig) -> Self {
1407 assert!(self.network_config.is_none());
1408 self.authority_overload_config = Some(config);
1409 self
1410 }
1411
1412 pub fn with_execution_cache_config(mut self, config: ExecutionCacheConfig) -> Self {
1413 assert!(self.network_config.is_none());
1414 self.execution_cache_config = Some(config);
1415 self
1416 }
1417
1418 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
1419 self.data_ingestion_dir = Some(path);
1420 self
1421 }
1422
1423 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
1424 self.max_submit_position = Some(max_submit_position);
1425 self
1426 }
1427
1428 pub fn with_submit_delay_step_override_millis(
1429 mut self,
1430 submit_delay_step_override_millis: u64,
1431 ) -> Self {
1432 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
1433 self
1434 }
1435
1436 pub fn with_rpc_config(mut self, config: sui_config::RpcConfig) -> Self {
1437 self.rpc_config = Some(config);
1438 self
1439 }
1440
1441 pub fn with_chain_override(mut self, chain: Chain) -> Self {
1442 self.chain_override = Some(chain);
1443 self
1444 }
1445
1446 #[cfg(msim)]
1447 pub fn with_synthetic_execution_time_injection(mut self) -> Self {
1448 self.inject_synthetic_execution_time = true;
1449 self
1450 }
1451
1452 pub async fn build(mut self) -> TestCluster {
1453 #[cfg(msim)]
1457 if !self.default_jwks {
1458 sui_node::set_jwk_injector(Arc::new(|_authority, provider| {
1459 use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId};
1460 use rand::Rng;
1461
1462 let id_num = rand::thread_rng().gen_range(1..=4);
1464 let key_num = rand::thread_rng().gen_range(1..=4);
1465
1466 let id = JwkId {
1467 iss: provider.get_config().iss,
1468 kid: format!("kid{}", id_num),
1469 };
1470
1471 let jwk = JWK {
1472 kty: "kty".to_string(),
1473 e: "e".to_string(),
1474 n: format!("n{}", key_num),
1475 alg: "alg".to_string(),
1476 };
1477
1478 Ok(vec![(id, jwk)])
1479 }));
1480 }
1481
1482 let swarm = self.start_swarm().await.unwrap();
1483 let working_dir = swarm.dir();
1484
1485 let fullnode = swarm.fullnodes().next().unwrap();
1486 let json_rpc_address = fullnode.config().json_rpc_address;
1487 let fullnode_handle =
1488 FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await;
1489
1490 let mut wallet_conf: SuiClientConfig =
1491 PersistedConfig::read(&working_dir.join(SUI_CLIENT_CONFIG)).unwrap();
1492 wallet_conf.envs.push(SuiEnv {
1493 alias: "localnet".to_string(),
1494 rpc: fullnode_handle.rpc_url.clone(),
1495 ws: None,
1496 basic_auth: None,
1497 chain_id: None,
1498 });
1499 wallet_conf.active_env = Some("localnet".to_string());
1500
1501 wallet_conf
1502 .persisted(&working_dir.join(SUI_CLIENT_CONFIG))
1503 .save()
1504 .unwrap();
1505
1506 let wallet_conf = swarm.dir().join(SUI_CLIENT_CONFIG);
1507 let wallet = WalletContext::new(&wallet_conf).unwrap();
1508
1509 TestCluster {
1510 swarm,
1511 wallet,
1512 fullnode_handle,
1513 }
1514 }
1515
1516 async fn start_swarm(&mut self) -> Result<Swarm, anyhow::Error> {
1518 let mut builder: SwarmBuilder = Swarm::builder()
1519 .with_objects(self.additional_objects.clone())
1520 .with_db_checkpoint_config(self.db_checkpoint_config_validators.clone())
1521 .with_supported_protocol_versions_config(
1522 self.validator_supported_protocol_versions_config.clone(),
1523 )
1524 .with_global_state_hash_v2_enabled_config(
1525 self.validator_global_state_hash_v2_enabled_config.clone(),
1526 )
1527 .with_funds_withdraw_scheduler_type_config(
1528 self.validator_funds_withdraw_scheduler_type_config.clone(),
1529 )
1530 .with_fullnode_count(1)
1531 .with_fullnode_supported_protocol_versions_config(
1532 self.fullnode_supported_protocol_versions_config
1533 .clone()
1534 .unwrap_or(self.validator_supported_protocol_versions_config.clone()),
1535 )
1536 .with_db_checkpoint_config(self.db_checkpoint_config_fullnodes.clone())
1537 .with_fullnode_run_with_range(self.fullnode_run_with_range)
1538 .with_fullnode_policy_config(self.fullnode_policy_config.clone())
1539 .with_fullnode_fw_config(self.fullnode_fw_config.clone());
1540
1541 if let Some(validators) = self.validators.take() {
1542 builder = builder.with_validators(validators);
1543 } else {
1544 builder = builder.committee_size(
1545 NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDATOR)).unwrap(),
1546 )
1547 };
1548
1549 if let Some(chain) = self.chain_override {
1550 builder = builder.with_chain_override(chain);
1551 }
1552
1553 if let Some(genesis_config) = self.genesis_config.take() {
1554 builder = builder.with_genesis_config(genesis_config);
1555 }
1556
1557 if let Some(network_config) = self.network_config.take() {
1558 builder = builder.with_network_config(network_config);
1559 }
1560
1561 if let Some(authority_overload_config) = self.authority_overload_config.take() {
1562 builder = builder.with_authority_overload_config(authority_overload_config);
1563 }
1564
1565 if let Some(execution_cache_config) = self.execution_cache_config.take() {
1566 builder = builder.with_execution_cache_config(execution_cache_config);
1567 }
1568
1569 if let Some(fullnode_rpc_port) = self.fullnode_rpc_port {
1570 builder = builder.with_fullnode_rpc_port(fullnode_rpc_port);
1571 }
1572
1573 if let Some(rpc_config) = &self.rpc_config {
1574 builder = builder.with_fullnode_rpc_config(rpc_config.clone());
1575 }
1576 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
1577 builder = builder.with_num_unpruned_validators(num_unpruned_validators);
1578 }
1579
1580 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
1581 builder = builder.with_jwk_fetch_interval(jwk_fetch_interval);
1582 }
1583
1584 if let Some(config_dir) = self.config_dir.take() {
1585 builder = builder.dir(config_dir);
1586 }
1587
1588 if let Some(data_ingestion_dir) = self.data_ingestion_dir.take() {
1589 builder = builder.with_data_ingestion_dir(data_ingestion_dir);
1590 }
1591
1592 if let Some(max_submit_position) = self.max_submit_position {
1593 builder = builder.with_max_submit_position(max_submit_position);
1594 }
1595
1596 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis {
1597 builder =
1598 builder.with_submit_delay_step_override_millis(submit_delay_step_override_millis);
1599 }
1600
1601 if let Some(state_sync_config) = self.state_sync_config.clone() {
1602 builder = builder.with_state_sync_config(state_sync_config);
1603 }
1604
1605 if self.disable_fullnode_pruning {
1606 builder = builder.with_disable_fullnode_pruning();
1607 }
1608
1609 #[cfg(msim)]
1610 {
1611 if let Some(mut config) = self.execution_time_observer_config.clone() {
1612 if self.inject_synthetic_execution_time {
1613 config.inject_synthetic_execution_time = Some(true);
1614 }
1615 builder = builder.with_execution_time_observer_config(config);
1616 } else if self.inject_synthetic_execution_time {
1617 use sui_config::node::ExecutionTimeObserverConfig;
1618
1619 let mut config = ExecutionTimeObserverConfig::default();
1620 config.inject_synthetic_execution_time = Some(true);
1621 builder = builder.with_execution_time_observer_config(config);
1622 }
1623 }
1624
1625 let mut swarm = builder.build();
1626 swarm.launch().await?;
1627
1628 let dir = swarm.dir();
1629
1630 let network_path = dir.join(SUI_NETWORK_CONFIG);
1631 let wallet_path = dir.join(SUI_CLIENT_CONFIG);
1632 let keystore_path = dir.join(SUI_KEYSTORE_FILENAME);
1633
1634 swarm.config().save(network_path)?;
1635 let mut keystore = Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?);
1636 for key in &swarm.config().account_keys {
1637 keystore
1638 .import(None, SuiKeyPair::Ed25519(key.copy()))
1639 .await?;
1640 }
1641
1642 let active_address = keystore.addresses().first().cloned();
1643
1644 SuiClientConfig {
1646 keystore: Keystore::from(FileBasedKeystore::load_or_create(&keystore_path)?),
1647 external_keys: None,
1648 envs: Default::default(),
1649 active_address,
1650 active_env: Default::default(),
1651 }
1652 .save(wallet_path)?;
1653
1654 Ok(swarm)
1656 }
1657
1658 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
1659 if self.genesis_config.is_none() {
1660 self.genesis_config = Some(GenesisConfig::for_local_testing());
1661 }
1662 self.genesis_config.as_mut().unwrap()
1663 }
1664}
1665
1666impl Default for TestClusterBuilder {
1667 fn default() -> Self {
1668 Self::new()
1669 }
1670}