1use super::Node;
5use anyhow::Result;
6use futures::future::try_join_all;
7use rand::rngs::OsRng;
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::num::NonZeroUsize;
11use std::time::Duration;
12use std::{
13 ops,
14 path::{Path, PathBuf},
15};
16use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
17
18#[cfg(msim)]
19use sui_config::node::ExecutionTimeObserverConfig;
20use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
21use sui_config::{ExecutionCacheConfig, NodeConfig};
22use sui_macros::nondeterministic;
23use sui_node::SuiNodeHandle;
24use sui_protocol_config::{Chain, ProtocolVersion};
25use sui_swarm_config::genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig};
26use sui_swarm_config::network_config::NetworkConfig;
27use sui_swarm_config::network_config_builder::{
28 CommitteeConfig, ConfigBuilder, FundsWithdrawSchedulerTypeConfig,
29 GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig, SupportedProtocolVersionsCallback,
30};
31use sui_swarm_config::node_config_builder::FullnodeConfigBuilder;
32use sui_types::base_types::AuthorityName;
33use sui_types::object::Object;
34use sui_types::supported_protocol_versions::SupportedProtocolVersions;
35use tempfile::TempDir;
36use tracing::info;
37
38pub struct SwarmBuilder<R = OsRng> {
39 rng: R,
40 dir: Option<PathBuf>,
42 committee: CommitteeConfig,
43 genesis_config: Option<GenesisConfig>,
44 network_config: Option<NetworkConfig>,
45 chain_override: Option<Chain>,
46 additional_objects: Vec<Object>,
47 fullnode_count: usize,
48 fullnode_rpc_port: Option<u16>,
49 fullnode_rpc_addr: Option<SocketAddr>,
50 fullnode_rpc_config: Option<sui_config::RpcConfig>,
51 supported_protocol_versions_config: ProtocolVersionsConfig,
52 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
54 db_checkpoint_config: DBCheckpointConfig,
55 jwk_fetch_interval: Option<Duration>,
56 num_unpruned_validators: Option<usize>,
57 authority_overload_config: Option<AuthorityOverloadConfig>,
58 execution_cache_config: Option<ExecutionCacheConfig>,
59 data_ingestion_dir: Option<PathBuf>,
60 fullnode_run_with_range: Option<RunWithRange>,
61 fullnode_policy_config: Option<PolicyConfig>,
62 fullnode_fw_config: Option<RemoteFirewallConfig>,
63 global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,
64 funds_withdraw_scheduler_type_config: Option<FundsWithdrawSchedulerTypeConfig>,
65 disable_fullnode_pruning: bool,
66 state_sync_config: Option<sui_config::p2p::StateSyncConfig>,
67 #[cfg(msim)]
68 execution_time_observer_config: Option<ExecutionTimeObserverConfig>,
69}
70
71impl SwarmBuilder {
72 #[allow(clippy::new_without_default)]
73 pub fn new() -> Self {
74 Self {
75 rng: OsRng,
76 dir: None,
77 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
78 genesis_config: None,
79 network_config: None,
80 chain_override: None,
81 additional_objects: vec![],
82 fullnode_count: 0,
83 fullnode_rpc_port: None,
84 fullnode_rpc_addr: None,
85 fullnode_rpc_config: None,
86 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
87 fullnode_supported_protocol_versions_config: None,
88 db_checkpoint_config: DBCheckpointConfig::default(),
89 jwk_fetch_interval: None,
90 num_unpruned_validators: None,
91 authority_overload_config: None,
92 execution_cache_config: None,
93 data_ingestion_dir: None,
94 fullnode_run_with_range: None,
95 fullnode_policy_config: None,
96 fullnode_fw_config: None,
97 global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(true),
98 funds_withdraw_scheduler_type_config: None,
99 disable_fullnode_pruning: false,
100 state_sync_config: None,
101 #[cfg(msim)]
102 execution_time_observer_config: None,
103 }
104 }
105}
106
107impl<R> SwarmBuilder<R> {
108 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
109 SwarmBuilder {
110 rng,
111 dir: self.dir,
112 committee: self.committee,
113 genesis_config: self.genesis_config,
114 network_config: self.network_config,
115 chain_override: self.chain_override,
116 additional_objects: self.additional_objects,
117 fullnode_count: self.fullnode_count,
118 fullnode_rpc_port: self.fullnode_rpc_port,
119 fullnode_rpc_addr: self.fullnode_rpc_addr,
120 fullnode_rpc_config: self.fullnode_rpc_config.clone(),
121 supported_protocol_versions_config: self.supported_protocol_versions_config,
122 fullnode_supported_protocol_versions_config: self
123 .fullnode_supported_protocol_versions_config,
124 db_checkpoint_config: self.db_checkpoint_config,
125 jwk_fetch_interval: self.jwk_fetch_interval,
126 num_unpruned_validators: self.num_unpruned_validators,
127 authority_overload_config: self.authority_overload_config,
128 execution_cache_config: self.execution_cache_config,
129 data_ingestion_dir: self.data_ingestion_dir,
130 fullnode_run_with_range: self.fullnode_run_with_range,
131 fullnode_policy_config: self.fullnode_policy_config,
132 fullnode_fw_config: self.fullnode_fw_config,
133 global_state_hash_v2_enabled_config: self.global_state_hash_v2_enabled_config,
134 funds_withdraw_scheduler_type_config: self.funds_withdraw_scheduler_type_config,
135 disable_fullnode_pruning: self.disable_fullnode_pruning,
136 state_sync_config: self.state_sync_config,
137 #[cfg(msim)]
138 execution_time_observer_config: self.execution_time_observer_config,
139 }
140 }
141
142 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
148 self.dir = Some(dir.into());
149 self
150 }
151
152 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
156 self.committee = CommitteeConfig::Size(committee_size);
157 self
158 }
159
160 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
161 self.committee = CommitteeConfig::Validators(validators);
162 self
163 }
164
165 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
166 assert!(self.network_config.is_none() && self.genesis_config.is_none());
167 self.genesis_config = Some(genesis_config);
168 self
169 }
170
171 pub fn with_chain_override(mut self, chain: Chain) -> Self {
172 assert!(self.chain_override.is_none());
173 self.chain_override = Some(chain);
174 self
175 }
176
177 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
178 assert!(self.network_config.is_none());
179 self.num_unpruned_validators = Some(n);
180 self
181 }
182
183 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
184 self.jwk_fetch_interval = Some(i);
185 self
186 }
187
188 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
189 assert!(self.network_config.is_none() && self.genesis_config.is_none());
190 self.network_config = Some(network_config);
191 self
192 }
193
194 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
195 self.get_or_init_genesis_config().accounts = accounts;
196 self
197 }
198
199 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
200 self.additional_objects.extend(objects);
201 self
202 }
203
204 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
205 self.fullnode_count = fullnode_count;
206 self
207 }
208
209 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
210 assert!(self.fullnode_rpc_addr.is_none());
211 self.fullnode_rpc_port = Some(fullnode_rpc_port);
212 self
213 }
214
215 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
216 assert!(self.fullnode_rpc_port.is_none());
217 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
218 self
219 }
220
221 pub fn with_fullnode_rpc_config(mut self, fullnode_rpc_config: sui_config::RpcConfig) -> Self {
222 self.fullnode_rpc_config = Some(fullnode_rpc_config);
223 self
224 }
225
226 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
227 assert!(
228 epoch_duration_ms >= 10000,
229 "Epoch duration must be at least 10s (10000ms) to avoid flaky tests. Got {epoch_duration_ms}ms."
230 );
231 self.get_or_init_genesis_config()
232 .parameters
233 .epoch_duration_ms = epoch_duration_ms;
234 self
235 }
236
237 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
238 self.get_or_init_genesis_config()
239 .parameters
240 .protocol_version = v;
241 self
242 }
243
244 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
245 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
246 self
247 }
248
249 pub fn with_supported_protocol_version_callback(
250 mut self,
251 func: SupportedProtocolVersionsCallback,
252 ) -> Self {
253 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
254 self
255 }
256
257 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
258 self.supported_protocol_versions_config = c;
259 self
260 }
261
262 pub fn with_global_state_hash_v2_enabled_config(
263 mut self,
264 c: GlobalStateHashV2EnabledConfig,
265 ) -> Self {
266 self.global_state_hash_v2_enabled_config = c;
267 self
268 }
269
270 pub fn with_funds_withdraw_scheduler_type_config(
271 mut self,
272 c: FundsWithdrawSchedulerTypeConfig,
273 ) -> Self {
274 self.funds_withdraw_scheduler_type_config = Some(c);
275 self
276 }
277
278 #[cfg(msim)]
279 pub fn with_execution_time_observer_config(mut self, c: ExecutionTimeObserverConfig) -> Self {
280 self.execution_time_observer_config = Some(c);
281 self
282 }
283
284 pub fn with_fullnode_supported_protocol_versions_config(
285 mut self,
286 c: ProtocolVersionsConfig,
287 ) -> Self {
288 self.fullnode_supported_protocol_versions_config = Some(c);
289 self
290 }
291
292 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
293 self.db_checkpoint_config = db_checkpoint_config;
294 self
295 }
296
297 pub fn with_authority_overload_config(
298 mut self,
299 authority_overload_config: AuthorityOverloadConfig,
300 ) -> Self {
301 assert!(self.network_config.is_none());
302 self.authority_overload_config = Some(authority_overload_config);
303 self
304 }
305
306 pub fn with_execution_cache_config(
307 mut self,
308 execution_cache_config: ExecutionCacheConfig,
309 ) -> Self {
310 self.execution_cache_config = Some(execution_cache_config);
311 self
312 }
313
314 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
315 self.data_ingestion_dir = Some(path);
316 self
317 }
318
319 pub fn with_state_sync_config(mut self, config: sui_config::p2p::StateSyncConfig) -> Self {
320 self.state_sync_config = Some(config);
321 self
322 }
323
324 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
325 if let Some(run_with_range) = run_with_range {
326 self.fullnode_run_with_range = Some(run_with_range);
327 }
328 self
329 }
330
331 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
332 self.fullnode_policy_config = config;
333 self
334 }
335
336 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
337 self.fullnode_fw_config = config;
338 self
339 }
340
341 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
342 if self.genesis_config.is_none() {
343 assert!(self.network_config.is_none());
344 self.genesis_config = Some(GenesisConfig::for_local_testing());
345 }
346 self.genesis_config.as_mut().unwrap()
347 }
348
349 pub fn with_disable_fullnode_pruning(mut self) -> Self {
350 self.disable_fullnode_pruning = true;
351 self
352 }
353}
354
355impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
356 pub fn build(self) -> Swarm {
358 let dir = if let Some(dir) = self.dir {
359 SwarmDirectory::Persistent(dir)
360 } else {
361 SwarmDirectory::new_temporary()
362 };
363
364 let ingest_data = self.data_ingestion_dir.clone();
365
366 let network_config = self.network_config.unwrap_or_else(|| {
367 let mut config_builder = ConfigBuilder::new(dir.as_ref());
368
369 if let Some(genesis_config) = self.genesis_config {
370 config_builder = config_builder.with_genesis_config(genesis_config);
371 }
372
373 if let Some(chain_override) = self.chain_override {
374 config_builder = config_builder.with_chain_override(chain_override);
375 }
376
377 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
378 config_builder =
379 config_builder.with_num_unpruned_validators(num_unpruned_validators);
380 }
381
382 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
383 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
384 }
385
386 if let Some(authority_overload_config) = self.authority_overload_config {
387 config_builder =
388 config_builder.with_authority_overload_config(authority_overload_config);
389 }
390
391 if let Some(execution_cache_config) = self.execution_cache_config {
392 config_builder = config_builder.with_execution_cache_config(execution_cache_config);
393 }
394
395 if let Some(path) = self.data_ingestion_dir {
396 config_builder = config_builder.with_data_ingestion_dir(path);
397 }
398
399 #[allow(unused_mut)]
400 let mut final_builder = config_builder
401 .committee(self.committee)
402 .rng(self.rng)
403 .with_objects(self.additional_objects)
404 .with_supported_protocol_versions_config(
405 self.supported_protocol_versions_config.clone(),
406 )
407 .with_global_state_hash_v2_enabled_config(
408 self.global_state_hash_v2_enabled_config.clone(),
409 );
410
411 if let Some(funds_withdraw_scheduler_type_config) =
412 self.funds_withdraw_scheduler_type_config.clone()
413 {
414 final_builder = final_builder.with_funds_withdraw_scheduler_type_config(
415 funds_withdraw_scheduler_type_config,
416 );
417 }
418
419 if let Some(state_sync_config) = self.state_sync_config.clone() {
420 final_builder = final_builder.with_state_sync_config(state_sync_config);
421 }
422
423 #[cfg(msim)]
424 if let Some(execution_time_observer_config) = self.execution_time_observer_config {
425 final_builder = final_builder
426 .with_execution_time_observer_config(execution_time_observer_config);
427 }
428
429 final_builder.build()
430 });
431
432 let mut nodes: HashMap<_, _> = network_config
433 .validator_configs()
434 .iter()
435 .map(|config| {
436 info!(
437 "SwarmBuilder configuring validator with name {}",
438 config.protocol_public_key()
439 );
440 (config.protocol_public_key(), Node::new(config.to_owned()))
441 })
442 .collect();
443
444 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
445 .with_config_directory(dir.as_ref().into())
446 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
447 .with_run_with_range(self.fullnode_run_with_range)
448 .with_policy_config(self.fullnode_policy_config)
449 .with_data_ingestion_dir(ingest_data)
450 .with_fw_config(self.fullnode_fw_config)
451 .with_disable_pruning(self.disable_fullnode_pruning);
452
453 if let Some(state_sync_config) = self.state_sync_config.clone() {
454 fullnode_config_builder =
455 fullnode_config_builder.with_state_sync_config(state_sync_config);
456 }
457
458 if let Some(chain) = self.chain_override {
459 fullnode_config_builder = fullnode_config_builder.with_chain_override(chain);
460 }
461
462 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
463 let supported_versions = match spvc {
464 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
465 ProtocolVersionsConfig::Global(v) => *v,
466 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
467 };
468 fullnode_config_builder =
469 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
470 }
471
472 if self.fullnode_count > 0 {
473 (0..self.fullnode_count).for_each(|idx| {
474 let mut builder = fullnode_config_builder.clone();
475 if idx == 0 {
476 if let Some(rpc_addr) = self.fullnode_rpc_addr {
479 builder = builder.with_rpc_addr(rpc_addr);
480 }
481 if let Some(rpc_port) = self.fullnode_rpc_port {
482 builder = builder.with_rpc_port(rpc_port);
483 }
484 if let Some(rpc_config) = &self.fullnode_rpc_config {
485 builder = builder.with_rpc_config(rpc_config.clone());
486 }
487 }
488 let config = builder.build(&mut OsRng, &network_config);
489 info!(
490 "SwarmBuilder configuring full node with name {}",
491 config.protocol_public_key()
492 );
493 nodes.insert(config.protocol_public_key(), Node::new(config));
494 });
495 }
496 Swarm {
497 dir,
498 network_config,
499 nodes,
500 fullnode_config_builder,
501 }
502 }
503}
504
505#[derive(Debug)]
507pub struct Swarm {
508 dir: SwarmDirectory,
509 network_config: NetworkConfig,
510 nodes: HashMap<AuthorityName, Node>,
511 fullnode_config_builder: FullnodeConfigBuilder,
513}
514
515impl Drop for Swarm {
516 fn drop(&mut self) {
517 self.nodes_iter_mut().for_each(|node| node.stop());
518 }
519}
520
521impl Swarm {
522 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
523 self.nodes.values_mut()
524 }
525
526 pub fn builder() -> SwarmBuilder {
528 SwarmBuilder::new()
529 }
530
531 pub async fn launch(&mut self) -> Result<()> {
533 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
534 tracing::info!("Successfully launched Swarm");
535 Ok(())
536 }
537
538 pub fn dir(&self) -> &Path {
540 self.dir.as_ref()
541 }
542
543 pub fn config(&self) -> &NetworkConfig {
545 &self.network_config
546 }
547
548 pub fn config_mut(&mut self) -> &mut NetworkConfig {
551 &mut self.network_config
552 }
553
554 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
555 self.nodes.values()
556 }
557
558 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
559 self.nodes.get(name)
560 }
561
562 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
563 self.nodes.get_mut(name)
564 }
565
566 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
570 self.nodes
571 .values()
572 .filter(|node| node.config().consensus_config.is_some())
573 }
574
575 pub fn validator_node_handles(&self) -> Vec<SuiNodeHandle> {
576 self.validator_nodes()
577 .map(|node| node.get_node_handle().unwrap())
578 .collect()
579 }
580
581 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
583 self.validator_nodes().filter(|node| {
584 node.get_node_handle().is_some_and(|handle| {
585 let state = handle.state();
586 state.is_validator(&state.epoch_store_for_testing())
587 })
588 })
589 }
590
591 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
593 self.nodes
594 .values()
595 .filter(|node| node.config().consensus_config.is_none())
596 }
597
598 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> SuiNodeHandle {
599 let name = config.protocol_public_key();
600 let node = Node::new(config);
601 node.start().await.unwrap();
602 let handle = node.get_node_handle().unwrap();
603 self.nodes.insert(name, node);
604 handle
605 }
606
607 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
608 self.fullnode_config_builder.clone()
609 }
610}
611
612#[derive(Debug)]
613enum SwarmDirectory {
614 Persistent(PathBuf),
615 Temporary(TempDir),
616}
617
618impl SwarmDirectory {
619 fn new_temporary() -> Self {
620 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
621 }
622}
623
624impl ops::Deref for SwarmDirectory {
625 type Target = Path;
626
627 fn deref(&self) -> &Self::Target {
628 match self {
629 SwarmDirectory::Persistent(dir) => dir.deref(),
630 SwarmDirectory::Temporary(dir) => dir.path(),
631 }
632 }
633}
634
635impl AsRef<Path> for SwarmDirectory {
636 fn as_ref(&self) -> &Path {
637 match self {
638 SwarmDirectory::Persistent(dir) => dir.as_ref(),
639 SwarmDirectory::Temporary(dir) => dir.as_ref(),
640 }
641 }
642}
643
644#[cfg(test)]
645mod test {
646 use super::Swarm;
647 use std::num::NonZeroUsize;
648
649 #[tokio::test]
650 async fn launch() {
651 telemetry_subscribers::init_for_testing();
652 let mut swarm = Swarm::builder()
653 .committee_size(NonZeroUsize::new(4).unwrap())
654 .with_fullnode_count(1)
655 .build();
656
657 swarm.launch().await.unwrap();
658
659 for validator in swarm.validator_nodes() {
660 validator.health_check(true).await.unwrap();
661 }
662
663 for fullnode in swarm.fullnodes() {
664 fullnode.health_check(false).await.unwrap();
665 }
666
667 println!("hello");
668 }
669}