sui_indexer_alt_e2e_tests/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::fs;
6use std::net::IpAddr;
7use std::net::Ipv4Addr;
8use std::net::SocketAddr;
9use std::path::Path;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::ensure;
14use diesel::ExpressionMethods;
15use diesel::OptionalExtension;
16use diesel::QueryDsl;
17use diesel_async::RunQueryDsl;
18use prost::Message;
19use reqwest::Client;
20use serde_json::Value;
21use serde_json::json;
22use simulacrum::Simulacrum;
23use sui_futures::service::Service;
24use sui_indexer_alt::BootstrapGenesis;
25use sui_indexer_alt::config::IndexerConfig;
26use sui_indexer_alt::setup_indexer;
27use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::AvailableRangeRequest;
28use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::consistent_service_client::ConsistentServiceClient;
29use sui_indexer_alt_consistent_store::args::RpcArgs as ConsistentArgs;
30use sui_indexer_alt_consistent_store::args::TlsArgs as ConsistentTlsArgs;
31use sui_indexer_alt_consistent_store::config::ServiceConfig as ConsistentConfig;
32use sui_indexer_alt_consistent_store::start_service as start_consistent_store;
33use sui_indexer_alt_framework::IndexerArgs;
34use sui_indexer_alt_framework::ingestion::ClientArgs;
35use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientArgs;
36use sui_indexer_alt_framework::pipeline::CommitterConfig;
37use sui_indexer_alt_framework::postgres::schema::watermarks;
38use sui_indexer_alt_graphql::RpcArgs as GraphQlArgs;
39use sui_indexer_alt_graphql::args::SubscriptionArgs;
40use sui_indexer_alt_graphql::config::RpcConfig as GraphQlConfig;
41use sui_indexer_alt_graphql::start_rpc as start_graphql;
42use sui_indexer_alt_jsonrpc::NodeArgs as JsonRpcNodeArgs;
43use sui_indexer_alt_jsonrpc::RpcArgs as JsonRpcArgs;
44use sui_indexer_alt_jsonrpc::config::RpcConfig as JsonRpcConfig;
45use sui_indexer_alt_jsonrpc::start_rpc as start_jsonrpc;
46use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
47use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
48use sui_indexer_alt_reader::kv_loader::KvArgs;
49use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
50use sui_kv_rpc::KvRpcServer;
51use sui_kvstore::ALL_PIPELINE_NAMES;
52use sui_kvstore::ALPHA_PIPELINE_NAMES;
53use sui_kvstore::BigTableClient;
54use sui_kvstore::BigTableIndexer;
55use sui_kvstore::BigTableStore;
56use sui_kvstore::IndexerConfig as BtIndexerConfig;
57use sui_kvstore::IngestionConfig as BtIngestionConfig;
58use sui_kvstore::KeyValueStoreReader;
59use sui_kvstore::PipelineLayer;
60use sui_kvstore::testing::BigTableEmulator;
61use sui_kvstore::testing::INSTANCE_ID;
62use sui_kvstore::testing::create_tables;
63use sui_pg_db::Db;
64use sui_pg_db::DbArgs;
65use sui_pg_db::temp::TempDb;
66use sui_pg_db::temp::get_available_port;
67use sui_protocol_config::Chain;
68use sui_rpc::field::FieldMask;
69use sui_rpc::field::FieldMaskUtil;
70use sui_rpc::merge::Merge;
71use sui_rpc::proto::sui::rpc;
72use sui_types::base_types::ObjectRef;
73use sui_types::base_types::SuiAddress;
74use sui_types::crypto::AccountKeyPair;
75use sui_types::effects::TransactionEffects;
76use sui_types::error::ExecutionError;
77use sui_types::full_checkpoint_content::Checkpoint;
78use sui_types::messages_checkpoint::VerifiedCheckpoint;
79use sui_types::transaction::Transaction;
80use tempfile::TempDir;
81use tokio::time::error::Elapsed;
82use tokio::time::interval;
83use tokio::try_join;
84use url::Url;
85
86pub mod coin_registry;
87pub mod find;
88pub mod move_helpers;
89
90/// A simulation of the network, accompanied by off-chain services (database, indexer, RPC),
91/// connected by local data ingestion.
92pub struct FullCluster {
93    /// A simulation of the network, executing transactions and producing checkpoints.
94    executor: Simulacrum,
95
96    /// The off-chain services (database, indexer, RPC) that are ingesting data from the
97    /// simulation.
98    offchain: OffchainCluster,
99
100    /// Temporary directory to store checkpoint information in, so that the indexer can pick it up.
101    #[allow(unused)]
102    temp_dir: TempDir,
103}
104
105/// A collection of the off-chain services (an indexer, a database, and JSON-RPC/GraphQL servers
106/// that read from that database), grouped together to simplify set-up and tear-down for tests. The
107/// included RPC servers do not support transaction dry run and execution.
108///
109/// The database is temporary, and will be cleaned up when the cluster is dropped, and the RPCs are
110/// set-up to listen on a random, available port, to avoid conflicts when multiple instances are
111/// running concurrently in the same process.
112pub struct OffchainCluster {
113    /// The address the consistent store is listening on.
114    consistent_listen_address: SocketAddr,
115
116    /// The address the JSON-RPC server is listening on.
117    jsonrpc_listen_address: SocketAddr,
118
119    /// The address the GraphQL server is listening on.
120    graphql_listen_address: SocketAddr,
121
122    /// The address the kv-rpc (LedgerService) server is listening on.
123    kv_rpc_listen_address: SocketAddr,
124
125    /// Read access to BigTable.
126    bigtable_client: BigTableClient,
127
128    /// Read access to the temporary database.
129    db: Db,
130
131    /// The pipelines that the indexer is populating.
132    pipelines: Vec<&'static str>,
133
134    /// Handles to all running services. Held on to so the services are not dropped (and therefore
135    /// aborted) until the cluster is stopped.
136    #[allow(unused)]
137    services: Service,
138
139    /// Handle to the BigTable emulator process.
140    #[allow(unused)]
141    bigtable_emulator: BigTableEmulator,
142
143    /// Hold on to the database so it doesn't get dropped until the cluster is stopped.
144    #[allow(unused)]
145    database: TempDb,
146
147    /// Hold on to the temporary directory where the consistent store writes its data, so it
148    /// doesn't get cleaned up until the cluster is stopped.
149    #[allow(unused)]
150    dir: TempDir,
151}
152
153pub struct OffchainClusterConfig {
154    pub indexer_args: IndexerArgs,
155    pub consistent_indexer_args: IndexerArgs,
156    pub fullnode_args: FullnodeArgs,
157    pub indexer_config: IndexerConfig,
158    pub consistent_config: ConsistentConfig,
159    pub jsonrpc_config: JsonRpcConfig,
160    pub jsonrpc_node_args: JsonRpcNodeArgs,
161    pub graphql_config: GraphQlConfig,
162    pub bootstrap_genesis: Option<BootstrapGenesis>,
163}
164
165impl FullCluster {
166    /// Creates a cluster with a fresh executor where the off-chain services are set up with a
167    /// default configuration.
168    pub async fn new() -> anyhow::Result<Self> {
169        Self::new_with_configs(
170            Simulacrum::new(),
171            OffchainClusterConfig::default(),
172            &prometheus::Registry::new(),
173        )
174        .await
175    }
176
177    /// Creates a new cluster executing transactions using `executor`. The indexer is configured
178    /// using `indexer_args` and `indexer_config, the JSON-RPC server is configured using
179    /// `jsonrpc_config`, and the GraphQL server is configured using `graphql_config`.
180    pub async fn new_with_configs(
181        mut executor: Simulacrum,
182        offchain_cluster_config: OffchainClusterConfig,
183        registry: &prometheus::Registry,
184    ) -> anyhow::Result<Self> {
185        let (client_args, temp_dir) = local_ingestion_client_args();
186        executor.set_data_ingestion_path(temp_dir.path().to_owned());
187
188        let offchain = OffchainCluster::new(client_args, offchain_cluster_config, registry)
189            .await
190            .context("Failed to create off-chain cluster")?;
191
192        Ok(Self {
193            executor,
194            offchain,
195            temp_dir,
196        })
197    }
198
199    /// Return the reference gas price for the current epoch
200    pub fn reference_gas_price(&self) -> u64 {
201        self.executor.reference_gas_price()
202    }
203
204    /// Create a new account and credit it with `amount` gas units from a faucet account. Returns
205    /// the account, its keypair, and a reference to the gas object it was funded with.
206    pub fn funded_account(
207        &mut self,
208        amount: u64,
209    ) -> anyhow::Result<(SuiAddress, AccountKeyPair, ObjectRef)> {
210        self.executor.funded_account(amount)
211    }
212
213    /// Request gas from the faucet, sent to `address`. Return the object reference of the gas
214    /// object that was sent.
215    pub fn request_gas(
216        &mut self,
217        address: SuiAddress,
218        amount: u64,
219    ) -> anyhow::Result<TransactionEffects> {
220        self.executor.request_gas(address, amount)
221    }
222
223    /// Execute a signed transaction, returning its effects.
224    pub fn execute_transaction(
225        &mut self,
226        tx: Transaction,
227    ) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
228        self.executor.execute_transaction(tx)
229    }
230
231    /// Execute a system transaction advancing the lock by the given `duration`.
232    pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
233        self.executor.advance_clock(duration)
234    }
235
236    /// Create a new checkpoint containing the transactions executed since the last checkpoint that
237    /// was created, and wait for the off-chain services to ingest it. Returns the checkpoint
238    /// contents.
239    pub async fn create_checkpoint(&mut self) -> VerifiedCheckpoint {
240        let checkpoint = self.executor.create_checkpoint();
241        let timeout = Duration::from_secs(100);
242        let indexer = self
243            .offchain
244            .wait_for_indexer(checkpoint.sequence_number, timeout);
245        let consistent_store = self
246            .offchain
247            .wait_for_consistent_store(checkpoint.sequence_number, timeout);
248        let graphql = self
249            .offchain
250            .wait_for_graphql(checkpoint.sequence_number, timeout);
251        let bigtable = self
252            .offchain
253            .wait_for_bigtable(checkpoint.sequence_number, timeout);
254
255        try_join!(indexer, consistent_store, graphql, bigtable)
256            .expect("Timed out waiting for off-chain services");
257
258        checkpoint
259    }
260
261    /// The URL to talk to the database on.
262    pub fn db_url(&self) -> Url {
263        self.offchain.db_url()
264    }
265
266    /// The URL to send Consistent Store requests to.
267    pub fn consistent_store_url(&self) -> Url {
268        self.offchain.consistent_store_url()
269    }
270
271    /// The URL to send JSON-RPC requests to.
272    pub fn jsonrpc_url(&self) -> Url {
273        self.offchain.jsonrpc_url()
274    }
275
276    /// The URL to send GraphQL requests to.
277    pub fn graphql_url(&self) -> Url {
278        self.offchain.graphql_url()
279    }
280
281    /// The URL to send kv-rpc (LedgerService) requests to.
282    pub fn kv_rpc_url(&self) -> Url {
283        self.offchain.kv_rpc_url()
284    }
285
286    /// Returns the latest checkpoint that we have all data for in the database, according to the
287    /// watermarks table. Returns `None` if any of the expected pipelines are missing data.
288    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
289        self.offchain.latest_checkpoint().await
290    }
291
292    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
293    /// reached (an error).
294    pub async fn wait_for_indexer(
295        &self,
296        checkpoint: u64,
297        timeout: Duration,
298    ) -> Result<(), Elapsed> {
299        self.offchain.wait_for_indexer(checkpoint, timeout).await
300    }
301
302    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
303    /// `pipeline`, or the `timeout` is reached (an error).
304    pub async fn wait_for_pruner(
305        &self,
306        pipeline: &str,
307        checkpoint: u64,
308        timeout: Duration,
309    ) -> Result<(), Elapsed> {
310        self.offchain
311            .wait_for_pruner(pipeline, checkpoint, timeout)
312            .await
313    }
314
315    /// Waits until GraphQL has caught up to the given `checkpoint`, or the `timeout` is
316    /// reached (an error).
317    pub async fn wait_for_graphql(
318        &self,
319        checkpoint: u64,
320        timeout: Duration,
321    ) -> Result<(), Elapsed> {
322        self.offchain.wait_for_graphql(checkpoint, timeout).await
323    }
324}
325
326impl OffchainCluster {
327    /// Construct a new off-chain cluster and spin up its constituent services.
328    ///
329    /// - `indexer_args`, `client_args`, and `indexer_config` control the indexer. In particular
330    ///   `client_args` is used to configure the client that the indexer uses to fetch checkpoints.
331    /// - `jsonrpc_config` controls the JSON-RPC server.
332    /// - `graphql_config` controls the GraphQL server.
333    /// - `registry` is used to register metrics for the indexer, JSON-RPC, and GraphQL servers.
334    pub async fn new(
335        client_args: ClientArgs,
336        OffchainClusterConfig {
337            indexer_args,
338            consistent_indexer_args,
339            fullnode_args,
340            indexer_config,
341            consistent_config,
342            jsonrpc_config,
343            jsonrpc_node_args,
344            graphql_config,
345            bootstrap_genesis,
346        }: OffchainClusterConfig,
347        registry: &prometheus::Registry,
348    ) -> anyhow::Result<Self> {
349        let consistent_port = get_available_port();
350        let consistent_listen_address =
351            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), consistent_port);
352
353        let jsonrpc_port = get_available_port();
354        let jsonrpc_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), jsonrpc_port);
355
356        let graphql_port = get_available_port();
357        let graphql_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), graphql_port);
358
359        let kv_rpc_port = get_available_port();
360        let kv_rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), kv_rpc_port);
361
362        let database = TempDb::new().context("Failed to create database")?;
363        let database_url = database.database().url();
364
365        let dir = tempfile::tempdir().context("Failed to create temporary directory")?;
366        let rocksdb_path = dir.path().join("rocksdb");
367
368        let consistent_args = ConsistentArgs {
369            rpc_listen_address: consistent_listen_address,
370            tls: ConsistentTlsArgs::default(),
371        };
372
373        let jsonrpc_args = JsonRpcArgs {
374            rpc_listen_address: jsonrpc_listen_address,
375            ..Default::default()
376        };
377
378        let graphql_args = GraphQlArgs {
379            rpc_listen_address: graphql_listen_address,
380            no_ide: true,
381        };
382
383        let db = Db::for_read(database_url.clone(), DbArgs::default())
384            .await
385            .context("Failed to connect to database")?;
386
387        let indexer = setup_indexer(
388            database_url.clone(),
389            DbArgs::default(),
390            indexer_args,
391            client_args.clone(),
392            indexer_config,
393            bootstrap_genesis,
394            registry,
395        )
396        .await
397        .context("Failed to setup indexer")?;
398
399        let pipelines: Vec<_> = indexer.pipelines().collect();
400        let indexer = indexer.run().await.context("Failed to start indexer")?;
401
402        let consistent_store = start_consistent_store(
403            rocksdb_path,
404            consistent_indexer_args,
405            client_args.clone(),
406            consistent_args,
407            "0.0.0",
408            consistent_config,
409            registry,
410        )
411        .await
412        .context("Failed to start Consistent Store")?;
413
414        let consistent_reader_args = ConsistentReaderArgs {
415            consistent_store_url: Some(
416                Url::parse(&format!("http://{consistent_listen_address}")).unwrap(),
417            ),
418            ..Default::default()
419        };
420
421        let (bigtable_client, bigtable_emulator, archival_service) =
422            start_archival(client_args.clone(), kv_rpc_address, registry).await?;
423
424        let kv_args = KvArgs {
425            ledger_grpc_url: Some(
426                format!("http://{kv_rpc_address}")
427                    .parse()
428                    .expect("Failed to parse kv-rpc URI"),
429            ),
430            ..Default::default()
431        };
432
433        let jsonrpc = start_jsonrpc(
434            Some(database_url.clone()),
435            DbArgs::default(),
436            kv_args.clone(),
437            consistent_reader_args.clone(),
438            jsonrpc_args,
439            jsonrpc_node_args,
440            SystemPackageTaskArgs::default(),
441            jsonrpc_config,
442            registry,
443        )
444        .await
445        .context("Failed to start JSON-RPC server")?;
446
447        let graphql = start_graphql(
448            Some(database_url.clone()),
449            fullnode_args,
450            DbArgs::default(),
451            kv_args,
452            consistent_reader_args,
453            graphql_args,
454            SystemPackageTaskArgs::default(),
455            SubscriptionArgs::default(),
456            "0.0.0",
457            graphql_config,
458            pipelines.iter().map(|p| p.to_string()).collect(),
459            registry,
460        )
461        .await
462        .context("Failed to start GraphQL server")?;
463
464        let services = indexer
465            .merge(consistent_store)
466            .merge(jsonrpc)
467            .merge(graphql)
468            .merge(archival_service);
469
470        Ok(Self {
471            consistent_listen_address,
472            jsonrpc_listen_address,
473            graphql_listen_address,
474            kv_rpc_listen_address: kv_rpc_address,
475            bigtable_client,
476            db,
477            pipelines,
478            services,
479            bigtable_emulator,
480            database,
481            dir,
482        })
483    }
484
485    /// The URL to talk to the database on.
486    pub fn db_url(&self) -> Url {
487        self.database.database().url().clone()
488    }
489
490    /// The URL to send Consistent Store requests to.
491    pub fn consistent_store_url(&self) -> Url {
492        Url::parse(&format!("http://{}/", self.consistent_listen_address))
493            .expect("Failed to parse RPC URL")
494    }
495
496    /// The URL to send JSON-RPC requests to.
497    pub fn jsonrpc_url(&self) -> Url {
498        Url::parse(&format!("http://{}/", self.jsonrpc_listen_address))
499            .expect("Failed to parse RPC URL")
500    }
501
502    /// The URL to send GraphQL requests to.
503    pub fn graphql_url(&self) -> Url {
504        Url::parse(&format!("http://{}/graphql", self.graphql_listen_address))
505            .expect("Failed to parse RPC URL")
506    }
507
508    /// The URL to send kv-rpc (LedgerService) requests to.
509    pub fn kv_rpc_url(&self) -> Url {
510        Url::parse(&format!("http://{}/", self.kv_rpc_listen_address))
511            .expect("Failed to parse RPC URL")
512    }
513
514    /// Returns the latest checkpoint that we have all data for in the database, according to the
515    /// watermarks table. Returns `None` if any of the expected pipelines are missing data.
516    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
517        use watermarks::dsl as w;
518
519        let mut conn = self
520            .db
521            .connect()
522            .await
523            .context("Failed to connect to database")?;
524
525        let latest: HashMap<String, i64> = w::watermarks
526            .select((w::pipeline, w::checkpoint_hi_inclusive))
527            .filter(w::pipeline.eq_any(&self.pipelines))
528            .filter(w::reader_lo.le(w::checkpoint_hi_inclusive))
529            .load(&mut conn)
530            .await?
531            .into_iter()
532            .collect();
533
534        if latest.len() != self.pipelines.len() {
535            return Ok(None);
536        }
537
538        Ok(latest.into_values().min().map(|l| l as u64))
539    }
540
541    /// Returns the latest checkpoint that the pruner is willing to prune up to for the given
542    /// `pipeline`.
543    pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
544        use watermarks::dsl as w;
545
546        let mut conn = self
547            .db
548            .connect()
549            .await
550            .context("Failed to connect to database")?;
551
552        let latest: Option<i64> = w::watermarks
553            .select(w::reader_lo)
554            .filter(w::pipeline.eq(pipeline))
555            .first(&mut conn)
556            .await
557            .optional()?;
558
559        Ok(latest.map(|l| l as u64))
560    }
561
562    /// Returns the latest checkpoint that the consistent store is aware of.
563    pub async fn latest_consistent_store_checkpoint(&self) -> anyhow::Result<u64> {
564        ConsistentServiceClient::connect(self.consistent_store_url().to_string())
565            .await
566            .context("Failed to connect to Consistent Store")?
567            .available_range(AvailableRangeRequest {})
568            .await
569            .context("Failed to fetch available range from Consistent Store")?
570            .into_inner()
571            .max_checkpoint
572            .context("Consistent Store has not started yet")
573    }
574
575    /// Returns the latest checkpoint that the GraphQL service is aware of.
576    pub async fn latest_graphql_checkpoint(&self) -> anyhow::Result<u64> {
577        let query = json!({
578            "query": "query { checkpoint { sequenceNumber } }"
579        });
580
581        let client = Client::new();
582        let request = client.post(self.graphql_url()).json(&query);
583        let response = request
584            .send()
585            .await
586            .context("Request to GraphQL server failed")?;
587
588        let body: Value = response
589            .json()
590            .await
591            .context("Failed to parse GraphQL response")?;
592
593        let sequence_number = body
594            .pointer("/data/checkpoint/sequenceNumber")
595            .context("Failed to find checkpoint sequence number in response")?;
596
597        let sequence_number: i64 = serde_json::from_value(sequence_number.clone())
598            .context("Failed to parse sequence number as i64")?;
599
600        ensure!(sequence_number != i64::MAX, "Indexer has not started yet");
601
602        Ok(sequence_number as u64)
603    }
604
605    /// Returns the latest epoch that the GraphQL service is aware of.
606    pub async fn latest_graphql_epoch(&self) -> anyhow::Result<u64> {
607        let query = json!({
608            "query": "query { epoch { epochId } }"
609        });
610
611        let client = Client::new();
612        let request = client.post(self.graphql_url()).json(&query);
613        let response = request
614            .send()
615            .await
616            .context("Request to GraphQL server failed")?;
617
618        let body: Value = response
619            .json()
620            .await
621            .context("Failed to parse GraphQL response")?;
622
623        let epoch_id = body
624            .pointer("/data/epoch/epochId")
625            .context("Failed to find epochId in response")?;
626
627        let epoch_id: i64 =
628            serde_json::from_value(epoch_id.clone()).context("Failed to parse epochId as i64")?;
629
630        ensure!(epoch_id != i64::MAX, "Indexer has not started yet");
631
632        Ok(epoch_id as u64)
633    }
634
635    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
636    /// reached (an error).
637    pub async fn wait_for_indexer(
638        &self,
639        checkpoint: u64,
640        timeout: Duration,
641    ) -> Result<(), Elapsed> {
642        tokio::time::timeout(timeout, async move {
643            let mut interval = interval(Duration::from_millis(200));
644            loop {
645                interval.tick().await;
646                if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
647                    break;
648                }
649            }
650        })
651        .await
652    }
653
654    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
655    /// `pipeline`, or the `timeout` is reached (an error).
656    pub async fn wait_for_pruner(
657        &self,
658        pipeline: &str,
659        checkpoint: u64,
660        timeout: Duration,
661    ) -> Result<(), Elapsed> {
662        tokio::time::timeout(timeout, async move {
663            let mut interval = interval(Duration::from_millis(200));
664            loop {
665                interval.tick().await;
666                if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
667                    break;
668                }
669            }
670        }).await
671    }
672
673    /// Waits until the Consistent Store has caught up to the given `checkpoint`, or the `timeout`
674    /// is reached (an error).
675    pub async fn wait_for_consistent_store(
676        &self,
677        checkpoint: u64,
678        timeout: Duration,
679    ) -> Result<(), Elapsed> {
680        tokio::time::timeout(timeout, async move {
681            let mut interval = interval(Duration::from_millis(200));
682            loop {
683                interval.tick().await;
684                if matches!(self.latest_consistent_store_checkpoint().await, Ok(l) if l >= checkpoint) {
685                    break;
686                }
687            }
688        })
689        .await
690    }
691
692    /// Waits until GraphQL has caught up to the given `checkpoint`, or the `timeout` is reached
693    /// (an error).
694    pub async fn wait_for_graphql(
695        &self,
696        checkpoint: u64,
697        timeout: Duration,
698    ) -> Result<(), Elapsed> {
699        tokio::time::timeout(timeout, async move {
700            let mut interval = interval(Duration::from_millis(200));
701            loop {
702                interval.tick().await;
703                if matches!(self.latest_graphql_checkpoint().await, Ok(l) if l >= checkpoint) {
704                    break;
705                }
706            }
707        })
708        .await
709    }
710
711    /// Waits until the BigTable indexer has caught up to the given `checkpoint`, or the `timeout`
712    /// is reached (an error).
713    pub async fn wait_for_bigtable(
714        &self,
715        checkpoint: u64,
716        timeout: Duration,
717    ) -> Result<(), Elapsed> {
718        let mut client = self.bigtable_client.clone();
719        tokio::time::timeout(timeout, async move {
720            let mut interval = interval(Duration::from_millis(200));
721            loop {
722                interval.tick().await;
723                if client
724                    .get_watermark_for_pipelines(&ALL_PIPELINE_NAMES)
725                    .await
726                    .is_ok_and(|wm| {
727                        wm.is_some_and(|wm| {
728                            wm.checkpoint_hi_inclusive
729                                .is_some_and(|cp| cp >= checkpoint)
730                        })
731                    })
732                {
733                    break;
734                }
735            }
736        })
737        .await
738    }
739}
740
741impl Default for OffchainClusterConfig {
742    fn default() -> Self {
743        Self {
744            indexer_args: Default::default(),
745            consistent_indexer_args: Default::default(),
746            fullnode_args: FullnodeArgs::default(),
747            indexer_config: IndexerConfig::for_test(),
748            consistent_config: ConsistentConfig::for_test(),
749            jsonrpc_config: Default::default(),
750            jsonrpc_node_args: Default::default(),
751            graphql_config: Default::default(),
752            bootstrap_genesis: None,
753        }
754    }
755}
756
757/// Returns ClientArgs that use a temporary local ingestion path and the TempDir of that path.
758pub fn local_ingestion_client_args() -> (ClientArgs, TempDir) {
759    let temp_dir = tempfile::tempdir()
760        .context("Failed to create data ingestion path")
761        .unwrap();
762    let client_args = ClientArgs {
763        ingestion: IngestionClientArgs {
764            local_ingestion_path: Some(temp_dir.path().to_owned()),
765            ..Default::default()
766        },
767        ..Default::default()
768    };
769    (client_args, temp_dir)
770}
771
772/// Writes a checkpoint file to the given path.
773pub async fn write_checkpoint(path: &Path, checkpoint: Checkpoint) -> anyhow::Result<()> {
774    let sequence_number = checkpoint.summary.sequence_number;
775
776    let mask = FieldMask::from_paths([
777        rpc::v2::Checkpoint::path_builder().sequence_number(),
778        rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
779        rpc::v2::Checkpoint::path_builder().signature().finish(),
780        rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
781        rpc::v2::Checkpoint::path_builder()
782            .transactions()
783            .transaction()
784            .bcs()
785            .value(),
786        rpc::v2::Checkpoint::path_builder()
787            .transactions()
788            .effects()
789            .bcs()
790            .value(),
791        rpc::v2::Checkpoint::path_builder()
792            .transactions()
793            .effects()
794            .unchanged_loaded_runtime_objects()
795            .finish(),
796        rpc::v2::Checkpoint::path_builder()
797            .transactions()
798            .events()
799            .bcs()
800            .value(),
801        rpc::v2::Checkpoint::path_builder()
802            .objects()
803            .objects()
804            .bcs()
805            .value(),
806    ]);
807
808    let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
809    let proto_bytes = proto_checkpoint.encode_to_vec();
810    let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
811
812    let file_name = format!("{}.binpb.zst", sequence_number);
813    let file_path = path.join(file_name);
814    fs::write(file_path, compressed)?;
815    Ok(())
816}
817
818/// Start the archival stack: BigTable emulator, BigTable indexer, and sui-kv-rpc.
819async fn start_archival(
820    client_args: ClientArgs,
821    kv_rpc_address: SocketAddr,
822    registry: &prometheus::Registry,
823) -> anyhow::Result<(BigTableClient, BigTableEmulator, Service)> {
824    let emulator = tokio::task::spawn_blocking(BigTableEmulator::start)
825        .await
826        .context("spawn_blocking panicked")?
827        .context("Failed to start BigTable emulator")?;
828
829    create_tables(emulator.host(), INSTANCE_ID)
830        .await
831        .context("Failed to create BigTable tables")?;
832
833    let bigtable_client =
834        BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
835            .await
836            .context("Failed to create BigTable client")?;
837
838    let store = BigTableStore::new(
839        BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
840            .await
841            .context("Failed to create BigTable client for indexer")?,
842    );
843    let bt_indexer = BigTableIndexer::new(
844        store,
845        IndexerArgs::default(),
846        client_args,
847        BtIngestionConfig::default(),
848        CommitterConfig::default(),
849        BtIndexerConfig::default(),
850        PipelineLayer::default(),
851        Chain::Unknown,
852        &ALPHA_PIPELINE_NAMES,
853        registry,
854    )
855    .await
856    .context("Failed to create BigTable indexer")?;
857
858    let bt_indexer_service = bt_indexer
859        .indexer
860        .run()
861        .await
862        .context("Failed to start BigTable indexer")?;
863
864    let kv_rpc_server = KvRpcServer::new_local(
865        emulator.host().to_string(),
866        INSTANCE_ID.to_string(),
867        None,
868        None,
869    )
870    .await
871    .context("Failed to create KvRpcServer")?;
872    let kv_rpc_service = kv_rpc_server
873        .start_service(kv_rpc_address, sui_kv_rpc::ServerConfig::default())
874        .await
875        .context("Failed to start kv-rpc server")?;
876
877    let service = bt_indexer_service.merge(kv_rpc_service);
878    Ok((bigtable_client, emulator, service))
879}