1use std::collections::HashMap;
5use std::fs;
6use std::net::IpAddr;
7use std::net::Ipv4Addr;
8use std::net::SocketAddr;
9use std::path::Path;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::ensure;
14use diesel::ExpressionMethods;
15use diesel::OptionalExtension;
16use diesel::QueryDsl;
17use diesel_async::RunQueryDsl;
18use prost::Message;
19use reqwest::Client;
20use serde_json::Value;
21use serde_json::json;
22use simulacrum::Simulacrum;
23use sui_futures::service::Service;
24use sui_indexer_alt::BootstrapGenesis;
25use sui_indexer_alt::config::IndexerConfig;
26use sui_indexer_alt::setup_indexer;
27use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::AvailableRangeRequest;
28use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::consistent_service_client::ConsistentServiceClient;
29use sui_indexer_alt_consistent_store::args::RpcArgs as ConsistentArgs;
30use sui_indexer_alt_consistent_store::args::TlsArgs as ConsistentTlsArgs;
31use sui_indexer_alt_consistent_store::config::ServiceConfig as ConsistentConfig;
32use sui_indexer_alt_consistent_store::start_service as start_consistent_store;
33use sui_indexer_alt_framework::IndexerArgs;
34use sui_indexer_alt_framework::ingestion::ClientArgs;
35use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientArgs;
36use sui_indexer_alt_framework::postgres::schema::watermarks;
37use sui_indexer_alt_graphql::RpcArgs as GraphQlArgs;
38use sui_indexer_alt_graphql::args::KvArgs as GraphQlKvArgs;
39use sui_indexer_alt_graphql::config::RpcConfig as GraphQlConfig;
40use sui_indexer_alt_graphql::start_rpc as start_graphql;
41use sui_indexer_alt_jsonrpc::NodeArgs as JsonRpcNodeArgs;
42use sui_indexer_alt_jsonrpc::RpcArgs as JsonRpcArgs;
43use sui_indexer_alt_jsonrpc::config::RpcConfig as JsonRpcConfig;
44use sui_indexer_alt_jsonrpc::start_rpc as start_jsonrpc;
45use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
46use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
47use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
48use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
49use sui_pg_db::Db;
50use sui_pg_db::DbArgs;
51use sui_pg_db::temp::TempDb;
52use sui_pg_db::temp::get_available_port;
53use sui_rpc::field::FieldMask;
54use sui_rpc::field::FieldMaskUtil;
55use sui_rpc::merge::Merge;
56use sui_rpc::proto::sui::rpc;
57use sui_types::base_types::ObjectRef;
58use sui_types::base_types::SuiAddress;
59use sui_types::crypto::AccountKeyPair;
60use sui_types::effects::TransactionEffects;
61use sui_types::error::ExecutionError;
62use sui_types::full_checkpoint_content::Checkpoint;
63use sui_types::messages_checkpoint::VerifiedCheckpoint;
64use sui_types::transaction::Transaction;
65use tempfile::TempDir;
66use tokio::time::error::Elapsed;
67use tokio::time::interval;
68use tokio::try_join;
69use url::Url;
70
71pub mod coin_registry;
72pub mod find;
73
74pub struct FullCluster {
77 executor: Simulacrum,
79
80 offchain: OffchainCluster,
83
84 #[allow(unused)]
86 temp_dir: TempDir,
87}
88
89pub struct OffchainCluster {
97 consistent_listen_address: SocketAddr,
99
100 jsonrpc_listen_address: SocketAddr,
102
103 graphql_listen_address: SocketAddr,
105
106 db: Db,
108
109 pipelines: Vec<&'static str>,
111
112 #[allow(unused)]
115 services: Service,
116
117 #[allow(unused)]
119 database: TempDb,
120
121 #[allow(unused)]
124 dir: TempDir,
125}
126
127pub struct OffchainClusterConfig {
128 pub indexer_args: IndexerArgs,
129 pub consistent_indexer_args: IndexerArgs,
130 pub fullnode_args: FullnodeArgs,
131 pub indexer_config: IndexerConfig,
132 pub consistent_config: ConsistentConfig,
133 pub jsonrpc_config: JsonRpcConfig,
134 pub graphql_config: GraphQlConfig,
135 pub bootstrap_genesis: Option<BootstrapGenesis>,
136}
137
138impl FullCluster {
139 pub async fn new() -> anyhow::Result<Self> {
142 Self::new_with_configs(
143 Simulacrum::new(),
144 OffchainClusterConfig::default(),
145 &prometheus::Registry::new(),
146 )
147 .await
148 }
149
150 pub async fn new_with_configs(
154 mut executor: Simulacrum,
155 offchain_cluster_config: OffchainClusterConfig,
156 registry: &prometheus::Registry,
157 ) -> anyhow::Result<Self> {
158 let (client_args, temp_dir) = local_ingestion_client_args();
159 executor.set_data_ingestion_path(temp_dir.path().to_owned());
160
161 let offchain = OffchainCluster::new(client_args, offchain_cluster_config, registry)
162 .await
163 .context("Failed to create off-chain cluster")?;
164
165 Ok(Self {
166 executor,
167 offchain,
168 temp_dir,
169 })
170 }
171
172 pub fn reference_gas_price(&self) -> u64 {
174 self.executor.reference_gas_price()
175 }
176
177 pub fn funded_account(
180 &mut self,
181 amount: u64,
182 ) -> anyhow::Result<(SuiAddress, AccountKeyPair, ObjectRef)> {
183 self.executor.funded_account(amount)
184 }
185
186 pub fn request_gas(
189 &mut self,
190 address: SuiAddress,
191 amount: u64,
192 ) -> anyhow::Result<TransactionEffects> {
193 self.executor.request_gas(address, amount)
194 }
195
196 pub fn execute_transaction(
198 &mut self,
199 tx: Transaction,
200 ) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
201 self.executor.execute_transaction(tx)
202 }
203
204 pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
206 self.executor.advance_clock(duration)
207 }
208
209 pub async fn create_checkpoint(&mut self) -> VerifiedCheckpoint {
213 let checkpoint = self.executor.create_checkpoint();
214 let indexer = self
215 .offchain
216 .wait_for_indexer(checkpoint.sequence_number, Duration::from_secs(100));
217 let consistent_store = self
218 .offchain
219 .wait_for_consistent_store(checkpoint.sequence_number, Duration::from_secs(100));
220 let graphql = self
221 .offchain
222 .wait_for_graphql(checkpoint.sequence_number, Duration::from_secs(100));
223
224 try_join!(indexer, consistent_store, graphql)
225 .expect("Timed out waiting for indexer and consistent store");
226
227 checkpoint
228 }
229
230 pub fn db_url(&self) -> Url {
232 self.offchain.db_url()
233 }
234
235 pub fn consistent_store_url(&self) -> Url {
237 self.offchain.consistent_store_url()
238 }
239
240 pub fn jsonrpc_url(&self) -> Url {
242 self.offchain.jsonrpc_url()
243 }
244
245 pub fn graphql_url(&self) -> Url {
247 self.offchain.graphql_url()
248 }
249
250 pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
253 self.offchain.latest_checkpoint().await
254 }
255
256 pub async fn wait_for_indexer(
259 &self,
260 checkpoint: u64,
261 timeout: Duration,
262 ) -> Result<(), Elapsed> {
263 self.offchain.wait_for_indexer(checkpoint, timeout).await
264 }
265
266 pub async fn wait_for_pruner(
269 &self,
270 pipeline: &str,
271 checkpoint: u64,
272 timeout: Duration,
273 ) -> Result<(), Elapsed> {
274 self.offchain
275 .wait_for_pruner(pipeline, checkpoint, timeout)
276 .await
277 }
278
279 pub async fn wait_for_graphql(
282 &self,
283 checkpoint: u64,
284 timeout: Duration,
285 ) -> Result<(), Elapsed> {
286 self.offchain.wait_for_graphql(checkpoint, timeout).await
287 }
288}
289
290impl OffchainCluster {
291 pub async fn new(
299 client_args: ClientArgs,
300 OffchainClusterConfig {
301 indexer_args,
302 consistent_indexer_args,
303 fullnode_args,
304 indexer_config,
305 consistent_config,
306 jsonrpc_config,
307 graphql_config,
308 bootstrap_genesis,
309 }: OffchainClusterConfig,
310 registry: &prometheus::Registry,
311 ) -> anyhow::Result<Self> {
312 let consistent_port = get_available_port();
313 let consistent_listen_address =
314 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), consistent_port);
315
316 let jsonrpc_port = get_available_port();
317 let jsonrpc_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), jsonrpc_port);
318
319 let graphql_port = get_available_port();
320 let graphql_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), graphql_port);
321
322 let database = TempDb::new().context("Failed to create database")?;
323 let database_url = database.database().url();
324
325 let dir = tempfile::tempdir().context("Failed to create temporary directory")?;
326 let rocksdb_path = dir.path().join("rocksdb");
327
328 let consistent_args = ConsistentArgs {
329 rpc_listen_address: consistent_listen_address,
330 tls: ConsistentTlsArgs::default(),
331 };
332
333 let jsonrpc_args = JsonRpcArgs {
334 rpc_listen_address: jsonrpc_listen_address,
335 ..Default::default()
336 };
337
338 let graphql_args = GraphQlArgs {
339 rpc_listen_address: graphql_listen_address,
340 no_ide: true,
341 };
342
343 let db = Db::for_read(database_url.clone(), DbArgs::default())
344 .await
345 .context("Failed to connect to database")?;
346
347 let indexer = setup_indexer(
348 database_url.clone(),
349 DbArgs::default(),
350 indexer_args,
351 client_args.clone(),
352 indexer_config,
353 bootstrap_genesis,
354 registry,
355 )
356 .await
357 .context("Failed to setup indexer")?;
358
359 let pipelines: Vec<_> = indexer.pipelines().collect();
360 let indexer = indexer.run().await.context("Failed to start indexer")?;
361
362 let consistent_store = start_consistent_store(
363 rocksdb_path,
364 consistent_indexer_args,
365 client_args,
366 consistent_args,
367 "0.0.0",
368 consistent_config,
369 registry,
370 )
371 .await
372 .context("Failed to start Consistent Store")?;
373
374 let consistent_reader_args = ConsistentReaderArgs {
375 consistent_store_url: Some(
376 Url::parse(&format!("http://{consistent_listen_address}")).unwrap(),
377 ),
378 consistent_store_statement_timeout_ms: None,
379 };
380
381 let jsonrpc = start_jsonrpc(
382 Some(database_url.clone()),
383 None,
384 DbArgs::default(),
385 BigtableArgs::default(),
386 consistent_reader_args.clone(),
387 jsonrpc_args,
388 JsonRpcNodeArgs::default(),
389 SystemPackageTaskArgs::default(),
390 jsonrpc_config,
391 registry,
392 )
393 .await
394 .context("Failed to start JSON-RPC server")?;
395
396 let graphql = start_graphql(
397 Some(database_url.clone()),
398 fullnode_args,
399 DbArgs::default(),
400 GraphQlKvArgs::default(),
401 consistent_reader_args,
402 graphql_args,
403 SystemPackageTaskArgs::default(),
404 "0.0.0",
405 graphql_config,
406 pipelines.iter().map(|p| p.to_string()).collect(),
407 registry,
408 )
409 .await
410 .context("Failed to start GraphQL server")?;
411
412 let services = indexer
413 .merge(consistent_store)
414 .merge(jsonrpc)
415 .merge(graphql);
416
417 Ok(Self {
418 consistent_listen_address,
419 jsonrpc_listen_address,
420 graphql_listen_address,
421 db,
422 pipelines,
423 services,
424 database,
425 dir,
426 })
427 }
428
429 pub fn db_url(&self) -> Url {
431 self.database.database().url().clone()
432 }
433
434 pub fn consistent_store_url(&self) -> Url {
436 Url::parse(&format!("http://{}/", self.consistent_listen_address))
437 .expect("Failed to parse RPC URL")
438 }
439
440 pub fn jsonrpc_url(&self) -> Url {
442 Url::parse(&format!("http://{}/", self.jsonrpc_listen_address))
443 .expect("Failed to parse RPC URL")
444 }
445
446 pub fn graphql_url(&self) -> Url {
448 Url::parse(&format!("http://{}/graphql", self.graphql_listen_address))
449 .expect("Failed to parse RPC URL")
450 }
451
452 pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
455 use watermarks::dsl as w;
456
457 let mut conn = self
458 .db
459 .connect()
460 .await
461 .context("Failed to connect to database")?;
462
463 let latest: HashMap<String, i64> = w::watermarks
464 .select((w::pipeline, w::checkpoint_hi_inclusive))
465 .filter(w::pipeline.eq_any(&self.pipelines))
466 .load(&mut conn)
467 .await?
468 .into_iter()
469 .collect();
470
471 if latest.len() != self.pipelines.len() {
472 return Ok(None);
473 }
474
475 Ok(latest.into_values().min().map(|l| l as u64))
476 }
477
478 pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
481 use watermarks::dsl as w;
482
483 let mut conn = self
484 .db
485 .connect()
486 .await
487 .context("Failed to connect to database")?;
488
489 let latest: Option<i64> = w::watermarks
490 .select(w::reader_lo)
491 .filter(w::pipeline.eq(pipeline))
492 .first(&mut conn)
493 .await
494 .optional()?;
495
496 Ok(latest.map(|l| l as u64))
497 }
498
499 pub async fn latest_consistent_store_checkpoint(&self) -> anyhow::Result<u64> {
501 ConsistentServiceClient::connect(self.consistent_store_url().to_string())
502 .await
503 .context("Failed to connect to Consistent Store")?
504 .available_range(AvailableRangeRequest {})
505 .await
506 .context("Failed to fetch available range from Consistent Store")?
507 .into_inner()
508 .max_checkpoint
509 .context("Consistent Store has not started yet")
510 }
511
512 pub async fn latest_graphql_checkpoint(&self) -> anyhow::Result<u64> {
514 let query = json!({
515 "query": "query { checkpoint { sequenceNumber } }"
516 });
517
518 let client = Client::new();
519 let request = client.post(self.graphql_url()).json(&query);
520 let response = request
521 .send()
522 .await
523 .context("Request to GraphQL server failed")?;
524
525 let body: Value = response
526 .json()
527 .await
528 .context("Failed to parse GraphQL response")?;
529
530 let sequence_number = body
531 .pointer("/data/checkpoint/sequenceNumber")
532 .context("Failed to find checkpoint sequence number in response")?;
533
534 let sequence_number: i64 = serde_json::from_value(sequence_number.clone())
535 .context("Failed to parse sequence number as i64")?;
536
537 ensure!(sequence_number != i64::MAX, "Indexer has not started yet");
538
539 Ok(sequence_number as u64)
540 }
541
542 pub async fn latest_graphql_epoch(&self) -> anyhow::Result<u64> {
544 let query = json!({
545 "query": "query { epoch { epochId } }"
546 });
547
548 let client = Client::new();
549 let request = client.post(self.graphql_url()).json(&query);
550 let response = request
551 .send()
552 .await
553 .context("Request to GraphQL server failed")?;
554
555 let body: Value = response
556 .json()
557 .await
558 .context("Failed to parse GraphQL response")?;
559
560 let epoch_id = body
561 .pointer("/data/epoch/epochId")
562 .context("Failed to find epochId in response")?;
563
564 let epoch_id: i64 =
565 serde_json::from_value(epoch_id.clone()).context("Failed to parse epochId as i64")?;
566
567 ensure!(epoch_id != i64::MAX, "Indexer has not started yet");
568
569 Ok(epoch_id as u64)
570 }
571
572 pub async fn wait_for_indexer(
575 &self,
576 checkpoint: u64,
577 timeout: Duration,
578 ) -> Result<(), Elapsed> {
579 tokio::time::timeout(timeout, async move {
580 let mut interval = interval(Duration::from_millis(200));
581 loop {
582 interval.tick().await;
583 if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
584 break;
585 }
586 }
587 })
588 .await
589 }
590
591 pub async fn wait_for_pruner(
594 &self,
595 pipeline: &str,
596 checkpoint: u64,
597 timeout: Duration,
598 ) -> Result<(), Elapsed> {
599 tokio::time::timeout(timeout, async move {
600 let mut interval = interval(Duration::from_millis(200));
601 loop {
602 interval.tick().await;
603 if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
604 break;
605 }
606 }
607 }).await
608 }
609
610 pub async fn wait_for_consistent_store(
613 &self,
614 checkpoint: u64,
615 timeout: Duration,
616 ) -> Result<(), Elapsed> {
617 tokio::time::timeout(timeout, async move {
618 let mut interval = interval(Duration::from_millis(200));
619 loop {
620 interval.tick().await;
621 if matches!(self.latest_consistent_store_checkpoint().await, Ok(l) if l >= checkpoint) {
622 break;
623 }
624 }
625 })
626 .await
627 }
628
629 pub async fn wait_for_graphql(
632 &self,
633 checkpoint: u64,
634 timeout: Duration,
635 ) -> Result<(), Elapsed> {
636 tokio::time::timeout(timeout, async move {
637 let mut interval = interval(Duration::from_millis(200));
638 loop {
639 interval.tick().await;
640 if matches!(self.latest_graphql_checkpoint().await, Ok(l) if l >= checkpoint) {
641 break;
642 }
643 }
644 })
645 .await
646 }
647}
648
649impl Default for OffchainClusterConfig {
650 fn default() -> Self {
651 Self {
652 indexer_args: Default::default(),
653 consistent_indexer_args: Default::default(),
654 fullnode_args: Default::default(),
655 indexer_config: IndexerConfig::for_test(),
656 consistent_config: ConsistentConfig::for_test(),
657 jsonrpc_config: Default::default(),
658 graphql_config: Default::default(),
659 bootstrap_genesis: None,
660 }
661 }
662}
663
664pub fn local_ingestion_client_args() -> (ClientArgs, TempDir) {
666 let temp_dir = tempfile::tempdir()
667 .context("Failed to create data ingestion path")
668 .unwrap();
669 let client_args = ClientArgs {
670 ingestion: IngestionClientArgs {
671 local_ingestion_path: Some(temp_dir.path().to_owned()),
672 ..Default::default()
673 },
674 ..Default::default()
675 };
676 (client_args, temp_dir)
677}
678
679pub async fn write_checkpoint(path: &Path, checkpoint: Checkpoint) -> anyhow::Result<()> {
681 let sequence_number = checkpoint.summary.sequence_number;
682
683 let mask = FieldMask::from_paths([
684 rpc::v2::Checkpoint::path_builder().sequence_number(),
685 rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
686 rpc::v2::Checkpoint::path_builder().signature().finish(),
687 rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
688 rpc::v2::Checkpoint::path_builder()
689 .transactions()
690 .transaction()
691 .bcs()
692 .value(),
693 rpc::v2::Checkpoint::path_builder()
694 .transactions()
695 .effects()
696 .bcs()
697 .value(),
698 rpc::v2::Checkpoint::path_builder()
699 .transactions()
700 .effects()
701 .unchanged_loaded_runtime_objects()
702 .finish(),
703 rpc::v2::Checkpoint::path_builder()
704 .transactions()
705 .events()
706 .bcs()
707 .value(),
708 rpc::v2::Checkpoint::path_builder()
709 .objects()
710 .objects()
711 .bcs()
712 .value(),
713 ]);
714
715 let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
716 let proto_bytes = proto_checkpoint.encode_to_vec();
717 let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
718
719 let file_name = format!("{}.binpb.zst", sequence_number);
720 let file_path = path.join(file_name);
721 fs::write(file_path, compressed)?;
722 Ok(())
723}