1use super::Node;
5use anyhow::Result;
6use futures::future::try_join_all;
7use rand::rngs::OsRng;
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::num::NonZeroUsize;
11use std::time::Duration;
12use std::{
13 ops,
14 path::{Path, PathBuf},
15};
16use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig};
17
18#[cfg(msim)]
19use sui_config::node::ExecutionTimeObserverConfig;
20use sui_config::node::{AuthorityOverloadConfig, DBCheckpointConfig, RunWithRange};
21use sui_config::{ExecutionCacheConfig, NodeConfig};
22use sui_macros::nondeterministic;
23use sui_node::SuiNodeHandle;
24use sui_protocol_config::{Chain, ProtocolVersion};
25use sui_swarm_config::genesis_config::{AccountConfig, GenesisConfig, ValidatorGenesisConfig};
26use sui_swarm_config::network_config::NetworkConfig;
27use sui_swarm_config::network_config_builder::{
28 CommitteeConfig, ConfigBuilder, GlobalStateHashV2EnabledConfig, ProtocolVersionsConfig,
29 SupportedProtocolVersionsCallback,
30};
31use sui_swarm_config::node_config_builder::FullnodeConfigBuilder;
32use sui_types::base_types::AuthorityName;
33use sui_types::object::Object;
34use sui_types::supported_protocol_versions::SupportedProtocolVersions;
35use tempfile::TempDir;
36use tracing::info;
37
38pub struct SwarmBuilder<R = OsRng> {
39 rng: R,
40 dir: Option<PathBuf>,
42 committee: CommitteeConfig,
43 genesis_config: Option<GenesisConfig>,
44 network_config: Option<NetworkConfig>,
45 chain_override: Option<Chain>,
46 additional_objects: Vec<Object>,
47 fullnode_count: usize,
48 fullnode_rpc_port: Option<u16>,
49 fullnode_rpc_addr: Option<SocketAddr>,
50 fullnode_rpc_config: Option<sui_config::RpcConfig>,
51 supported_protocol_versions_config: ProtocolVersionsConfig,
52 fullnode_supported_protocol_versions_config: Option<ProtocolVersionsConfig>,
54 db_checkpoint_config: DBCheckpointConfig,
55 jwk_fetch_interval: Option<Duration>,
56 num_unpruned_validators: Option<usize>,
57 authority_overload_config: Option<AuthorityOverloadConfig>,
58 execution_cache_config: Option<ExecutionCacheConfig>,
59 data_ingestion_dir: Option<PathBuf>,
60 fullnode_run_with_range: Option<RunWithRange>,
61 fullnode_policy_config: Option<PolicyConfig>,
62 fullnode_fw_config: Option<RemoteFirewallConfig>,
63 max_submit_position: Option<usize>,
64 submit_delay_step_override_millis: Option<u64>,
65 global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig,
66 disable_fullnode_pruning: bool,
67 state_sync_config: Option<sui_config::p2p::StateSyncConfig>,
68 #[cfg(msim)]
69 execution_time_observer_config: Option<ExecutionTimeObserverConfig>,
70}
71
72impl SwarmBuilder {
73 #[allow(clippy::new_without_default)]
74 pub fn new() -> Self {
75 Self {
76 rng: OsRng,
77 dir: None,
78 committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
79 genesis_config: None,
80 network_config: None,
81 chain_override: None,
82 additional_objects: vec![],
83 fullnode_count: 0,
84 fullnode_rpc_port: None,
85 fullnode_rpc_addr: None,
86 fullnode_rpc_config: None,
87 supported_protocol_versions_config: ProtocolVersionsConfig::Default,
88 fullnode_supported_protocol_versions_config: None,
89 db_checkpoint_config: DBCheckpointConfig::default(),
90 jwk_fetch_interval: None,
91 num_unpruned_validators: None,
92 authority_overload_config: None,
93 execution_cache_config: None,
94 data_ingestion_dir: None,
95 fullnode_run_with_range: None,
96 fullnode_policy_config: None,
97 fullnode_fw_config: None,
98 max_submit_position: None,
99 submit_delay_step_override_millis: None,
100 global_state_hash_v2_enabled_config: GlobalStateHashV2EnabledConfig::Global(true),
101 disable_fullnode_pruning: false,
102 state_sync_config: None,
103 #[cfg(msim)]
104 execution_time_observer_config: None,
105 }
106 }
107}
108
109impl<R> SwarmBuilder<R> {
110 pub fn rng<N: rand::RngCore + rand::CryptoRng>(self, rng: N) -> SwarmBuilder<N> {
111 SwarmBuilder {
112 rng,
113 dir: self.dir,
114 committee: self.committee,
115 genesis_config: self.genesis_config,
116 network_config: self.network_config,
117 chain_override: self.chain_override,
118 additional_objects: self.additional_objects,
119 fullnode_count: self.fullnode_count,
120 fullnode_rpc_port: self.fullnode_rpc_port,
121 fullnode_rpc_addr: self.fullnode_rpc_addr,
122 fullnode_rpc_config: self.fullnode_rpc_config.clone(),
123 supported_protocol_versions_config: self.supported_protocol_versions_config,
124 fullnode_supported_protocol_versions_config: self
125 .fullnode_supported_protocol_versions_config,
126 db_checkpoint_config: self.db_checkpoint_config,
127 jwk_fetch_interval: self.jwk_fetch_interval,
128 num_unpruned_validators: self.num_unpruned_validators,
129 authority_overload_config: self.authority_overload_config,
130 execution_cache_config: self.execution_cache_config,
131 data_ingestion_dir: self.data_ingestion_dir,
132 fullnode_run_with_range: self.fullnode_run_with_range,
133 fullnode_policy_config: self.fullnode_policy_config,
134 fullnode_fw_config: self.fullnode_fw_config,
135 max_submit_position: self.max_submit_position,
136 submit_delay_step_override_millis: self.submit_delay_step_override_millis,
137 global_state_hash_v2_enabled_config: self.global_state_hash_v2_enabled_config,
138 disable_fullnode_pruning: self.disable_fullnode_pruning,
139 state_sync_config: self.state_sync_config,
140 #[cfg(msim)]
141 execution_time_observer_config: self.execution_time_observer_config,
142 }
143 }
144
145 pub fn dir<P: Into<PathBuf>>(mut self, dir: P) -> Self {
151 self.dir = Some(dir.into());
152 self
153 }
154
155 pub fn committee_size(mut self, committee_size: NonZeroUsize) -> Self {
159 self.committee = CommitteeConfig::Size(committee_size);
160 self
161 }
162
163 pub fn with_validators(mut self, validators: Vec<ValidatorGenesisConfig>) -> Self {
164 self.committee = CommitteeConfig::Validators(validators);
165 self
166 }
167
168 pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
169 assert!(self.network_config.is_none() && self.genesis_config.is_none());
170 self.genesis_config = Some(genesis_config);
171 self
172 }
173
174 pub fn with_chain_override(mut self, chain: Chain) -> Self {
175 assert!(self.chain_override.is_none());
176 self.chain_override = Some(chain);
177 self
178 }
179
180 pub fn with_num_unpruned_validators(mut self, n: usize) -> Self {
181 assert!(self.network_config.is_none());
182 self.num_unpruned_validators = Some(n);
183 self
184 }
185
186 pub fn with_jwk_fetch_interval(mut self, i: Duration) -> Self {
187 self.jwk_fetch_interval = Some(i);
188 self
189 }
190
191 pub fn with_network_config(mut self, network_config: NetworkConfig) -> Self {
192 assert!(self.network_config.is_none() && self.genesis_config.is_none());
193 self.network_config = Some(network_config);
194 self
195 }
196
197 pub fn with_accounts(mut self, accounts: Vec<AccountConfig>) -> Self {
198 self.get_or_init_genesis_config().accounts = accounts;
199 self
200 }
201
202 pub fn with_objects<I: IntoIterator<Item = Object>>(mut self, objects: I) -> Self {
203 self.additional_objects.extend(objects);
204 self
205 }
206
207 pub fn with_fullnode_count(mut self, fullnode_count: usize) -> Self {
208 self.fullnode_count = fullnode_count;
209 self
210 }
211
212 pub fn with_fullnode_rpc_port(mut self, fullnode_rpc_port: u16) -> Self {
213 assert!(self.fullnode_rpc_addr.is_none());
214 self.fullnode_rpc_port = Some(fullnode_rpc_port);
215 self
216 }
217
218 pub fn with_fullnode_rpc_addr(mut self, fullnode_rpc_addr: SocketAddr) -> Self {
219 assert!(self.fullnode_rpc_port.is_none());
220 self.fullnode_rpc_addr = Some(fullnode_rpc_addr);
221 self
222 }
223
224 pub fn with_fullnode_rpc_config(mut self, fullnode_rpc_config: sui_config::RpcConfig) -> Self {
225 self.fullnode_rpc_config = Some(fullnode_rpc_config);
226 self
227 }
228
229 pub fn with_epoch_duration_ms(mut self, epoch_duration_ms: u64) -> Self {
230 self.get_or_init_genesis_config()
231 .parameters
232 .epoch_duration_ms = epoch_duration_ms;
233 self
234 }
235
236 pub fn with_protocol_version(mut self, v: ProtocolVersion) -> Self {
237 self.get_or_init_genesis_config()
238 .parameters
239 .protocol_version = v;
240 self
241 }
242
243 pub fn with_supported_protocol_versions(mut self, c: SupportedProtocolVersions) -> Self {
244 self.supported_protocol_versions_config = ProtocolVersionsConfig::Global(c);
245 self
246 }
247
248 pub fn with_supported_protocol_version_callback(
249 mut self,
250 func: SupportedProtocolVersionsCallback,
251 ) -> Self {
252 self.supported_protocol_versions_config = ProtocolVersionsConfig::PerValidator(func);
253 self
254 }
255
256 pub fn with_supported_protocol_versions_config(mut self, c: ProtocolVersionsConfig) -> Self {
257 self.supported_protocol_versions_config = c;
258 self
259 }
260
261 pub fn with_global_state_hash_v2_enabled_config(
262 mut self,
263 c: GlobalStateHashV2EnabledConfig,
264 ) -> Self {
265 self.global_state_hash_v2_enabled_config = c;
266 self
267 }
268
269 #[cfg(msim)]
270 pub fn with_execution_time_observer_config(mut self, c: ExecutionTimeObserverConfig) -> Self {
271 self.execution_time_observer_config = Some(c);
272 self
273 }
274
275 pub fn with_fullnode_supported_protocol_versions_config(
276 mut self,
277 c: ProtocolVersionsConfig,
278 ) -> Self {
279 self.fullnode_supported_protocol_versions_config = Some(c);
280 self
281 }
282
283 pub fn with_db_checkpoint_config(mut self, db_checkpoint_config: DBCheckpointConfig) -> Self {
284 self.db_checkpoint_config = db_checkpoint_config;
285 self
286 }
287
288 pub fn with_authority_overload_config(
289 mut self,
290 authority_overload_config: AuthorityOverloadConfig,
291 ) -> Self {
292 assert!(self.network_config.is_none());
293 self.authority_overload_config = Some(authority_overload_config);
294 self
295 }
296
297 pub fn with_execution_cache_config(
298 mut self,
299 execution_cache_config: ExecutionCacheConfig,
300 ) -> Self {
301 self.execution_cache_config = Some(execution_cache_config);
302 self
303 }
304
305 pub fn with_data_ingestion_dir(mut self, path: PathBuf) -> Self {
306 self.data_ingestion_dir = Some(path);
307 self
308 }
309
310 pub fn with_state_sync_config(mut self, config: sui_config::p2p::StateSyncConfig) -> Self {
311 self.state_sync_config = Some(config);
312 self
313 }
314
315 pub fn with_fullnode_run_with_range(mut self, run_with_range: Option<RunWithRange>) -> Self {
316 if let Some(run_with_range) = run_with_range {
317 self.fullnode_run_with_range = Some(run_with_range);
318 }
319 self
320 }
321
322 pub fn with_fullnode_policy_config(mut self, config: Option<PolicyConfig>) -> Self {
323 self.fullnode_policy_config = config;
324 self
325 }
326
327 pub fn with_fullnode_fw_config(mut self, config: Option<RemoteFirewallConfig>) -> Self {
328 self.fullnode_fw_config = config;
329 self
330 }
331
332 fn get_or_init_genesis_config(&mut self) -> &mut GenesisConfig {
333 if self.genesis_config.is_none() {
334 assert!(self.network_config.is_none());
335 self.genesis_config = Some(GenesisConfig::for_local_testing());
336 }
337 self.genesis_config.as_mut().unwrap()
338 }
339
340 pub fn with_max_submit_position(mut self, max_submit_position: usize) -> Self {
341 self.max_submit_position = Some(max_submit_position);
342 self
343 }
344
345 pub fn with_disable_fullnode_pruning(mut self) -> Self {
346 self.disable_fullnode_pruning = true;
347 self
348 }
349
350 pub fn with_submit_delay_step_override_millis(
351 mut self,
352 submit_delay_step_override_millis: u64,
353 ) -> Self {
354 self.submit_delay_step_override_millis = Some(submit_delay_step_override_millis);
355 self
356 }
357}
358
359impl<R: rand::RngCore + rand::CryptoRng> SwarmBuilder<R> {
360 pub fn build(self) -> Swarm {
362 let dir = if let Some(dir) = self.dir {
363 SwarmDirectory::Persistent(dir)
364 } else {
365 SwarmDirectory::new_temporary()
366 };
367
368 let ingest_data = self.data_ingestion_dir.clone();
369
370 let network_config = self.network_config.unwrap_or_else(|| {
371 let mut config_builder = ConfigBuilder::new(dir.as_ref());
372
373 if let Some(genesis_config) = self.genesis_config {
374 config_builder = config_builder.with_genesis_config(genesis_config);
375 }
376
377 if let Some(chain_override) = self.chain_override {
378 config_builder = config_builder.with_chain_override(chain_override);
379 }
380
381 if let Some(num_unpruned_validators) = self.num_unpruned_validators {
382 config_builder =
383 config_builder.with_num_unpruned_validators(num_unpruned_validators);
384 }
385
386 if let Some(jwk_fetch_interval) = self.jwk_fetch_interval {
387 config_builder = config_builder.with_jwk_fetch_interval(jwk_fetch_interval);
388 }
389
390 if let Some(authority_overload_config) = self.authority_overload_config {
391 config_builder =
392 config_builder.with_authority_overload_config(authority_overload_config);
393 }
394
395 if let Some(execution_cache_config) = self.execution_cache_config {
396 config_builder = config_builder.with_execution_cache_config(execution_cache_config);
397 }
398
399 if let Some(path) = self.data_ingestion_dir {
400 config_builder = config_builder.with_data_ingestion_dir(path);
401 }
402
403 if let Some(max_submit_position) = self.max_submit_position {
404 config_builder = config_builder.with_max_submit_position(max_submit_position);
405 }
406
407 if let Some(submit_delay_step_override_millis) = self.submit_delay_step_override_millis
408 {
409 config_builder = config_builder
410 .with_submit_delay_step_override_millis(submit_delay_step_override_millis);
411 }
412
413 #[allow(unused_mut)]
414 let mut final_builder = config_builder
415 .committee(self.committee)
416 .rng(self.rng)
417 .with_objects(self.additional_objects)
418 .with_supported_protocol_versions_config(
419 self.supported_protocol_versions_config.clone(),
420 )
421 .with_global_state_hash_v2_enabled_config(
422 self.global_state_hash_v2_enabled_config.clone(),
423 );
424
425 if let Some(state_sync_config) = self.state_sync_config.clone() {
426 final_builder = final_builder.with_state_sync_config(state_sync_config);
427 }
428
429 #[cfg(msim)]
430 if let Some(execution_time_observer_config) = self.execution_time_observer_config {
431 final_builder = final_builder
432 .with_execution_time_observer_config(execution_time_observer_config);
433 }
434
435 final_builder.build()
436 });
437
438 let mut nodes: HashMap<_, _> = network_config
439 .validator_configs()
440 .iter()
441 .map(|config| {
442 info!(
443 "SwarmBuilder configuring validator with name {}",
444 config.protocol_public_key()
445 );
446 (config.protocol_public_key(), Node::new(config.to_owned()))
447 })
448 .collect();
449
450 let mut fullnode_config_builder = FullnodeConfigBuilder::new()
451 .with_config_directory(dir.as_ref().into())
452 .with_db_checkpoint_config(self.db_checkpoint_config.clone())
453 .with_run_with_range(self.fullnode_run_with_range)
454 .with_policy_config(self.fullnode_policy_config)
455 .with_data_ingestion_dir(ingest_data)
456 .with_fw_config(self.fullnode_fw_config)
457 .with_disable_pruning(self.disable_fullnode_pruning);
458
459 if let Some(state_sync_config) = self.state_sync_config.clone() {
460 fullnode_config_builder =
461 fullnode_config_builder.with_state_sync_config(state_sync_config);
462 }
463
464 if let Some(chain) = self.chain_override {
465 fullnode_config_builder = fullnode_config_builder.with_chain_override(chain);
466 }
467
468 if let Some(spvc) = &self.fullnode_supported_protocol_versions_config {
469 let supported_versions = match spvc {
470 ProtocolVersionsConfig::Default => SupportedProtocolVersions::SYSTEM_DEFAULT,
471 ProtocolVersionsConfig::Global(v) => *v,
472 ProtocolVersionsConfig::PerValidator(func) => func(0, None),
473 };
474 fullnode_config_builder =
475 fullnode_config_builder.with_supported_protocol_versions(supported_versions);
476 }
477
478 if self.fullnode_count > 0 {
479 (0..self.fullnode_count).for_each(|idx| {
480 let mut builder = fullnode_config_builder.clone();
481 if idx == 0 {
482 if let Some(rpc_addr) = self.fullnode_rpc_addr {
485 builder = builder.with_rpc_addr(rpc_addr);
486 }
487 if let Some(rpc_port) = self.fullnode_rpc_port {
488 builder = builder.with_rpc_port(rpc_port);
489 }
490 if let Some(rpc_config) = &self.fullnode_rpc_config {
491 builder = builder.with_rpc_config(rpc_config.clone());
492 }
493 }
494 let config = builder.build(&mut OsRng, &network_config);
495 info!(
496 "SwarmBuilder configuring full node with name {}",
497 config.protocol_public_key()
498 );
499 nodes.insert(config.protocol_public_key(), Node::new(config));
500 });
501 }
502 Swarm {
503 dir,
504 network_config,
505 nodes,
506 fullnode_config_builder,
507 }
508 }
509}
510
511#[derive(Debug)]
513pub struct Swarm {
514 dir: SwarmDirectory,
515 network_config: NetworkConfig,
516 nodes: HashMap<AuthorityName, Node>,
517 fullnode_config_builder: FullnodeConfigBuilder,
519}
520
521impl Drop for Swarm {
522 fn drop(&mut self) {
523 self.nodes_iter_mut().for_each(|node| node.stop());
524 }
525}
526
527impl Swarm {
528 fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
529 self.nodes.values_mut()
530 }
531
532 pub fn builder() -> SwarmBuilder {
534 SwarmBuilder::new()
535 }
536
537 pub async fn launch(&mut self) -> Result<()> {
539 try_join_all(self.nodes_iter_mut().map(|node| node.start())).await?;
540 tracing::info!("Successfully launched Swarm");
541 Ok(())
542 }
543
544 pub fn dir(&self) -> &Path {
546 self.dir.as_ref()
547 }
548
549 pub fn config(&self) -> &NetworkConfig {
551 &self.network_config
552 }
553
554 pub fn config_mut(&mut self) -> &mut NetworkConfig {
557 &mut self.network_config
558 }
559
560 pub fn all_nodes(&self) -> impl Iterator<Item = &Node> {
561 self.nodes.values()
562 }
563
564 pub fn node(&self, name: &AuthorityName) -> Option<&Node> {
565 self.nodes.get(name)
566 }
567
568 pub fn node_mut(&mut self, name: &AuthorityName) -> Option<&mut Node> {
569 self.nodes.get_mut(name)
570 }
571
572 pub fn validator_nodes(&self) -> impl Iterator<Item = &Node> {
576 self.nodes
577 .values()
578 .filter(|node| node.config().consensus_config.is_some())
579 }
580
581 pub fn validator_node_handles(&self) -> Vec<SuiNodeHandle> {
582 self.validator_nodes()
583 .map(|node| node.get_node_handle().unwrap())
584 .collect()
585 }
586
587 pub fn active_validators(&self) -> impl Iterator<Item = &Node> {
589 self.validator_nodes().filter(|node| {
590 node.get_node_handle().is_some_and(|handle| {
591 let state = handle.state();
592 state.is_validator(&state.epoch_store_for_testing())
593 })
594 })
595 }
596
597 pub fn fullnodes(&self) -> impl Iterator<Item = &Node> {
599 self.nodes
600 .values()
601 .filter(|node| node.config().consensus_config.is_none())
602 }
603
604 pub async fn spawn_new_node(&mut self, config: NodeConfig) -> SuiNodeHandle {
605 let name = config.protocol_public_key();
606 let node = Node::new(config);
607 node.start().await.unwrap();
608 let handle = node.get_node_handle().unwrap();
609 self.nodes.insert(name, node);
610 handle
611 }
612
613 pub fn get_fullnode_config_builder(&self) -> FullnodeConfigBuilder {
614 self.fullnode_config_builder.clone()
615 }
616}
617
618#[derive(Debug)]
619enum SwarmDirectory {
620 Persistent(PathBuf),
621 Temporary(TempDir),
622}
623
624impl SwarmDirectory {
625 fn new_temporary() -> Self {
626 SwarmDirectory::Temporary(nondeterministic!(TempDir::new().unwrap()))
627 }
628}
629
630impl ops::Deref for SwarmDirectory {
631 type Target = Path;
632
633 fn deref(&self) -> &Self::Target {
634 match self {
635 SwarmDirectory::Persistent(dir) => dir.deref(),
636 SwarmDirectory::Temporary(dir) => dir.path(),
637 }
638 }
639}
640
641impl AsRef<Path> for SwarmDirectory {
642 fn as_ref(&self) -> &Path {
643 match self {
644 SwarmDirectory::Persistent(dir) => dir.as_ref(),
645 SwarmDirectory::Temporary(dir) => dir.as_ref(),
646 }
647 }
648}
649
650#[cfg(test)]
651mod test {
652 use super::Swarm;
653 use std::num::NonZeroUsize;
654
655 #[tokio::test]
656 async fn launch() {
657 telemetry_subscribers::init_for_testing();
658 let mut swarm = Swarm::builder()
659 .committee_size(NonZeroUsize::new(4).unwrap())
660 .with_fullnode_count(1)
661 .build();
662
663 swarm.launch().await.unwrap();
664
665 for validator in swarm.validator_nodes() {
666 validator.health_check(true).await.unwrap();
667 }
668
669 for fullnode in swarm.fullnodes() {
670 fullnode.health_check(false).await.unwrap();
671 }
672
673 println!("hello");
674 }
675}