sui_cluster_test/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::config::{ClusterTestOpt, Env};
5use async_trait::async_trait;
6use std::net::SocketAddr;
7use std::path::Path;
8use sui_config::Config;
9use sui_config::local_ip_utils::get_available_port;
10use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG};
11use sui_graphql_rpc::config::{ConnectionConfig, ServiceConfig};
12use sui_graphql_rpc::test_infra::cluster::start_graphql_server_with_fn_rpc;
13use sui_indexer::test_utils::{
14    start_indexer_jsonrpc_for_testing, start_indexer_writer_for_testing,
15};
16use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
17use sui_pg_db::temp::TempDb;
18use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
19use sui_sdk::wallet_context::WalletContext;
20use sui_swarm::memory::Swarm;
21use sui_swarm_config::genesis_config::GenesisConfig;
22use sui_swarm_config::network_config::NetworkConfig;
23use sui_types::base_types::SuiAddress;
24use sui_types::crypto::KeypairTraits;
25use sui_types::crypto::SuiKeyPair;
26use sui_types::crypto::{AccountKeyPair, get_key_pair};
27use tempfile::tempdir;
28use test_cluster::{TestCluster, TestClusterBuilder};
29use tracing::info;
30
31const DEVNET_FAUCET_ADDR: &str = "https://faucet.devnet.sui.io:443";
32const STAGING_FAUCET_ADDR: &str = "https://faucet.staging.sui.io:443";
33const CONTINUOUS_FAUCET_ADDR: &str = "https://faucet.ci.sui.io:443";
34const CONTINUOUS_NOMAD_FAUCET_ADDR: &str = "https://faucet.nomad.ci.sui.io:443";
35const TESTNET_FAUCET_ADDR: &str = "https://faucet.testnet.sui.io:443";
36const DEVNET_FULLNODE_ADDR: &str = "https://rpc.devnet.sui.io:443";
37const STAGING_FULLNODE_ADDR: &str = "https://fullnode.staging.sui.io:443";
38const CONTINUOUS_FULLNODE_ADDR: &str = "https://fullnode.ci.sui.io:443";
39const CONTINUOUS_NOMAD_FULLNODE_ADDR: &str = "https://fullnode.nomad.ci.sui.io:443";
40const TESTNET_FULLNODE_ADDR: &str = "https://fullnode.testnet.sui.io:443";
41
42pub struct ClusterFactory;
43
44impl ClusterFactory {
45    pub async fn start(
46        options: &ClusterTestOpt,
47    ) -> Result<Box<dyn Cluster + Sync + Send>, anyhow::Error> {
48        Ok(match &options.env {
49            Env::NewLocal => Box::new(LocalNewCluster::start(options).await?),
50            _ => Box::new(RemoteRunningCluster::start(options).await?),
51        })
52    }
53}
54
55/// Cluster Abstraction
56#[async_trait]
57pub trait Cluster {
58    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error>
59    where
60        Self: Sized;
61
62    fn fullnode_url(&self) -> &str;
63    fn user_key(&self) -> AccountKeyPair;
64    fn indexer_url(&self) -> &Option<String>;
65
66    /// Returns faucet url in a remote cluster.
67    fn remote_faucet_url(&self) -> Option<&str>;
68
69    /// Returns faucet key in a local cluster.
70    fn local_faucet_key(&self) -> Option<&AccountKeyPair>;
71
72    /// Place to put config for the wallet, and any locally running services.
73    fn config_directory(&self) -> &Path;
74}
75
76/// Represents an up and running cluster deployed remotely.
77pub struct RemoteRunningCluster {
78    fullnode_url: String,
79    faucet_url: String,
80    config_directory: tempfile::TempDir,
81}
82
83#[async_trait]
84impl Cluster for RemoteRunningCluster {
85    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
86        let (fullnode_url, faucet_url) = match options.env {
87            Env::Devnet => (
88                String::from(DEVNET_FULLNODE_ADDR),
89                String::from(DEVNET_FAUCET_ADDR),
90            ),
91            Env::Staging => (
92                String::from(STAGING_FULLNODE_ADDR),
93                String::from(STAGING_FAUCET_ADDR),
94            ),
95            Env::Ci => (
96                String::from(CONTINUOUS_FULLNODE_ADDR),
97                String::from(CONTINUOUS_FAUCET_ADDR),
98            ),
99            Env::CiNomad => (
100                String::from(CONTINUOUS_NOMAD_FULLNODE_ADDR),
101                String::from(CONTINUOUS_NOMAD_FAUCET_ADDR),
102            ),
103            Env::Testnet => (
104                String::from(TESTNET_FULLNODE_ADDR),
105                String::from(TESTNET_FAUCET_ADDR),
106            ),
107            Env::CustomRemote => (
108                options
109                    .fullnode_address
110                    .clone()
111                    .expect("Expect 'fullnode_address' for Env::Custom"),
112                options
113                    .faucet_address
114                    .clone()
115                    .expect("Expect 'faucet_address' for Env::Custom"),
116            ),
117            Env::NewLocal => unreachable!("NewLocal shouldn't use RemoteRunningCluster"),
118        };
119
120        // TODO: test connectivity before proceeding?
121
122        Ok(Self {
123            fullnode_url,
124            faucet_url,
125            config_directory: tempfile::tempdir()?,
126        })
127    }
128
129    fn fullnode_url(&self) -> &str {
130        &self.fullnode_url
131    }
132
133    fn indexer_url(&self) -> &Option<String> {
134        &None
135    }
136
137    fn user_key(&self) -> AccountKeyPair {
138        get_key_pair().1
139    }
140
141    fn remote_faucet_url(&self) -> Option<&str> {
142        Some(&self.faucet_url)
143    }
144
145    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
146        None
147    }
148
149    fn config_directory(&self) -> &Path {
150        self.config_directory.path()
151    }
152}
153
154/// Represents a local Cluster which starts per cluster test run.
155pub struct LocalNewCluster {
156    test_cluster: TestCluster,
157    fullnode_url: String,
158    indexer_url: Option<String>,
159    faucet_key: AccountKeyPair,
160    config_directory: tempfile::TempDir,
161    #[allow(unused)]
162    data_ingestion_path: tempfile::TempDir,
163    #[allow(unused)]
164    cancellation_tokens: Vec<tokio_util::sync::DropGuard>,
165    #[allow(unused)]
166    database: Option<TempDb>,
167    graphql_url: Option<String>,
168}
169
170impl LocalNewCluster {
171    #[allow(unused)]
172    pub fn swarm(&self) -> &Swarm {
173        &self.test_cluster.swarm
174    }
175
176    pub fn graphql_url(&self) -> &Option<String> {
177        &self.graphql_url
178    }
179}
180
181#[async_trait]
182impl Cluster for LocalNewCluster {
183    async fn start(options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
184        let data_ingestion_path = tempdir()?;
185
186        let mut cluster_builder = TestClusterBuilder::new()
187            .enable_fullnode_events()
188            .with_data_ingestion_dir(data_ingestion_path.path().to_path_buf());
189
190        // Check if we already have a config directory that is passed
191        if let Some(config_dir) = options.config_dir.clone() {
192            assert!(options.epoch_duration_ms.is_none());
193            // Load the config of the Sui authority.
194            let network_config_path = config_dir.join(SUI_NETWORK_CONFIG);
195            let network_config: NetworkConfig = PersistedConfig::read(&network_config_path)
196                .map_err(|err| {
197                    err.context(format!(
198                        "Cannot open Sui network config file at {:?}",
199                        network_config_path
200                    ))
201                })?;
202
203            cluster_builder = cluster_builder.set_network_config(network_config);
204            cluster_builder = cluster_builder.with_config_dir(config_dir);
205        } else {
206            // Let the faucet account hold 1000 gas objects on genesis
207            let genesis_config = GenesisConfig::custom_genesis(1, 100);
208            // Custom genesis should be build here where we add the extra accounts
209            cluster_builder = cluster_builder.set_genesis_config(genesis_config);
210
211            if let Some(epoch_duration_ms) = options.epoch_duration_ms {
212                cluster_builder = cluster_builder.with_epoch_duration_ms(epoch_duration_ms);
213            }
214        }
215
216        let mut test_cluster = cluster_builder.build().await;
217
218        // Use the wealthy account for faucet
219        let faucet_key = test_cluster.swarm.config_mut().account_keys.swap_remove(0);
220        let faucet_address = SuiAddress::from(faucet_key.public());
221        info!(?faucet_address, "faucet_address");
222
223        // This cluster has fullnode handle, safe to unwrap
224        let fullnode_url = test_cluster.fullnode_handle.rpc_url.clone();
225
226        // TODO: with TestCluster supporting indexer backed rpc as well, we can remove the indexer related logic here.
227        let mut cancellation_tokens = vec![];
228        let (database, indexer_url, graphql_url) = if options.with_indexer_and_graphql {
229            let database = TempDb::new()?;
230            let pg_address = database.database().url().as_str().to_owned();
231            let indexer_jsonrpc_address = format!("127.0.0.1:{}", get_available_port("127.0.0.1"));
232            let graphql_address = format!("127.0.0.1:{}", get_available_port("127.0.0.1"));
233            let graphql_url = format!("http://{graphql_address}");
234
235            let (_, _, writer_token) = start_indexer_writer_for_testing(
236                pg_address.clone(),
237                None,
238                None,
239                Some(data_ingestion_path.path().to_path_buf()),
240                None, /* cancel */
241                None, /* start_checkpoint */
242                None, /* end_checkpoint */
243            )
244            .await;
245            cancellation_tokens.push(writer_token.drop_guard());
246
247            // Start indexer jsonrpc service
248            let (_, reader_token) = start_indexer_jsonrpc_for_testing(
249                pg_address.clone(),
250                fullnode_url.clone(),
251                indexer_jsonrpc_address.clone(),
252                None, /* cancel */
253            )
254            .await;
255            cancellation_tokens.push(reader_token.drop_guard());
256
257            // Start the graphql service
258            let graphql_address = graphql_address.parse::<SocketAddr>()?;
259            let graphql_connection_config = ConnectionConfig {
260                port: graphql_address.port(),
261                host: graphql_address.ip().to_string(),
262                db_url: pg_address,
263                ..Default::default()
264            };
265
266            start_graphql_server_with_fn_rpc(
267                graphql_connection_config.clone(),
268                Some(fullnode_url.clone()),
269                /* cancellation_token */ None,
270                ServiceConfig::test_defaults(),
271            )
272            .await;
273
274            (
275                Some(database),
276                Some(indexer_jsonrpc_address),
277                Some(graphql_url),
278            )
279        } else {
280            (None, None, None)
281        };
282
283        // Let nodes connect to one another
284        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
285
286        // TODO: test connectivity before proceeding?
287        Ok(Self {
288            test_cluster,
289            fullnode_url,
290            faucet_key,
291            config_directory: tempfile::tempdir()?,
292            data_ingestion_path,
293            indexer_url,
294            cancellation_tokens,
295            database,
296            graphql_url,
297        })
298    }
299
300    fn fullnode_url(&self) -> &str {
301        &self.fullnode_url
302    }
303
304    fn indexer_url(&self) -> &Option<String> {
305        &self.indexer_url
306    }
307
308    fn user_key(&self) -> AccountKeyPair {
309        get_key_pair().1
310    }
311
312    fn remote_faucet_url(&self) -> Option<&str> {
313        None
314    }
315
316    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
317        Some(&self.faucet_key)
318    }
319
320    fn config_directory(&self) -> &Path {
321        self.config_directory.path()
322    }
323}
324
325// Make linter happy
326#[async_trait]
327impl Cluster for Box<dyn Cluster + Send + Sync> {
328    async fn start(_options: &ClusterTestOpt) -> Result<Self, anyhow::Error> {
329        unreachable!(
330            "If we already have a boxed Cluster trait object we wouldn't have to call this function"
331        );
332    }
333    fn fullnode_url(&self) -> &str {
334        (**self).fullnode_url()
335    }
336    fn indexer_url(&self) -> &Option<String> {
337        (**self).indexer_url()
338    }
339
340    fn user_key(&self) -> AccountKeyPair {
341        (**self).user_key()
342    }
343
344    fn remote_faucet_url(&self) -> Option<&str> {
345        (**self).remote_faucet_url()
346    }
347
348    fn local_faucet_key(&self) -> Option<&AccountKeyPair> {
349        (**self).local_faucet_key()
350    }
351
352    fn config_directory(&self) -> &Path {
353        (**self).config_directory()
354    }
355}
356
357pub async fn new_wallet_context_from_cluster(
358    cluster: &(dyn Cluster + Sync + Send),
359    key_pair: AccountKeyPair,
360) -> WalletContext {
361    let config_dir = cluster.config_directory();
362    let wallet_config_path = config_dir.join("client.yaml");
363    let fullnode_url = cluster.fullnode_url();
364    info!("Use RPC: {}", &fullnode_url);
365    let keystore_path = config_dir.join(SUI_KEYSTORE_FILENAME);
366    let mut keystore = Keystore::from(FileBasedKeystore::load_or_create(&keystore_path).unwrap());
367    let address: SuiAddress = key_pair.public().into();
368    keystore
369        .import(None, SuiKeyPair::Ed25519(key_pair))
370        .await
371        .unwrap();
372    SuiClientConfig {
373        keystore,
374        external_keys: None,
375        envs: vec![SuiEnv {
376            alias: "localnet".to_string(),
377            rpc: fullnode_url.into(),
378            ws: None,
379            basic_auth: None,
380            chain_id: None,
381        }],
382        active_address: Some(address),
383        active_env: Some("localnet".to_string()),
384    }
385    .persisted(&wallet_config_path)
386    .save()
387    .unwrap();
388
389    info!(
390        "Initialize wallet from config path: {:?}",
391        wallet_config_path
392    );
393
394    WalletContext::new(&wallet_config_path).unwrap_or_else(|e| {
395        panic!(
396            "Failed to init wallet context from path {:?}, error: {e}",
397            wallet_config_path
398        )
399    })
400}