1use super::Node;
5use anyhow::Result;
6use futures::future::try_join_all;
7use rand::rngs::OsRng;
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::num::NonZeroUsize;
11use std::time::Duration;
12use std::{
13 ops,
14 path::{Path, PathBuf},
15};
16use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
17
18#[cfg(msim)]
19use sui_config::node::ExecutionTimeObserverConfig;
20use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
21use sui_config::{ExecutionCacheConfig, NodeConfig};
22use sui_macros::nondeterministic;
23use sui_node::SuiNodeHandle;
24use sui_protocol_config::{Chain, ProtocolVersion};
25use sui_swarm_config::genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig};
26use sui_swarm_config::network_config::NetworkConfig;
27use sui_swarm_config::network_config_builder::{
28 CommitteeConfig, ConfigBuilder, FundsWithdrawSchedulerTypeConfig,
29 GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig, SupportedProtocolVersionsCallback,
30};
31use sui_swarm_config::node_config_builder::FullnodeConfigBuilder;
32use sui_types::base_types::AuthorityName;
33use sui_types::object::Object;
34use sui_types::supported_protocol_versions::SupportedProtocolVersions;
35use tempfile::TempDir;
36use tracing::info;
37
38pub struct SwarmBuilder<R = OsRng> {
39 rng: R,
40 dir: Option<PathBuf>,
42 committee: CommitteeConfig,
43 genesis_config: Option<GenesisConfig>,
44 network_config: Option<NetworkConfig>,
45 chain_override: Option<Chain>,
46 additional_objects: Vec<Object>,
47 fullnode_count: usize,
48 fullnode_rpc_port: Option<u16>,
49 fullnode_rpc_addr: Option<SocketAddr>,
50 fullnode_rpc_config: Option<sui_config::RpcConfig>,
51 supported_protocol_versions_config: ProtocolVersionsConfig,
52 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
54 db_checkpoint_config: DBCheckpointConfig,
55 jwk_fetch_interval: Option<Duration>,
56 num_unpruned_validators: Option<usize>,
57 authority_overload_config: Option<AuthorityOverloadConfig>,
58 execution_cache_config: Option<ExecutionCacheConfig>,
59 data_ingestion_dir: Option<PathBuf>,
60 fullnode_run_with_range: Option<RunWithRange>,
61 fullnode_policy_config: Option<PolicyConfig>,
62 fullnode_fw_config: Option<RemoteFirewallConfig>,
63 max_submit_position: Option<usize>,
64 submit_delay_step_override_millis: Option<u64>,
65 global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,
66 funds_withdraw_scheduler_type_config: Option<FundsWithdrawSchedulerTypeConfig>,
67 disable_fullnode_pruning: bool,
68 state_sync_config: Option<sui_config::p2p::StateSyncConfig>,
69 #[cfg(msim)]
70 execution_time_observer_config: Option<ExecutionTimeObserverConfig>,
71}
72
73impl SwarmBuilder {
74 #[allow(clippy::new_without_default)]
75 pub fn new() -> Self {
76 Self {
77 rng: OsRng,
78 dir: None,
79 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
80 genesis_config: None,
81 network_config: None,
82 chain_override: None,
83 additional_objects: vec![],
84 fullnode_count: 0,
85 fullnode_rpc_port: None,
86 fullnode_rpc_addr: None,
87 fullnode_rpc_config: None,
88 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
89 fullnode_supported_protocol_versions_config: None,
90 db_checkpoint_config: DBCheckpointConfig::default(),
91 jwk_fetch_interval: None,
92 num_unpruned_validators: None,
93 authority_overload_config: None,
94 execution_cache_config: None,
95 data_ingestion_dir: None,
96 fullnode_run_with_range: None,
97 fullnode_policy_config: None,
98 fullnode_fw_config: None,
99 max_submit_position: None,
100 submit_delay_step_override_millis: None,
101 global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(true),
102 funds_withdraw_scheduler_type_config: None,
103 disable_fullnode_pruning: false,
104 state_sync_config: None,
105 #[cfg(msim)]
106 execution_time_observer_config: None,
107 }
108 }
109}
110
111impl<R> SwarmBuilder<R> {
112 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
113 SwarmBuilder {
114 rng,
115 dir: self.dir,
116 committee: self.committee,
117 genesis_config: self.genesis_config,
118 network_config: self.network_config,
119 chain_override: self.chain_override,
120 additional_objects: self.additional_objects,
121 fullnode_count: self.fullnode_count,
122 fullnode_rpc_port: self.fullnode_rpc_port,
123 fullnode_rpc_addr: self.fullnode_rpc_addr,
124 fullnode_rpc_config: self.fullnode_rpc_config.clone(),
125 supported_protocol_versions_config: self.supported_protocol_versions_config,
126 fullnode_supported_protocol_versions_config: self
127 .fullnode_supported_protocol_versions_config,
128 db_checkpoint_config: self.db_checkpoint_config,
129 jwk_fetch_interval: self.jwk_fetch_interval,
130 num_unpruned_validators: self.num_unpruned_validators,
131 authority_overload_config: self.authority_overload_config,
132 execution_cache_config: self.execution_cache_config,
133 data_ingestion_dir: self.data_ingestion_dir,
134 fullnode_run_with_range: self.fullnode_run_with_range,
135 fullnode_policy_config: self.fullnode_policy_config,
136 fullnode_fw_config: self.fullnode_fw_config,
137 max_submit_position: self.max_submit_position,
138 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
139 global_state_hash_v2_enabled_config: self.global_state_hash_v2_enabled_config,
140 funds_withdraw_scheduler_type_config: self.funds_withdraw_scheduler_type_config,
141 disable_fullnode_pruning: self.disable_fullnode_pruning,
142 state_sync_config: self.state_sync_config,
143 #[cfg(msim)]
144 execution_time_observer_config: self.execution_time_observer_config,
145 }
146 }
147
148 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
154 self.dir = Some(dir.into());
155 self
156 }
157
158 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
162 self.committee = CommitteeConfig::Size(committee_size);
163 self
164 }
165
166 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
167 self.committee = CommitteeConfig::Validators(validators);
168 self
169 }
170
171 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
172 assert!(self.network_config.is_none() && self.genesis_config.is_none());
173 self.genesis_config = Some(genesis_config);
174 self
175 }
176
177 pub fn with_chain_override(mut self, chain: Chain) -> Self {
178 assert!(self.chain_override.is_none());
179 self.chain_override = Some(chain);
180 self
181 }
182
183 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
184 assert!(self.network_config.is_none());
185 self.num_unpruned_validators = Some(n);
186 self
187 }
188
189 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
190 self.jwk_fetch_interval = Some(i);
191 self
192 }
193
194 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
195 assert!(self.network_config.is_none() && self.genesis_config.is_none());
196 self.network_config = Some(network_config);
197 self
198 }
199
200 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
201 self.get_or_init_genesis_config().accounts = accounts;
202 self
203 }
204
205 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
206 self.additional_objects.extend(objects);
207 self
208 }
209
210 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
211 self.fullnode_count = fullnode_count;
212 self
213 }
214
215 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
216 assert!(self.fullnode_rpc_addr.is_none());
217 self.fullnode_rpc_port = Some(fullnode_rpc_port);
218 self
219 }
220
221 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
222 assert!(self.fullnode_rpc_port.is_none());
223 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
224 self
225 }
226
227 pub fn with_fullnode_rpc_config(mut self, fullnode_rpc_config: sui_config::RpcConfig) -> Self {
228 self.fullnode_rpc_config = Some(fullnode_rpc_config);
229 self
230 }
231
232 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
233 assert!(
234 epoch_duration_ms >= 10000,
235 "Epoch duration must be at least 10s (10000ms) to avoid flaky tests. Got {epoch_duration_ms}ms."
236 );
237 self.get_or_init_genesis_config()
238 .parameters
239 .epoch_duration_ms = epoch_duration_ms;
240 self
241 }
242
243 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
244 self.get_or_init_genesis_config()
245 .parameters
246 .protocol_version = v;
247 self
248 }
249
250 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
251 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
252 self
253 }
254
255 pub fn with_supported_protocol_version_callback(
256 mut self,
257 func: SupportedProtocolVersionsCallback,
258 ) -> Self {
259 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
260 self
261 }
262
263 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
264 self.supported_protocol_versions_config = c;
265 self
266 }
267
268 pub fn with_global_state_hash_v2_enabled_config(
269 mut self,
270 c: GlobalStateHashV2EnabledConfig,
271 ) -> Self {
272 self.global_state_hash_v2_enabled_config = c;
273 self
274 }
275
276 pub fn with_funds_withdraw_scheduler_type_config(
277 mut self,
278 c: FundsWithdrawSchedulerTypeConfig,
279 ) -> Self {
280 self.funds_withdraw_scheduler_type_config = Some(c);
281 self
282 }
283
284 #[cfg(msim)]
285 pub fn with_execution_time_observer_config(mut self, c: ExecutionTimeObserverConfig) -> Self {
286 self.execution_time_observer_config = Some(c);
287 self
288 }
289
290 pub fn with_fullnode_supported_protocol_versions_config(
291 mut self,
292 c: ProtocolVersionsConfig,
293 ) -> Self {
294 self.fullnode_supported_protocol_versions_config = Some(c);
295 self
296 }
297
298 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
299 self.db_checkpoint_config = db_checkpoint_config;
300 self
301 }
302
303 pub fn with_authority_overload_config(
304 mut self,
305 authority_overload_config: AuthorityOverloadConfig,
306 ) -> Self {
307 assert!(self.network_config.is_none());
308 self.authority_overload_config = Some(authority_overload_config);
309 self
310 }
311
312 pub fn with_execution_cache_config(
313 mut self,
314 execution_cache_config: ExecutionCacheConfig,
315 ) -> Self {
316 self.execution_cache_config = Some(execution_cache_config);
317 self
318 }
319
320 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
321 self.data_ingestion_dir = Some(path);
322 self
323 }
324
325 pub fn with_state_sync_config(mut self, config: sui_config::p2p::StateSyncConfig) -> Self {
326 self.state_sync_config = Some(config);
327 self
328 }
329
330 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
331 if let Some(run_with_range) = run_with_range {
332 self.fullnode_run_with_range = Some(run_with_range);
333 }
334 self
335 }
336
337 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
338 self.fullnode_policy_config = config;
339 self
340 }
341
342 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
343 self.fullnode_fw_config = config;
344 self
345 }
346
347 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
348 if self.genesis_config.is_none() {
349 assert!(self.network_config.is_none());
350 self.genesis_config = Some(GenesisConfig::for_local_testing());
351 }
352 self.genesis_config.as_mut().unwrap()
353 }
354
355 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
356 self.max_submit_position = Some(max_submit_position);
357 self
358 }
359
360 pub fn with_disable_fullnode_pruning(mut self) -> Self {
361 self.disable_fullnode_pruning = true;
362 self
363 }
364
365 pub fn with_submit_delay_step_override_millis(
366 mut self,
367 submit_delay_step_override_millis: u64,
368 ) -> Self {
369 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
370 self
371 }
372}
373
374impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
375 pub fn build(self) -> Swarm {
377 let dir = if let Some(dir) = self.dir {
378 SwarmDirectory::Persistent(dir)
379 } else {
380 SwarmDirectory::new_temporary()
381 };
382
383 let ingest_data = self.data_ingestion_dir.clone();
384
385 let network_config = self.network_config.unwrap_or_else(|| {
386 let mut config_builder = ConfigBuilder::new(dir.as_ref());
387
388 if let Some(genesis_config) = self.genesis_config {
389 config_builder = config_builder.with_genesis_config(genesis_config);
390 }
391
392 if let Some(chain_override) = self.chain_override {
393 config_builder = config_builder.with_chain_override(chain_override);
394 }
395
396 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
397 config_builder =
398 config_builder.with_num_unpruned_validators(num_unpruned_validators);
399 }
400
401 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
402 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
403 }
404
405 if let Some(authority_overload_config) = self.authority_overload_config {
406 config_builder =
407 config_builder.with_authority_overload_config(authority_overload_config);
408 }
409
410 if let Some(execution_cache_config) = self.execution_cache_config {
411 config_builder = config_builder.with_execution_cache_config(execution_cache_config);
412 }
413
414 if let Some(path) = self.data_ingestion_dir {
415 config_builder = config_builder.with_data_ingestion_dir(path);
416 }
417
418 if let Some(max_submit_position) = self.max_submit_position {
419 config_builder = config_builder.with_max_submit_position(max_submit_position);
420 }
421
422 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
423 {
424 config_builder = config_builder
425 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
426 }
427
428 #[allow(unused_mut)]
429 let mut final_builder = config_builder
430 .committee(self.committee)
431 .rng(self.rng)
432 .with_objects(self.additional_objects)
433 .with_supported_protocol_versions_config(
434 self.supported_protocol_versions_config.clone(),
435 )
436 .with_global_state_hash_v2_enabled_config(
437 self.global_state_hash_v2_enabled_config.clone(),
438 );
439
440 if let Some(funds_withdraw_scheduler_type_config) =
441 self.funds_withdraw_scheduler_type_config.clone()
442 {
443 final_builder = final_builder.with_funds_withdraw_scheduler_type_config(
444 funds_withdraw_scheduler_type_config,
445 );
446 }
447
448 if let Some(state_sync_config) = self.state_sync_config.clone() {
449 final_builder = final_builder.with_state_sync_config(state_sync_config);
450 }
451
452 #[cfg(msim)]
453 if let Some(execution_time_observer_config) = self.execution_time_observer_config {
454 final_builder = final_builder
455 .with_execution_time_observer_config(execution_time_observer_config);
456 }
457
458 final_builder.build()
459 });
460
461 let mut nodes: HashMap<_, _> = network_config
462 .validator_configs()
463 .iter()
464 .map(|config| {
465 info!(
466 "SwarmBuilder configuring validator with name {}",
467 config.protocol_public_key()
468 );
469 (config.protocol_public_key(), Node::new(config.to_owned()))
470 })
471 .collect();
472
473 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
474 .with_config_directory(dir.as_ref().into())
475 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
476 .with_run_with_range(self.fullnode_run_with_range)
477 .with_policy_config(self.fullnode_policy_config)
478 .with_data_ingestion_dir(ingest_data)
479 .with_fw_config(self.fullnode_fw_config)
480 .with_disable_pruning(self.disable_fullnode_pruning);
481
482 if let Some(state_sync_config) = self.state_sync_config.clone() {
483 fullnode_config_builder =
484 fullnode_config_builder.with_state_sync_config(state_sync_config);
485 }
486
487 if let Some(chain) = self.chain_override {
488 fullnode_config_builder = fullnode_config_builder.with_chain_override(chain);
489 }
490
491 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
492 let supported_versions = match spvc {
493 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
494 ProtocolVersionsConfig::Global(v) => *v,
495 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
496 };
497 fullnode_config_builder =
498 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
499 }
500
501 if self.fullnode_count > 0 {
502 (0..self.fullnode_count).for_each(|idx| {
503 let mut builder = fullnode_config_builder.clone();
504 if idx == 0 {
505 if let Some(rpc_addr) = self.fullnode_rpc_addr {
508 builder = builder.with_rpc_addr(rpc_addr);
509 }
510 if let Some(rpc_port) = self.fullnode_rpc_port {
511 builder = builder.with_rpc_port(rpc_port);
512 }
513 if let Some(rpc_config) = &self.fullnode_rpc_config {
514 builder = builder.with_rpc_config(rpc_config.clone());
515 }
516 }
517 let config = builder.build(&mut OsRng, &network_config);
518 info!(
519 "SwarmBuilder configuring full node with name {}",
520 config.protocol_public_key()
521 );
522 nodes.insert(config.protocol_public_key(), Node::new(config));
523 });
524 }
525 Swarm {
526 dir,
527 network_config,
528 nodes,
529 fullnode_config_builder,
530 }
531 }
532}
533
534#[derive(Debug)]
536pub struct Swarm {
537 dir: SwarmDirectory,
538 network_config: NetworkConfig,
539 nodes: HashMap<AuthorityName, Node>,
540 fullnode_config_builder: FullnodeConfigBuilder,
542}
543
544impl Drop for Swarm {
545 fn drop(&mut self) {
546 self.nodes_iter_mut().for_each(|node| node.stop());
547 }
548}
549
550impl Swarm {
551 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
552 self.nodes.values_mut()
553 }
554
555 pub fn builder() -> SwarmBuilder {
557 SwarmBuilder::new()
558 }
559
560 pub async fn launch(&mut self) -> Result<()> {
562 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
563 tracing::info!("Successfully launched Swarm");
564 Ok(())
565 }
566
567 pub fn dir(&self) -> &Path {
569 self.dir.as_ref()
570 }
571
572 pub fn config(&self) -> &NetworkConfig {
574 &self.network_config
575 }
576
577 pub fn config_mut(&mut self) -> &mut NetworkConfig {
580 &mut self.network_config
581 }
582
583 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
584 self.nodes.values()
585 }
586
587 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
588 self.nodes.get(name)
589 }
590
591 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
592 self.nodes.get_mut(name)
593 }
594
595 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
599 self.nodes
600 .values()
601 .filter(|node| node.config().consensus_config.is_some())
602 }
603
604 pub fn validator_node_handles(&self) -> Vec<SuiNodeHandle> {
605 self.validator_nodes()
606 .map(|node| node.get_node_handle().unwrap())
607 .collect()
608 }
609
610 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
612 self.validator_nodes().filter(|node| {
613 node.get_node_handle().is_some_and(|handle| {
614 let state = handle.state();
615 state.is_validator(&state.epoch_store_for_testing())
616 })
617 })
618 }
619
620 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
622 self.nodes
623 .values()
624 .filter(|node| node.config().consensus_config.is_none())
625 }
626
627 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> SuiNodeHandle {
628 let name = config.protocol_public_key();
629 let node = Node::new(config);
630 node.start().await.unwrap();
631 let handle = node.get_node_handle().unwrap();
632 self.nodes.insert(name, node);
633 handle
634 }
635
636 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
637 self.fullnode_config_builder.clone()
638 }
639}
640
641#[derive(Debug)]
642enum SwarmDirectory {
643 Persistent(PathBuf),
644 Temporary(TempDir),
645}
646
647impl SwarmDirectory {
648 fn new_temporary() -> Self {
649 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
650 }
651}
652
653impl ops::Deref for SwarmDirectory {
654 type Target = Path;
655
656 fn deref(&self) -> &Self::Target {
657 match self {
658 SwarmDirectory::Persistent(dir) => dir.deref(),
659 SwarmDirectory::Temporary(dir) => dir.path(),
660 }
661 }
662}
663
664impl AsRef<Path> for SwarmDirectory {
665 fn as_ref(&self) -> &Path {
666 match self {
667 SwarmDirectory::Persistent(dir) => dir.as_ref(),
668 SwarmDirectory::Temporary(dir) => dir.as_ref(),
669 }
670 }
671}
672
#[cfg(test)]
mod test {
    use super::Swarm;
    use std::num::NonZeroUsize;

    /// Builds a swarm with four validators and one fullnode, launches it, and
    /// runs a health check on every node.
    #[tokio::test]
    async fn launch() {
        telemetry_subscribers::init_for_testing();
        let mut swarm = Swarm::builder()
            .committee_size(NonZeroUsize::new(4).unwrap())
            .with_fullnode_count(1)
            .build();

        swarm.launch().await.unwrap();

        for validator in swarm.validator_nodes() {
            validator.health_check(true).await.unwrap();
        }

        for fullnode in swarm.fullnodes() {
            fullnode.health_check(false).await.unwrap();
        }
    }
}