1use crate::config::ConnectionConfig;
5use crate::config::ServerConfig;
6use crate::config::ServiceConfig;
7use crate::config::Version;
8use crate::server::graphiql_server::start_graphiql_server;
9use rand::rngs::StdRng;
10use rand::SeedableRng;
11use simulacrum::AdvanceEpochConfig;
12use simulacrum::Simulacrum;
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::time::Duration;
17use sui_graphql_rpc_client::simple_client::SimpleClient;
18pub use sui_indexer::config::RetentionConfig;
19pub use sui_indexer::config::SnapshotLagConfig;
20use sui_indexer::errors::IndexerError;
21use sui_indexer::store::PgIndexerStore;
22use sui_indexer::test_utils::start_indexer_writer_for_testing;
23use sui_pg_db::temp::{get_available_port, TempDb};
24use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
25use sui_types::storage::RpcStateReader;
26use tempfile::tempdir;
27use tempfile::TempDir;
28use test_cluster::TestCluster;
29use test_cluster::TestClusterBuilder;
30use tokio::join;
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33use tracing::info;
34
/// Number of validators requested from `TestClusterBuilder` in `start_validator_with_fullnode`.
const VALIDATOR_COUNT: usize = 4;
/// Epoch duration, in milliseconds, handed to `TestClusterBuilder::with_epoch_duration_ms`.
const EPOCH_DURATION_MS: u64 = 300_000;

/// Number of `AccountConfig` entries passed to `TestClusterBuilder::with_accounts`.
const ACCOUNT_NUM: usize = 20;
/// Number of `DEFAULT_GAS_AMOUNT` entries placed in each account's `gas_amounts`.
const GAS_OBJECT_COUNT: usize = 3;
42
/// Handles and configuration for the test setup assembled by `serve_executor`: an executor RPC
/// service, an indexer writer, and a GraphQL server that share one temporary database.
pub struct ExecutorCluster {
    /// Join handle of the task spawned to run the executor's RPC service.
    pub executor_server_handle: JoinHandle<()>,
    /// Store returned by `start_indexer_writer_for_testing`.
    pub indexer_store: PgIndexerStore,
    /// Join handle returned by `start_indexer_writer_for_testing`.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Join handle of the spawned GraphQL server task.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's `http://host:port/` URL.
    pub graphql_client: SimpleClient,
    /// Snapshot lag settings given to the indexer (the default when the caller supplied none).
    pub snapshot_config: SnapshotLagConfig,
    /// Host, port and database settings the GraphQL server was started with.
    pub graphql_connection_config: ConnectionConfig,
    /// Token shared with the indexer and the GraphQL server; cancelled by `cleanup_resources`.
    pub cancellation_token: CancellationToken,
    // NOTE(review): presumably held only so the temporary database lives as long as the
    // cluster -- confirm against `TempDb`'s drop behaviour.
    #[allow(unused)]
    database: TempDb,
    /// Data-ingestion directory owned by the cluster. `serve_executor` leaves this as `None`;
    /// `prep_executor_cluster` stores the directory it created here.
    tempdir: Option<TempDir>,
}
56
/// A `NetworkCluster` together with a GraphQL server running on top of it, as built by
/// `start_cluster`.
pub struct Cluster {
    /// The underlying test network and indexer.
    pub network: NetworkCluster,
    /// Join handle of the spawned GraphQL server task.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's `http://host:port/` URL.
    pub graphql_client: SimpleClient,
}
62
/// A `TestCluster` plus an indexer writer backed by a temporary database, as built by
/// `start_network_cluster`. No GraphQL server is included.
pub struct NetworkCluster {
    /// The cluster produced by `TestClusterBuilder` in `start_validator_with_fullnode`.
    pub validator_fullnode_handle: TestCluster,
    /// Store returned by `start_indexer_writer_for_testing`.
    pub indexer_store: PgIndexerStore,
    /// Join handle returned by `start_indexer_writer_for_testing`.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Token handed to the indexer; `start_cluster` also passes a clone to the GraphQL server.
    pub cancellation_token: CancellationToken,
    // Directory given to both the test cluster and the indexer as the data-ingestion path.
    // NOTE(review): presumably held so the directory is not removed while they run -- confirm.
    #[allow(unused)]
    data_ingestion_path: TempDir,
    // NOTE(review): presumably held only so the temporary database lives as long as the
    // cluster -- confirm against `TempDb`'s drop behaviour.
    #[allow(unused)]
    database: TempDb,
    /// Host, port and database settings prepared for a GraphQL server on this network.
    pub graphql_connection_config: ConnectionConfig,
}
74
75pub async fn start_cluster(service_config: ServiceConfig) -> Cluster {
77 let network_cluster = start_network_cluster().await;
78 let graphql_connection_config = network_cluster.graphql_connection_config.clone();
79
80 let fn_rpc_url: String = network_cluster
81 .validator_fullnode_handle
82 .rpc_url()
83 .to_string();
84
85 let server_url = format!(
86 "http://{}:{}/",
87 graphql_connection_config.host, graphql_connection_config.port
88 );
89
90 let graphql_server_handle = start_graphql_server_with_fn_rpc(
91 graphql_connection_config,
92 Some(fn_rpc_url),
93 Some(network_cluster.cancellation_token.clone()),
94 service_config,
95 )
96 .await;
97
98 let client = SimpleClient::new(server_url);
100 wait_for_graphql_server(&client).await;
101
102 Cluster {
103 network: network_cluster,
104 graphql_server_join_handle: graphql_server_handle,
105 graphql_client: client,
106 }
107}
108
109pub async fn start_network_cluster() -> NetworkCluster {
113 let database = TempDb::new().unwrap();
114 let graphql_connection_config = ConnectionConfig {
115 port: get_available_port(),
116 host: "127.0.0.1".to_owned(),
117 db_url: database.database().url().as_str().to_owned(),
118 db_pool_size: 5,
119 prom_host: "127.0.0.1".to_owned(),
120 prom_port: get_available_port(),
121 skip_migration_consistency_check: false,
122 };
123 let data_ingestion_path = tempfile::tempdir().unwrap();
124 let db_url = graphql_connection_config.db_url.clone();
125 let cancellation_token = CancellationToken::new();
126
127 let val_fn = start_validator_with_fullnode(data_ingestion_path.path().to_path_buf()).await;
129
130 let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
132 db_url,
133 None,
134 None,
135 Some(data_ingestion_path.path().to_path_buf()),
136 Some(cancellation_token.clone()),
137 None, None, )
140 .await;
141
142 NetworkCluster {
143 validator_fullnode_handle: val_fn,
144 indexer_store: pg_store,
145 indexer_join_handle: pg_handle,
146 cancellation_token,
147 data_ingestion_path,
148 database,
149 graphql_connection_config,
150 }
151}
152
153pub async fn serve_executor(
156 executor: Arc<dyn RpcStateReader + Send + Sync>,
157 snapshot_config: Option<SnapshotLagConfig>,
158 retention_config: Option<RetentionConfig>,
159 data_ingestion_path: PathBuf,
160) -> ExecutorCluster {
161 let database = TempDb::new().unwrap();
162 let graphql_connection_config = ConnectionConfig {
163 port: get_available_port(),
164 host: "127.0.0.1".to_owned(),
165 db_url: database.database().url().as_str().to_owned(),
166 db_pool_size: 5,
167 prom_host: "127.0.0.1".to_owned(),
168 prom_port: get_available_port(),
169 skip_migration_consistency_check: false,
170 };
171 let db_url = graphql_connection_config.db_url.clone();
172 let cancellation_token = CancellationToken::new();
175
176 let executor_server_url: SocketAddr = format!("127.0.0.1:{}", get_available_port())
177 .parse()
178 .unwrap();
179
180 info!("Starting executor server on {}", executor_server_url);
181
182 let executor_server_handle = tokio::spawn(async move {
183 sui_rpc_api::RpcService::new(executor)
184 .start_service(executor_server_url)
185 .await;
186 });
187
188 info!("spawned executor server");
189
190 let snapshot_config = snapshot_config.unwrap_or_default();
191
192 let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
193 db_url,
194 Some(snapshot_config.clone()),
195 retention_config,
196 Some(data_ingestion_path),
197 Some(cancellation_token.clone()),
198 None,
199 None,
200 )
201 .await;
202
203 let graphql_server_handle = start_graphql_server(
205 graphql_connection_config.clone(),
206 cancellation_token.clone(),
207 ServiceConfig::test_defaults(),
208 )
209 .await;
210
211 let server_url = format!(
212 "http://{}:{}/",
213 graphql_connection_config.host, graphql_connection_config.port
214 );
215
216 let client = SimpleClient::new(server_url);
218 wait_for_graphql_server(&client).await;
219
220 ExecutorCluster {
221 executor_server_handle,
222 indexer_store: pg_store,
223 indexer_join_handle: pg_handle,
224 graphql_server_join_handle: graphql_server_handle,
225 graphql_client: client,
226 snapshot_config,
227 graphql_connection_config,
228 cancellation_token,
229 database,
230 tempdir: None,
231 }
232}
233
234pub async fn prep_executor_cluster() -> ExecutorCluster {
235 let rng = StdRng::from_seed([12; 32]);
236 let data_ingestion_path = tempdir().unwrap();
237 let mut sim = Simulacrum::new_with_rng(rng);
238 sim.set_data_ingestion_path(data_ingestion_path.path().to_path_buf());
239
240 sim.create_checkpoint();
241 sim.create_checkpoint();
242 sim.create_checkpoint();
243 sim.advance_epoch(AdvanceEpochConfig {
244 create_random_state: true,
245 ..Default::default()
246 });
247 sim.create_checkpoint();
248 sim.advance_clock(
249 std::time::SystemTime::now()
250 .duration_since(std::time::SystemTime::UNIX_EPOCH)
251 .unwrap(),
252 );
253 sim.create_checkpoint();
254
255 let mut cluster = serve_executor(
256 Arc::new(sim),
257 None,
258 None,
259 data_ingestion_path.path().to_path_buf(),
260 )
261 .await;
262
263 cluster
264 .wait_for_checkpoint_catchup(6, Duration::from_secs(10))
265 .await;
266
267 cluster.tempdir = Some(data_ingestion_path);
268 cluster
269}
270
271pub async fn start_graphql_server(
272 graphql_connection_config: ConnectionConfig,
273 cancellation_token: CancellationToken,
274 service_config: ServiceConfig,
275) -> JoinHandle<()> {
276 start_graphql_server_with_fn_rpc(
277 graphql_connection_config,
278 None,
279 Some(cancellation_token),
280 service_config,
281 )
282 .await
283}
284
285pub async fn start_graphql_server_with_fn_rpc(
286 graphql_connection_config: ConnectionConfig,
287 fn_rpc_url: Option<String>,
288 cancellation_token: Option<CancellationToken>,
289 service_config: ServiceConfig,
290) -> JoinHandle<()> {
291 let cancellation_token = cancellation_token.unwrap_or_default();
292 let mut server_config = ServerConfig {
293 connection: graphql_connection_config,
294 service: service_config,
295 ..ServerConfig::default()
296 };
297 if let Some(fn_rpc_url) = fn_rpc_url {
298 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
299 };
300
301 tokio::spawn(async move {
303 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
304 .await
305 .unwrap();
306 })
307}
308
309async fn start_validator_with_fullnode(data_ingestion_dir: PathBuf) -> TestCluster {
310 TestClusterBuilder::new()
311 .with_num_validators(VALIDATOR_COUNT)
312 .with_epoch_duration_ms(EPOCH_DURATION_MS)
313 .with_data_ingestion_dir(data_ingestion_dir)
314 .with_accounts(vec![
315 AccountConfig {
316 address: None,
317 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
318 };
319 ACCOUNT_NUM
320 ])
321 .build()
322 .await
323}
324
325pub async fn wait_for_graphql_server(client: &SimpleClient) {
327 tokio::time::timeout(Duration::from_secs(60), async {
328 while client.ping().await.is_err() {
329 tokio::time::sleep(Duration::from_millis(500)).await;
330 }
331 })
332 .await
333 .expect("Timeout waiting for graphql server to start");
334}
335
336pub async fn wait_for_graphql_checkpoint_catchup(
339 client: &SimpleClient,
340 checkpoint: u64,
341 base_timeout: Duration,
342) {
343 info!(
344 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
345 checkpoint,
346 base_timeout.as_secs()
347 );
348 let query = r#"
349 {
350 availableRange {
351 last {
352 sequenceNumber
353 }
354 }
355 }"#;
356
357 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
358
359 tokio::time::timeout(timeout, async {
360 loop {
361 let resp = client
362 .execute_to_graphql(query.to_string(), false, vec![], vec![])
363 .await
364 .unwrap()
365 .response_body_json();
366
367 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
368 info!("Current checkpoint: {:?}", current_checkpoint);
369 let Some(current_checkpoint) = current_checkpoint else {
371 tokio::time::sleep(Duration::from_secs(1)).await;
372 continue;
373 };
374
375 let current_checkpoint = current_checkpoint.as_u64().unwrap();
377 if current_checkpoint < checkpoint {
378 tokio::time::sleep(Duration::from_secs(1)).await;
379 } else {
380 break;
381 }
382 }
383 })
384 .await
385 .expect("Timeout waiting for graphql to catchup to checkpoint");
386}
387
388pub async fn wait_for_graphql_epoch_catchup(
391 client: &SimpleClient,
392 epoch: u64,
393 base_timeout: Duration,
394) {
395 info!(
396 "Waiting for graphql to catchup to epoch {}, base time out is {}",
397 epoch,
398 base_timeout.as_secs()
399 );
400 let query = r#"
401 {
402 epoch {
403 epochId
404 }
405 }"#;
406
407 let timeout = base_timeout.mul_f64(epoch.max(1) as f64);
408
409 tokio::time::timeout(timeout, async {
410 loop {
411 let resp = client
412 .execute_to_graphql(query.to_string(), false, vec![], vec![])
413 .await
414 .unwrap()
415 .response_body_json();
416
417 let latest_epoch = resp["data"]["epoch"].get("epochId");
418 info!("Latest epoch: {:?}", latest_epoch);
419 let Some(latest_epoch) = latest_epoch else {
421 tokio::time::sleep(Duration::from_secs(1)).await;
422 continue;
423 };
424
425 let latest_epoch = latest_epoch.as_u64().unwrap();
427 if latest_epoch < epoch {
428 tokio::time::sleep(Duration::from_secs(1)).await;
429 } else {
430 break;
431 }
432 }
433 })
434 .await
435 .expect("Timeout waiting for graphql to catchup to epoch");
436}
437
438pub async fn wait_for_graphql_checkpoint_pruned(
441 client: &SimpleClient,
442 checkpoint: u64,
443 base_timeout: Duration,
444) {
445 info!(
446 "Waiting for checkpoint to be pruned {}, base time out is {}",
447 checkpoint,
448 base_timeout.as_secs()
449 );
450 let query = format!(
451 r#"
452 {{
453 checkpoint(id: {{ sequenceNumber: {} }}) {{
454 sequenceNumber
455 }}
456 }}"#,
457 checkpoint
458 );
459
460 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
461
462 tokio::time::timeout(timeout, async {
463 loop {
464 let resp = client
465 .execute_to_graphql(query.to_string(), false, vec![], vec![])
466 .await
467 .unwrap()
468 .response_body_json();
469
470 let current_checkpoint = &resp["data"]["checkpoint"];
471 if current_checkpoint.is_null() {
472 break;
473 } else {
474 tokio::time::sleep(Duration::from_secs(1)).await;
475 }
476 }
477 })
478 .await
479 .expect("Timeout waiting for checkpoint to be pruned");
480}
481
482impl Cluster {
483 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
486 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
487 }
488
489 pub async fn wait_for_epoch_catchup(&self, epoch: u64, base_timeout: Duration) {
492 wait_for_graphql_epoch_catchup(&self.graphql_client, epoch, base_timeout).await
493 }
494
495 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
497 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
498 }
499}
500
501impl ExecutorCluster {
502 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
505 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
506 }
507
508 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
510 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
511 }
512
513 pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
517 let mut latest_snapshot_cp = 0;
518
519 let latest_cp = self
520 .indexer_store
521 .get_latest_checkpoint_sequence_number()
522 .await
523 .unwrap()
524 .unwrap();
525
526 tokio::time::timeout(base_timeout, async {
527 while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
528 tokio::time::sleep(Duration::from_secs(1)).await;
529 latest_snapshot_cp = self
530 .indexer_store
531 .get_latest_object_snapshot_checkpoint_sequence_number()
532 .await
533 .unwrap()
534 .unwrap_or_default();
535 }
536 })
537 .await
538 .unwrap_or_else(|_| panic!("Timeout waiting for indexer to update objects snapshot - latest_cp: {}, latest_snapshot_cp: {}",
539 latest_cp, latest_snapshot_cp));
540 }
541
542 pub async fn cleanup_resources(self) {
545 self.cancellation_token.cancel();
546 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
547 }
548}