sui_graphql_rpc/test_infra/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::ConnectionConfig;
5use crate::config::ServerConfig;
6use crate::config::ServiceConfig;
7use crate::config::Version;
8use crate::server::graphiql_server::start_graphiql_server;
9use rand::rngs::StdRng;
10use rand::SeedableRng;
11use simulacrum::AdvanceEpochConfig;
12use simulacrum::Simulacrum;
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17use sui_graphql_rpc_client::simple_client::SimpleClient;
18pub use sui_indexer::config::RetentionConfig;
19pub use sui_indexer::config::SnapshotLagConfig;
20use sui_indexer::errors::IndexerError;
21use sui_indexer::store::PgIndexerStore;
22use sui_indexer::test_utils::start_indexer_writer_for_testing;
23use sui_pg_db::temp::{get_available_port, TempDb};
24use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
25use sui_types::storage::RpcStateReader;
26use tempfile::tempdir;
27use tempfile::TempDir;
28use test_cluster::TestCluster;
29use test_cluster::TestClusterBuilder;
30use tokio::join;
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33use tracing::info;
34
35const VALIDATOR_COUNT: usize = 4;
36/// Set default epoch duration to 300s. This high value is to turn the TestCluster into a lockstep
37/// network of sorts. Tests should call `trigger_reconfiguration` to advance the network's epoch.
38const EPOCH_DURATION_MS: u64 = 300_000;
39
40const ACCOUNT_NUM: usize = 20;
41const GAS_OBJECT_COUNT: usize = 3;
42
43pub struct ExecutorCluster {
44    pub executor_server_handle: JoinHandle<()>,
45    pub indexer_store: PgIndexerStore,
46    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
47    pub graphql_server_join_handle: JoinHandle<()>,
48    pub graphql_client: SimpleClient,
49    pub snapshot_config: SnapshotLagConfig,
50    pub graphql_connection_config: ConnectionConfig,
51    pub cancellation_token: CancellationToken,
52    #[allow(unused)]
53    database: TempDb,
54    tempdir: Option<TempDir>,
55}
56
57pub struct Cluster {
58    pub network: NetworkCluster,
59    pub graphql_server_join_handle: JoinHandle<()>,
60    pub graphql_client: SimpleClient,
61}
62
63pub struct NetworkCluster {
64    pub validator_fullnode_handle: TestCluster,
65    pub indexer_store: PgIndexerStore,
66    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
67    pub cancellation_token: CancellationToken,
68    #[allow(unused)]
69    data_ingestion_path: TempDir,
70    #[allow(unused)]
71    database: TempDb,
72    pub graphql_connection_config: ConnectionConfig,
73}
74
75/// Starts a validator, fullnode, indexer, and graphql service for testing.
76pub async fn start_cluster(service_config: ServiceConfig) -> Cluster {
77    let network_cluster = start_network_cluster().await;
78    let graphql_connection_config = network_cluster.graphql_connection_config.clone();
79
80    let fn_rpc_url: String = network_cluster
81        .validator_fullnode_handle
82        .rpc_url()
83        .to_string();
84
85    let server_url = format!(
86        "http://{}:{}/",
87        graphql_connection_config.host, graphql_connection_config.port
88    );
89
90    let graphql_server_handle = start_graphql_server_with_fn_rpc(
91        graphql_connection_config,
92        Some(fn_rpc_url),
93        Some(network_cluster.cancellation_token.clone()),
94        service_config,
95    )
96    .await;
97
98    // Starts graphql client
99    let client = SimpleClient::new(server_url);
100    wait_for_graphql_server(&client).await;
101
102    Cluster {
103        network: network_cluster,
104        graphql_server_join_handle: graphql_server_handle,
105        graphql_client: client,
106    }
107}
108
109/// Starts a validator, fullnode, indexer (using data ingestion). Re-using GraphQL's ConnectionConfig for convenience.
110/// This does not start any GraphQL services, only the network cluster. You can start a GraphQL service
111/// calling `start_graphql_server`.
112pub async fn start_network_cluster() -> NetworkCluster {
113    let database = TempDb::new().unwrap();
114    let graphql_connection_config = ConnectionConfig {
115        port: get_available_port(),
116        host: "127.0.0.1".to_owned(),
117        db_url: database.database().url().as_str().to_owned(),
118        db_pool_size: 5,
119        prom_host: "127.0.0.1".to_owned(),
120        prom_port: get_available_port(),
121        skip_migration_consistency_check: false,
122    };
123    let data_ingestion_path = tempfile::tempdir().unwrap();
124    let db_url = graphql_connection_config.db_url.clone();
125    let cancellation_token = CancellationToken::new();
126
127    // Starts validator+fullnode
128    let val_fn = start_validator_with_fullnode(data_ingestion_path.path().to_path_buf()).await;
129
130    // Starts indexer
131    let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
132        db_url,
133        None,
134        None,
135        Some(data_ingestion_path.path().to_path_buf()),
136        Some(cancellation_token.clone()),
137        None, /* start_checkpoint */
138        None, /* end_checkpoint */
139    )
140    .await;
141
142    NetworkCluster {
143        validator_fullnode_handle: val_fn,
144        indexer_store: pg_store,
145        indexer_join_handle: pg_handle,
146        cancellation_token,
147        data_ingestion_path,
148        database,
149        graphql_connection_config,
150    }
151}
152
153/// Takes in a simulated instantiation of a Sui blockchain and builds a cluster around it. This
154/// cluster is typically used in e2e tests to emulate and test behaviors.
155pub async fn serve_executor(
156    executor: Arc<dyn RpcStateReader + Send + Sync>,
157    snapshot_config: Option<SnapshotLagConfig>,
158    retention_config: Option<RetentionConfig>,
159    data_ingestion_path: PathBuf,
160) -> ExecutorCluster {
161    let database = TempDb::new().unwrap();
162    let graphql_connection_config = ConnectionConfig {
163        port: get_available_port(),
164        host: "127.0.0.1".to_owned(),
165        db_url: database.database().url().as_str().to_owned(),
166        db_pool_size: 5,
167        prom_host: "127.0.0.1".to_owned(),
168        prom_port: get_available_port(),
169        skip_migration_consistency_check: false,
170    };
171    let db_url = graphql_connection_config.db_url.clone();
172    // Creates a cancellation token and adds this to the ExecutorCluster, so that we can send a
173    // cancellation token on cleanup
174    let cancellation_token = CancellationToken::new();
175
176    let executor_server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
177        .parse()
178        .unwrap();
179
180    info!("Starting executor server on {}", executor_server_url);
181
182    let executor_server_handle = tokio::spawn(async move {
183        sui_rpc_api::RpcService::new(executor)
184            .start_service(executor_server_url)
185            .await;
186    });
187
188    info!("spawned executor server");
189
190    let snapshot_config = snapshot_config.unwrap_or_default();
191
192    let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
193        db_url,
194        Some(snapshot_config.clone()),
195        retention_config,
196        Some(data_ingestion_path),
197        Some(cancellation_token.clone()),
198        None,
199        None,
200    )
201    .await;
202
203    // Starts graphql server
204    let graphql_server_handle = start_graphql_server(
205        graphql_connection_config.clone(),
206        cancellation_token.clone(),
207        ServiceConfig::test_defaults(),
208    )
209    .await;
210
211    let server_url = format!(
212        "http://{}:{}/",
213        graphql_connection_config.host, graphql_connection_config.port
214    );
215
216    // Starts graphql client
217    let client = SimpleClient::new(server_url);
218    wait_for_graphql_server(&client).await;
219
220    ExecutorCluster {
221        executor_server_handle,
222        indexer_store: pg_store,
223        indexer_join_handle: pg_handle,
224        graphql_server_join_handle: graphql_server_handle,
225        graphql_client: client,
226        snapshot_config,
227        graphql_connection_config,
228        cancellation_token,
229        database,
230        tempdir: None,
231    }
232}
233
234pub async fn prep_executor_cluster() -> ExecutorCluster {
235    let rng = StdRng::from_seed([12; 32]);
236    let data_ingestion_path = tempdir().unwrap();
237    let mut sim = Simulacrum::new_with_rng(rng);
238    sim.set_data_ingestion_path(data_ingestion_path.path().to_path_buf());
239
240    sim.create_checkpoint();
241    sim.create_checkpoint();
242    sim.create_checkpoint();
243    sim.advance_epoch(AdvanceEpochConfig {
244        create_random_state: true,
245        ..Default::default()
246    });
247    sim.create_checkpoint();
248    sim.advance_clock(
249        std::time::SystemTime::now()
250            .duration_since(std::time::SystemTime::UNIX_EPOCH)
251            .unwrap(),
252    );
253    sim.create_checkpoint();
254
255    let mut cluster = serve_executor(
256        Arc::new(sim),
257        None,
258        None,
259        data_ingestion_path.path().to_path_buf(),
260    )
261    .await;
262
263    cluster
264        .wait_for_checkpoint_catchup(6, Duration::from_secs(10))
265        .await;
266
267    cluster.tempdir = Some(data_ingestion_path);
268    cluster
269}
270
271pub async fn start_graphql_server(
272    graphql_connection_config: ConnectionConfig,
273    cancellation_token: CancellationToken,
274    service_config: ServiceConfig,
275) -> JoinHandle<()> {
276    start_graphql_server_with_fn_rpc(
277        graphql_connection_config,
278        None,
279        Some(cancellation_token),
280        service_config,
281    )
282    .await
283}
284
285pub async fn start_graphql_server_with_fn_rpc(
286    graphql_connection_config: ConnectionConfig,
287    fn_rpc_url: Option<String>,
288    cancellation_token: Option<CancellationToken>,
289    service_config: ServiceConfig,
290) -> JoinHandle<()> {
291    let cancellation_token = cancellation_token.unwrap_or_default();
292    let mut server_config = ServerConfig {
293        connection: graphql_connection_config,
294        service: service_config,
295        ..ServerConfig::default()
296    };
297    if let Some(fn_rpc_url) = fn_rpc_url {
298        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
299    };
300
301    // Starts graphql server
302    tokio::spawn(async move {
303        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
304            .await
305            .unwrap();
306    })
307}
308
309async fn start_validator_with_fullnode(data_ingestion_dir: PathBuf) -> TestCluster {
310    TestClusterBuilder::new()
311        .with_num_validators(VALIDATOR_COUNT)
312        .with_epoch_duration_ms(EPOCH_DURATION_MS)
313        .with_data_ingestion_dir(data_ingestion_dir)
314        .with_accounts(vec![
315            AccountConfig {
316                address: None,
317                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
318            };
319            ACCOUNT_NUM
320        ])
321        .build()
322        .await
323}
324
325/// Repeatedly ping the GraphQL server for 60s, until it responds
326pub async fn wait_for_graphql_server(client: &SimpleClient) {
327    tokio::time::timeout(Duration::from_secs(60), async {
328        while client.ping().await.is_err() {
329            tokio::time::sleep(Duration::from_millis(500)).await;
330        }
331    })
332    .await
333    .expect("Timeout waiting for graphql server to start");
334}
335
336/// Ping the GraphQL server until its background task has updated the checkpoint watermark to the
337/// desired checkpoint.
338pub async fn wait_for_graphql_checkpoint_catchup(
339    client: &SimpleClient,
340    checkpoint: u64,
341    base_timeout: Duration,
342) {
343    info!(
344        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
345        checkpoint,
346        base_timeout.as_secs()
347    );
348    let query = r#"
349    {
350        availableRange {
351            last {
352                sequenceNumber
353            }
354        }
355    }"#;
356
357    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
358
359    tokio::time::timeout(timeout, async {
360        loop {
361            let resp = client
362                .execute_to_graphql(query.to_string(), false, vec![], vec![])
363                .await
364                .unwrap()
365                .response_body_json();
366
367            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
368            info!("Current checkpoint: {:?}", current_checkpoint);
369            // Indexer has not picked up any checkpoints yet
370            let Some(current_checkpoint) = current_checkpoint else {
371                tokio::time::sleep(Duration::from_secs(1)).await;
372                continue;
373            };
374
375            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
376            let current_checkpoint = current_checkpoint.as_u64().unwrap();
377            if current_checkpoint < checkpoint {
378                tokio::time::sleep(Duration::from_secs(1)).await;
379            } else {
380                break;
381            }
382        }
383    })
384    .await
385    .expect("Timeout waiting for graphql to catchup to checkpoint");
386}
387
388/// Ping the GraphQL server until its background task has updated the checkpoint watermark to the
389/// desired checkpoint.
390pub async fn wait_for_graphql_epoch_catchup(
391    client: &SimpleClient,
392    epoch: u64,
393    base_timeout: Duration,
394) {
395    info!(
396        "Waiting for graphql to catchup to epoch {}, base time out is {}",
397        epoch,
398        base_timeout.as_secs()
399    );
400    let query = r#"
401    {
402        epoch {
403            epochId
404        }
405    }"#;
406
407    let timeout = base_timeout.mul_f64(epoch.max(1) as f64);
408
409    tokio::time::timeout(timeout, async {
410        loop {
411            let resp = client
412                .execute_to_graphql(query.to_string(), false, vec![], vec![])
413                .await
414                .unwrap()
415                .response_body_json();
416
417            let latest_epoch = resp["data"]["epoch"].get("epochId");
418            info!("Latest epoch: {:?}", latest_epoch);
419            // Indexer has not picked up any epochs yet
420            let Some(latest_epoch) = latest_epoch else {
421                tokio::time::sleep(Duration::from_secs(1)).await;
422                continue;
423            };
424
425            // Indexer has picked up an epoch, but it's not the one we're waiting for
426            let latest_epoch = latest_epoch.as_u64().unwrap();
427            if latest_epoch < epoch {
428                tokio::time::sleep(Duration::from_secs(1)).await;
429            } else {
430                break;
431            }
432        }
433    })
434    .await
435    .expect("Timeout waiting for graphql to catchup to epoch");
436}
437
438/// Ping the GraphQL server for a checkpoint until an empty response is returned, indicating that
439/// the checkpoint has been pruned.
440pub async fn wait_for_graphql_checkpoint_pruned(
441    client: &SimpleClient,
442    checkpoint: u64,
443    base_timeout: Duration,
444) {
445    info!(
446        "Waiting for checkpoint to be pruned {}, base time out is {}",
447        checkpoint,
448        base_timeout.as_secs()
449    );
450    let query = format!(
451        r#"
452        {{
453            checkpoint(id: {{ sequenceNumber: {} }}) {{
454                sequenceNumber
455            }}
456        }}"#,
457        checkpoint
458    );
459
460    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
461
462    tokio::time::timeout(timeout, async {
463        loop {
464            let resp = client
465                .execute_to_graphql(query.to_string(), false, vec![], vec![])
466                .await
467                .unwrap()
468                .response_body_json();
469
470            let current_checkpoint = &resp["data"]["checkpoint"];
471            if current_checkpoint.is_null() {
472                break;
473            } else {
474                tokio::time::sleep(Duration::from_secs(1)).await;
475            }
476        }
477    })
478    .await
479    .expect("Timeout waiting for checkpoint to be pruned");
480}
481
482impl Cluster {
483    /// Waits for the indexer to index up to the given checkpoint, then waits for the graphql
484    /// service's background task to update the checkpoint watermark to the given checkpoint.
485    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
486        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
487    }
488
489    /// Waits for the indexer to index up to the given epoch, then waits for the graphql service's
490    /// background task to update the corresponding watermark.
491    pub async fn wait_for_epoch_catchup(&self, epoch: u64, base_timeout: Duration) {
492        wait_for_graphql_epoch_catchup(&self.graphql_client, epoch, base_timeout).await
493    }
494
495    /// Waits for the indexer to prune a given checkpoint.
496    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
497        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
498    }
499}
500
501impl ExecutorCluster {
502    /// Waits for the indexer to index up to the given checkpoint, then waits for the graphql
503    /// service's background task to update the checkpoint watermark to the given checkpoint.
504    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
505        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
506    }
507
508    /// Waits for the indexer to prune a given checkpoint.
509    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
510        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
511    }
512
513    /// The ObjectsSnapshotProcessor is a long-running task that periodically takes a snapshot of
514    /// the objects table. This leads to flakiness in tests, so we wait until the objects_snapshot
515    /// has reached the expected state.
516    pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
517        let mut latest_snapshot_cp = 0;
518
519        let latest_cp = self
520            .indexer_store
521            .get_latest_checkpoint_sequence_number()
522            .await
523            .unwrap()
524            .unwrap();
525
526        tokio::time::timeout(base_timeout, async {
527            while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
528                tokio::time::sleep(Duration::from_secs(1)).await;
529                latest_snapshot_cp = self
530                    .indexer_store
531                    .get_latest_object_snapshot_checkpoint_sequence_number()
532                    .await
533                    .unwrap()
534                    .unwrap_or_default();
535            }
536        })
537        .await
538        .unwrap_or_else(|_| panic!("Timeout waiting for indexer to update objects snapshot - latest_cp: {}, latest_snapshot_cp: {}",
539        latest_cp, latest_snapshot_cp));
540    }
541
542    /// Sends a cancellation signal to the graphql and indexer services, waits for them to complete,
543    /// and then deletes the database created for the test.
544    pub async fn cleanup_resources(self) {
545        self.cancellation_token.cancel();
546        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
547    }
548}