sui_indexer_alt_e2e_tests/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::fs;
6use std::net::IpAddr;
7use std::net::Ipv4Addr;
8use std::net::SocketAddr;
9use std::path::Path;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::ensure;
14use diesel::ExpressionMethods;
15use diesel::OptionalExtension;
16use diesel::QueryDsl;
17use diesel_async::RunQueryDsl;
18use prost::Message;
19use reqwest::Client;
20use serde_json::Value;
21use serde_json::json;
22use simulacrum::AdvanceEpochConfig;
23use simulacrum::Simulacrum;
24use sui_futures::service::Service;
25use sui_indexer_alt::BootstrapGenesis;
26use sui_indexer_alt::config::IndexerConfig;
27use sui_indexer_alt::setup_indexer;
28use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::AvailableRangeRequest;
29use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::consistent_service_client::ConsistentServiceClient;
30use sui_indexer_alt_consistent_store::args::RpcArgs as ConsistentArgs;
31use sui_indexer_alt_consistent_store::args::TlsArgs as ConsistentTlsArgs;
32use sui_indexer_alt_consistent_store::config::ServiceConfig as ConsistentConfig;
33use sui_indexer_alt_consistent_store::start_service as start_consistent_store;
34use sui_indexer_alt_framework::IndexerArgs;
35use sui_indexer_alt_framework::ingestion::ClientArgs;
36use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientArgs;
37use sui_indexer_alt_framework::pipeline::CommitterConfig;
38use sui_indexer_alt_framework::postgres::schema::watermarks;
39use sui_indexer_alt_graphql::RpcArgs as GraphQlArgs;
40use sui_indexer_alt_graphql::args::SubscriptionArgs;
41use sui_indexer_alt_graphql::config::RpcConfig as GraphQlConfig;
42use sui_indexer_alt_graphql::start_rpc as start_graphql;
43use sui_indexer_alt_jsonrpc::NodeArgs as JsonRpcNodeArgs;
44use sui_indexer_alt_jsonrpc::RpcArgs as JsonRpcArgs;
45use sui_indexer_alt_jsonrpc::config::RpcConfig as JsonRpcConfig;
46use sui_indexer_alt_jsonrpc::start_rpc as start_jsonrpc;
47use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
48use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
49use sui_indexer_alt_reader::kv_loader::KvArgs;
50use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
51use sui_kv_rpc::KvRpcConfig;
52use sui_kv_rpc::KvRpcServer;
53use sui_kvstore::ALL_PIPELINE_NAMES;
54use sui_kvstore::ALPHA_PIPELINE_NAMES;
55use sui_kvstore::BigTableClient;
56use sui_kvstore::BigTableIndexer;
57use sui_kvstore::BigTableStore;
58use sui_kvstore::IndexerConfig as BtIndexerConfig;
59use sui_kvstore::IngestionConfig as BtIngestionConfig;
60use sui_kvstore::KeyValueStoreReader;
61use sui_kvstore::PipelineLayer;
62use sui_kvstore::testing::BigTableEmulator;
63use sui_kvstore::testing::INSTANCE_ID;
64use sui_kvstore::testing::create_tables;
65use sui_pg_db::Db;
66use sui_pg_db::DbArgs;
67use sui_pg_db::temp::TempDb;
68use sui_pg_db::temp::get_available_port;
69use sui_protocol_config::Chain;
70use sui_rpc::field::FieldMask;
71use sui_rpc::field::FieldMaskUtil;
72use sui_rpc::merge::Merge;
73use sui_rpc::proto::sui::rpc;
74use sui_types::base_types::ObjectRef;
75use sui_types::base_types::SuiAddress;
76use sui_types::crypto::AccountKeyPair;
77use sui_types::effects::TransactionEffects;
78use sui_types::error::ExecutionError;
79use sui_types::full_checkpoint_content::Checkpoint;
80use sui_types::messages_checkpoint::VerifiedCheckpoint;
81use sui_types::transaction::Transaction;
82use tempfile::TempDir;
83use tokio::time::error::Elapsed;
84use tokio::time::interval;
85use tokio::try_join;
86use url::Url;
87
88pub mod coin_registry;
89pub mod find;
90pub mod graphql;
91pub mod move_helpers;
92pub mod transaction;
93
94/// A simulation of the network, accompanied by off-chain services (database, indexer, RPC),
95/// connected by local data ingestion.
96pub struct FullCluster {
97    /// A simulation of the network, executing transactions and producing checkpoints.
98    executor: Simulacrum,
99
100    /// The off-chain services (database, indexer, RPC) that are ingesting data from the
101    /// simulation.
102    offchain: OffchainCluster,
103
104    /// Temporary directory to store checkpoint information in, so that the indexer can pick it up.
105    #[allow(unused)]
106    temp_dir: TempDir,
107}
108
109/// A collection of the off-chain services (an indexer, a database, and JSON-RPC/GraphQL servers
110/// that read from that database), grouped together to simplify set-up and tear-down for tests. The
111/// included RPC servers do not support transaction dry run and execution.
112///
113/// The database is temporary, and will be cleaned up when the cluster is dropped, and the RPCs are
114/// set-up to listen on a random, available port, to avoid conflicts when multiple instances are
115/// running concurrently in the same process.
116pub struct OffchainCluster {
117    /// The address the consistent store is listening on.
118    consistent_listen_address: SocketAddr,
119
120    /// The address the JSON-RPC server is listening on.
121    jsonrpc_listen_address: SocketAddr,
122
123    /// The address the GraphQL server is listening on.
124    graphql_listen_address: SocketAddr,
125
126    /// The address the kv-rpc (LedgerService) server is listening on.
127    kv_rpc_listen_address: SocketAddr,
128
129    /// Read access to BigTable.
130    bigtable_client: BigTableClient,
131
132    /// Read access to the temporary database.
133    db: Db,
134
135    /// The pipelines that the indexer is populating.
136    pipelines: Vec<&'static str>,
137
138    /// Handles to all running services. Held on to so the services are not dropped (and therefore
139    /// aborted) until the cluster is stopped.
140    #[allow(unused)]
141    services: Service,
142
143    /// Handle to the BigTable emulator process.
144    #[allow(unused)]
145    bigtable_emulator: BigTableEmulator,
146
147    /// Hold on to the database so it doesn't get dropped until the cluster is stopped.
148    #[allow(unused)]
149    database: TempDb,
150
151    /// Hold on to the temporary directory where the consistent store writes its data, so it
152    /// doesn't get cleaned up until the cluster is stopped.
153    #[allow(unused)]
154    dir: TempDir,
155}
156
157pub struct OffchainClusterConfig {
158    pub indexer_args: IndexerArgs,
159    pub consistent_indexer_args: IndexerArgs,
160    pub fullnode_args: FullnodeArgs,
161    pub indexer_config: IndexerConfig,
162    pub consistent_config: ConsistentConfig,
163    pub jsonrpc_config: JsonRpcConfig,
164    pub jsonrpc_node_args: JsonRpcNodeArgs,
165    pub graphql_config: GraphQlConfig,
166    pub bootstrap_genesis: Option<BootstrapGenesis>,
167    pub kv_rpc_config: KvRpcConfig,
168}
169
170impl FullCluster {
171    /// Creates a cluster with a fresh executor where the off-chain services are set up with a
172    /// default configuration.
173    pub async fn new() -> anyhow::Result<Self> {
174        Self::new_with_configs(
175            Simulacrum::new(),
176            OffchainClusterConfig::default(),
177            &prometheus::Registry::new(),
178        )
179        .await
180    }
181
182    /// Creates a new cluster executing transactions using `executor`. The indexer is configured
183    /// using `indexer_args` and `indexer_config, the JSON-RPC server is configured using
184    /// `jsonrpc_config`, and the GraphQL server is configured using `graphql_config`.
185    pub async fn new_with_configs(
186        mut executor: Simulacrum,
187        offchain_cluster_config: OffchainClusterConfig,
188        registry: &prometheus::Registry,
189    ) -> anyhow::Result<Self> {
190        let (client_args, temp_dir) = local_ingestion_client_args();
191        executor.set_data_ingestion_path(temp_dir.path().to_owned());
192
193        let offchain = OffchainCluster::new(client_args, offchain_cluster_config, registry)
194            .await
195            .context("Failed to create off-chain cluster")?;
196
197        Ok(Self {
198            executor,
199            offchain,
200            temp_dir,
201        })
202    }
203
204    /// Return the reference gas price for the current epoch
205    pub fn reference_gas_price(&self) -> u64 {
206        self.executor.reference_gas_price()
207    }
208
209    /// Create a new account and credit it with `amount` gas units from a faucet account. Returns
210    /// the account, its keypair, and a reference to the gas object it was funded with.
211    pub fn funded_account(
212        &mut self,
213        amount: u64,
214    ) -> anyhow::Result<(SuiAddress, AccountKeyPair, ObjectRef)> {
215        self.executor.funded_account(amount)
216    }
217
218    /// Request gas from the faucet, sent to `address`. Return the object reference of the gas
219    /// object that was sent.
220    pub fn request_gas(
221        &mut self,
222        address: SuiAddress,
223        amount: u64,
224    ) -> anyhow::Result<TransactionEffects> {
225        self.executor.request_gas(address, amount)
226    }
227
228    /// Execute a signed transaction, returning its effects.
229    pub fn execute_transaction(
230        &mut self,
231        tx: Transaction,
232    ) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
233        self.executor.execute_transaction(tx)
234    }
235
236    /// Execute a system transaction advancing the lock by the given `duration`.
237    pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
238        self.executor.advance_clock(duration)
239    }
240
241    /// Advance the executor into the next epoch. This executes an end-of-epoch transaction and
242    /// creates the epoch's final checkpoint, but does not wait for the off-chain services to ingest
243    /// it — follow with [`create_checkpoint`](Self::create_checkpoint) to sync.
244    pub fn advance_epoch(&mut self) {
245        self.executor.advance_epoch(AdvanceEpochConfig::default());
246    }
247
248    /// Create a new checkpoint containing the transactions executed since the last checkpoint that
249    /// was created, and wait for the off-chain services to ingest it. Returns the checkpoint
250    /// contents.
251    pub async fn create_checkpoint(&mut self) -> VerifiedCheckpoint {
252        let checkpoint = self.executor.create_checkpoint();
253        let timeout = Duration::from_secs(100);
254        let indexer = self
255            .offchain
256            .wait_for_indexer(checkpoint.sequence_number, timeout);
257        let consistent_store = self
258            .offchain
259            .wait_for_consistent_store(checkpoint.sequence_number, timeout);
260        let graphql = self
261            .offchain
262            .wait_for_graphql(checkpoint.sequence_number, timeout);
263        let bigtable = self
264            .offchain
265            .wait_for_bigtable(checkpoint.sequence_number, timeout);
266
267        try_join!(indexer, consistent_store, graphql, bigtable)
268            .expect("Timed out waiting for off-chain services");
269
270        checkpoint
271    }
272
273    /// The URL to talk to the database on.
274    pub fn db_url(&self) -> Url {
275        self.offchain.db_url()
276    }
277
278    /// The URL to send Consistent Store requests to.
279    pub fn consistent_store_url(&self) -> Url {
280        self.offchain.consistent_store_url()
281    }
282
283    /// The URL to send JSON-RPC requests to.
284    pub fn jsonrpc_url(&self) -> Url {
285        self.offchain.jsonrpc_url()
286    }
287
288    /// The URL to send GraphQL requests to.
289    pub fn graphql_url(&self) -> Url {
290        self.offchain.graphql_url()
291    }
292
293    /// The URL to send kv-rpc (LedgerService) requests to.
294    pub fn kv_rpc_url(&self) -> Url {
295        self.offchain.kv_rpc_url()
296    }
297
298    /// Returns the latest checkpoint that we have all data for in the database, according to the
299    /// watermarks table. Returns `None` if any of the expected pipelines are missing data.
300    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
301        self.offchain.latest_checkpoint().await
302    }
303
304    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
305    /// reached (an error).
306    pub async fn wait_for_indexer(
307        &self,
308        checkpoint: u64,
309        timeout: Duration,
310    ) -> Result<(), Elapsed> {
311        self.offchain.wait_for_indexer(checkpoint, timeout).await
312    }
313
314    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
315    /// `pipeline`, or the `timeout` is reached (an error).
316    pub async fn wait_for_pruner(
317        &self,
318        pipeline: &str,
319        checkpoint: u64,
320        timeout: Duration,
321    ) -> Result<(), Elapsed> {
322        self.offchain
323            .wait_for_pruner(pipeline, checkpoint, timeout)
324            .await
325    }
326
327    /// Waits until GraphQL has caught up to the given `checkpoint`, or the `timeout` is
328    /// reached (an error).
329    pub async fn wait_for_graphql(
330        &self,
331        checkpoint: u64,
332        timeout: Duration,
333    ) -> Result<(), Elapsed> {
334        self.offchain.wait_for_graphql(checkpoint, timeout).await
335    }
336}
337
338impl OffchainCluster {
339    /// Construct a new off-chain cluster and spin up its constituent services.
340    ///
341    /// - `indexer_args`, `client_args`, and `indexer_config` control the indexer. In particular
342    ///   `client_args` is used to configure the client that the indexer uses to fetch checkpoints.
343    /// - `jsonrpc_config` controls the JSON-RPC server.
344    /// - `graphql_config` controls the GraphQL server.
345    /// - `registry` is used to register metrics for the indexer, JSON-RPC, and GraphQL servers.
346    pub async fn new(
347        client_args: ClientArgs,
348        OffchainClusterConfig {
349            indexer_args,
350            consistent_indexer_args,
351            fullnode_args,
352            indexer_config,
353            consistent_config,
354            jsonrpc_config,
355            jsonrpc_node_args,
356            graphql_config,
357            bootstrap_genesis,
358            kv_rpc_config,
359        }: OffchainClusterConfig,
360        registry: &prometheus::Registry,
361    ) -> anyhow::Result<Self> {
362        let consistent_port = get_available_port();
363        let consistent_listen_address =
364            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), consistent_port);
365
366        let jsonrpc_port = get_available_port();
367        let jsonrpc_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), jsonrpc_port);
368
369        let graphql_port = get_available_port();
370        let graphql_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), graphql_port);
371
372        let kv_rpc_port = get_available_port();
373        let kv_rpc_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), kv_rpc_port);
374
375        let database = TempDb::new().context("Failed to create database")?;
376        let database_url = database.database().url();
377
378        let dir = tempfile::tempdir().context("Failed to create temporary directory")?;
379        let rocksdb_path = dir.path().join("rocksdb");
380
381        let consistent_args = ConsistentArgs {
382            rpc_listen_address: consistent_listen_address,
383            tls: ConsistentTlsArgs::default(),
384        };
385
386        let jsonrpc_args = JsonRpcArgs {
387            rpc_listen_address: jsonrpc_listen_address,
388            ..Default::default()
389        };
390
391        let graphql_args = GraphQlArgs {
392            rpc_listen_address: graphql_listen_address,
393            no_ide: true,
394        };
395
396        let db = Db::for_read(database_url.clone(), DbArgs::default())
397            .await
398            .context("Failed to connect to database")?;
399
400        let indexer = setup_indexer(
401            database_url.clone(),
402            DbArgs::default(),
403            indexer_args,
404            client_args.clone(),
405            indexer_config,
406            bootstrap_genesis,
407            registry,
408        )
409        .await
410        .context("Failed to setup indexer")?;
411
412        let pipelines: Vec<_> = indexer.pipelines().collect();
413        let indexer = indexer.run().await.context("Failed to start indexer")?;
414
415        let consistent_store = start_consistent_store(
416            rocksdb_path,
417            consistent_indexer_args,
418            client_args.clone(),
419            consistent_args,
420            "0.0.0",
421            consistent_config,
422            registry,
423        )
424        .await
425        .context("Failed to start Consistent Store")?;
426
427        let consistent_reader_args = ConsistentReaderArgs {
428            consistent_store_url: Some(
429                Url::parse(&format!("http://{consistent_listen_address}")).unwrap(),
430            ),
431            ..Default::default()
432        };
433
434        let (bigtable_client, bigtable_emulator, archival_service) =
435            start_archival(client_args.clone(), kv_rpc_address, kv_rpc_config, registry).await?;
436
437        let kv_args = KvArgs {
438            ledger_grpc_url: Some(
439                format!("http://{kv_rpc_address}")
440                    .parse()
441                    .expect("Failed to parse kv-rpc URI"),
442            ),
443            ..Default::default()
444        };
445
446        let jsonrpc = start_jsonrpc(
447            Some(database_url.clone()),
448            DbArgs::default(),
449            kv_args.clone(),
450            consistent_reader_args.clone(),
451            jsonrpc_args,
452            jsonrpc_node_args,
453            SystemPackageTaskArgs::default(),
454            jsonrpc_config,
455            registry,
456        )
457        .await
458        .context("Failed to start JSON-RPC server")?;
459
460        let graphql = start_graphql(
461            Some(database_url.clone()),
462            fullnode_args,
463            DbArgs::default(),
464            kv_args,
465            consistent_reader_args,
466            graphql_args,
467            SystemPackageTaskArgs::default(),
468            SubscriptionArgs::default(),
469            "0.0.0",
470            graphql_config,
471            pipelines.iter().map(|p| p.to_string()).collect(),
472            registry,
473        )
474        .await
475        .context("Failed to start GraphQL server")?;
476
477        let services = indexer
478            .merge(consistent_store)
479            .merge(jsonrpc)
480            .merge(graphql)
481            .merge(archival_service);
482
483        Ok(Self {
484            consistent_listen_address,
485            jsonrpc_listen_address,
486            graphql_listen_address,
487            kv_rpc_listen_address: kv_rpc_address,
488            bigtable_client,
489            db,
490            pipelines,
491            services,
492            bigtable_emulator,
493            database,
494            dir,
495        })
496    }
497
498    /// The URL to talk to the database on.
499    pub fn db_url(&self) -> Url {
500        self.database.database().url().clone()
501    }
502
503    /// The URL to send Consistent Store requests to.
504    pub fn consistent_store_url(&self) -> Url {
505        Url::parse(&format!("http://{}/", self.consistent_listen_address))
506            .expect("Failed to parse RPC URL")
507    }
508
509    /// The URL to send JSON-RPC requests to.
510    pub fn jsonrpc_url(&self) -> Url {
511        Url::parse(&format!("http://{}/", self.jsonrpc_listen_address))
512            .expect("Failed to parse RPC URL")
513    }
514
515    /// The URL to send GraphQL requests to.
516    pub fn graphql_url(&self) -> Url {
517        Url::parse(&format!("http://{}/graphql", self.graphql_listen_address))
518            .expect("Failed to parse RPC URL")
519    }
520
521    /// The URL to send kv-rpc (LedgerService) requests to.
522    pub fn kv_rpc_url(&self) -> Url {
523        Url::parse(&format!("http://{}/", self.kv_rpc_listen_address))
524            .expect("Failed to parse RPC URL")
525    }
526
527    /// Returns the latest checkpoint that we have all data for in the database, according to the
528    /// watermarks table. Returns `None` if any of the expected pipelines are missing data.
529    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
530        use watermarks::dsl as w;
531
532        let mut conn = self
533            .db
534            .connect()
535            .await
536            .context("Failed to connect to database")?;
537
538        let latest: HashMap<String, i64> = w::watermarks
539            .select((w::pipeline, w::checkpoint_hi_inclusive))
540            .filter(w::pipeline.eq_any(&self.pipelines))
541            .filter(w::reader_lo.le(w::checkpoint_hi_inclusive))
542            .load(&mut conn)
543            .await?
544            .into_iter()
545            .collect();
546
547        if latest.len() != self.pipelines.len() {
548            return Ok(None);
549        }
550
551        Ok(latest.into_values().min().map(|l| l as u64))
552    }
553
554    /// Returns the latest checkpoint that the pruner is willing to prune up to for the given
555    /// `pipeline`.
556    pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
557        use watermarks::dsl as w;
558
559        let mut conn = self
560            .db
561            .connect()
562            .await
563            .context("Failed to connect to database")?;
564
565        let latest: Option<i64> = w::watermarks
566            .select(w::reader_lo)
567            .filter(w::pipeline.eq(pipeline))
568            .first(&mut conn)
569            .await
570            .optional()?;
571
572        Ok(latest.map(|l| l as u64))
573    }
574
575    /// Returns the latest checkpoint that the consistent store is aware of.
576    pub async fn latest_consistent_store_checkpoint(&self) -> anyhow::Result<u64> {
577        ConsistentServiceClient::connect(self.consistent_store_url().to_string())
578            .await
579            .context("Failed to connect to Consistent Store")?
580            .available_range(AvailableRangeRequest {})
581            .await
582            .context("Failed to fetch available range from Consistent Store")?
583            .into_inner()
584            .max_checkpoint
585            .context("Consistent Store has not started yet")
586    }
587
588    /// Returns the latest checkpoint that the GraphQL service is aware of.
589    pub async fn latest_graphql_checkpoint(&self) -> anyhow::Result<u64> {
590        let query = json!({
591            "query": "query { checkpoint { sequenceNumber } }"
592        });
593
594        let client = Client::new();
595        let request = client.post(self.graphql_url()).json(&query);
596        let response = request
597            .send()
598            .await
599            .context("Request to GraphQL server failed")?;
600
601        let body: Value = response
602            .json()
603            .await
604            .context("Failed to parse GraphQL response")?;
605
606        let sequence_number = body
607            .pointer("/data/checkpoint/sequenceNumber")
608            .context("Failed to find checkpoint sequence number in response")?;
609
610        let sequence_number: i64 = serde_json::from_value(sequence_number.clone())
611            .context("Failed to parse sequence number as i64")?;
612
613        ensure!(sequence_number != i64::MAX, "Indexer has not started yet");
614
615        Ok(sequence_number as u64)
616    }
617
618    /// Returns the latest epoch that the GraphQL service is aware of.
619    pub async fn latest_graphql_epoch(&self) -> anyhow::Result<u64> {
620        let query = json!({
621            "query": "query { epoch { epochId } }"
622        });
623
624        let client = Client::new();
625        let request = client.post(self.graphql_url()).json(&query);
626        let response = request
627            .send()
628            .await
629            .context("Request to GraphQL server failed")?;
630
631        let body: Value = response
632            .json()
633            .await
634            .context("Failed to parse GraphQL response")?;
635
636        let epoch_id = body
637            .pointer("/data/epoch/epochId")
638            .context("Failed to find epochId in response")?;
639
640        let epoch_id: i64 =
641            serde_json::from_value(epoch_id.clone()).context("Failed to parse epochId as i64")?;
642
643        ensure!(epoch_id != i64::MAX, "Indexer has not started yet");
644
645        Ok(epoch_id as u64)
646    }
647
648    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
649    /// reached (an error).
650    pub async fn wait_for_indexer(
651        &self,
652        checkpoint: u64,
653        timeout: Duration,
654    ) -> Result<(), Elapsed> {
655        tokio::time::timeout(timeout, async move {
656            let mut interval = interval(Duration::from_millis(200));
657            loop {
658                interval.tick().await;
659                if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
660                    break;
661                }
662            }
663        })
664        .await
665    }
666
667    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
668    /// `pipeline`, or the `timeout` is reached (an error).
669    pub async fn wait_for_pruner(
670        &self,
671        pipeline: &str,
672        checkpoint: u64,
673        timeout: Duration,
674    ) -> Result<(), Elapsed> {
675        tokio::time::timeout(timeout, async move {
676            let mut interval = interval(Duration::from_millis(200));
677            loop {
678                interval.tick().await;
679                if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
680                    break;
681                }
682            }
683        }).await
684    }
685
686    /// Waits until the Consistent Store has caught up to the given `checkpoint`, or the `timeout`
687    /// is reached (an error).
688    pub async fn wait_for_consistent_store(
689        &self,
690        checkpoint: u64,
691        timeout: Duration,
692    ) -> Result<(), Elapsed> {
693        tokio::time::timeout(timeout, async move {
694            let mut interval = interval(Duration::from_millis(200));
695            loop {
696                interval.tick().await;
697                if matches!(self.latest_consistent_store_checkpoint().await, Ok(l) if l >= checkpoint) {
698                    break;
699                }
700            }
701        })
702        .await
703    }
704
705    /// Waits until GraphQL has caught up to the given `checkpoint`, or the `timeout` is reached
706    /// (an error).
707    pub async fn wait_for_graphql(
708        &self,
709        checkpoint: u64,
710        timeout: Duration,
711    ) -> Result<(), Elapsed> {
712        tokio::time::timeout(timeout, async move {
713            let mut interval = interval(Duration::from_millis(200));
714            loop {
715                interval.tick().await;
716                if matches!(self.latest_graphql_checkpoint().await, Ok(l) if l >= checkpoint) {
717                    break;
718                }
719            }
720        })
721        .await
722    }
723
724    /// Waits until the BigTable indexer has caught up to the given `checkpoint`, or the `timeout`
725    /// is reached (an error).
726    pub async fn wait_for_bigtable(
727        &self,
728        checkpoint: u64,
729        timeout: Duration,
730    ) -> Result<(), Elapsed> {
731        let mut client = self.bigtable_client.clone();
732        tokio::time::timeout(timeout, async move {
733            let mut interval = interval(Duration::from_millis(200));
734            loop {
735                interval.tick().await;
736                if client
737                    .get_watermark_for_pipelines(&ALL_PIPELINE_NAMES)
738                    .await
739                    .is_ok_and(|wm| {
740                        wm.is_some_and(|wm| {
741                            wm.checkpoint_hi_inclusive
742                                .is_some_and(|cp| cp >= checkpoint)
743                        })
744                    })
745                {
746                    break;
747                }
748            }
749        })
750        .await
751    }
752}
753
754impl Default for OffchainClusterConfig {
755    fn default() -> Self {
756        Self {
757            indexer_args: Default::default(),
758            consistent_indexer_args: Default::default(),
759            fullnode_args: FullnodeArgs::default(),
760            indexer_config: IndexerConfig::for_test(),
761            consistent_config: ConsistentConfig::for_test(),
762            jsonrpc_config: Default::default(),
763            jsonrpc_node_args: Default::default(),
764            graphql_config: Default::default(),
765            bootstrap_genesis: None,
766            kv_rpc_config: KvRpcConfig::default(),
767        }
768    }
769}
770
771/// Returns ClientArgs that use a temporary local ingestion path and the TempDir of that path.
772pub fn local_ingestion_client_args() -> (ClientArgs, TempDir) {
773    let temp_dir = tempfile::tempdir()
774        .context("Failed to create data ingestion path")
775        .unwrap();
776    let client_args = ClientArgs {
777        ingestion: IngestionClientArgs {
778            local_ingestion_path: Some(temp_dir.path().to_owned()),
779            ..Default::default()
780        },
781        ..Default::default()
782    };
783    (client_args, temp_dir)
784}
785
786/// Writes a checkpoint file to the given path.
787pub async fn write_checkpoint(path: &Path, checkpoint: Checkpoint) -> anyhow::Result<()> {
788    let sequence_number = checkpoint.summary.sequence_number;
789
790    let mask = FieldMask::from_paths([
791        rpc::v2::Checkpoint::path_builder().sequence_number(),
792        rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
793        rpc::v2::Checkpoint::path_builder().signature().finish(),
794        rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
795        rpc::v2::Checkpoint::path_builder()
796            .transactions()
797            .transaction()
798            .bcs()
799            .value(),
800        rpc::v2::Checkpoint::path_builder()
801            .transactions()
802            .effects()
803            .bcs()
804            .value(),
805        rpc::v2::Checkpoint::path_builder()
806            .transactions()
807            .effects()
808            .unchanged_loaded_runtime_objects()
809            .finish(),
810        rpc::v2::Checkpoint::path_builder()
811            .transactions()
812            .events()
813            .bcs()
814            .value(),
815        rpc::v2::Checkpoint::path_builder()
816            .objects()
817            .objects()
818            .bcs()
819            .value(),
820    ]);
821
822    let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
823    let proto_bytes = proto_checkpoint.encode_to_vec();
824    let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
825
826    let file_name = format!("{}.binpb.zst", sequence_number);
827    let file_path = path.join(file_name);
828    fs::write(file_path, compressed)?;
829    Ok(())
830}
831
832/// Start the archival stack: BigTable emulator, BigTable indexer, and sui-kv-rpc.
833async fn start_archival(
834    client_args: ClientArgs,
835    kv_rpc_address: SocketAddr,
836    kv_rpc_config: KvRpcConfig,
837    registry: &prometheus::Registry,
838) -> anyhow::Result<(BigTableClient, BigTableEmulator, Service)> {
839    let emulator = tokio::task::spawn_blocking(BigTableEmulator::start)
840        .await
841        .context("spawn_blocking panicked")?
842        .context("Failed to start BigTable emulator")?;
843
844    create_tables(emulator.host(), INSTANCE_ID)
845        .await
846        .context("Failed to create BigTable tables")?;
847
848    let bigtable_client =
849        BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
850            .await
851            .context("Failed to create BigTable client")?;
852
853    let store = BigTableStore::new(
854        BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.to_string())
855            .await
856            .context("Failed to create BigTable client for indexer")?,
857    );
858    let bt_indexer = BigTableIndexer::new(
859        store,
860        IndexerArgs::default(),
861        client_args,
862        BtIngestionConfig::default(),
863        CommitterConfig::default(),
864        BtIndexerConfig::default(),
865        PipelineLayer::default(),
866        Chain::Unknown,
867        &ALPHA_PIPELINE_NAMES,
868        registry,
869    )
870    .await
871    .context("Failed to create BigTable indexer")?;
872
873    // Use the BigTable wrapper, not the raw framework indexer, so bitmap
874    // committer background tasks are supervised for the duration of the test.
875    let bt_indexer_service = bt_indexer
876        .run()
877        .await
878        .context("Failed to start BigTable indexer")?;
879
880    let kv_rpc_server = KvRpcServer::new_local_with_config(
881        emulator.host().to_string(),
882        INSTANCE_ID.to_string(),
883        None,
884        kv_rpc_config.ledger_history(),
885        kv_rpc_config.request_bigtable_concurrency(),
886        kv_rpc_config.stages(),
887    )
888    .await
889    .context("Failed to create KvRpcServer")?;
890    let kv_rpc_service = kv_rpc_server
891        .start_service(
892            kv_rpc_address,
893            sui_kv_rpc::ServerConfig {
894                enable_experimental_query_apis: true,
895                ..Default::default()
896            },
897        )
898        .await
899        .context("Failed to start kv-rpc server")?;
900
901    let service = bt_indexer_service.merge(kv_rpc_service);
902    Ok((bigtable_client, emulator, service))
903}