sui_indexer_alt_e2e_tests/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::fs;
6use std::net::IpAddr;
7use std::net::Ipv4Addr;
8use std::net::SocketAddr;
9use std::path::Path;
10use std::time::Duration;
11
12use anyhow::Context;
13use anyhow::ensure;
14use diesel::ExpressionMethods;
15use diesel::OptionalExtension;
16use diesel::QueryDsl;
17use diesel_async::RunQueryDsl;
18use prost::Message;
19use reqwest::Client;
20use serde_json::Value;
21use serde_json::json;
22use simulacrum::Simulacrum;
23use sui_futures::service::Service;
24use sui_indexer_alt::BootstrapGenesis;
25use sui_indexer_alt::config::IndexerConfig;
26use sui_indexer_alt::setup_indexer;
27use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::AvailableRangeRequest;
28use sui_indexer_alt_consistent_api::proto::rpc::consistent::v1alpha::consistent_service_client::ConsistentServiceClient;
29use sui_indexer_alt_consistent_store::args::RpcArgs as ConsistentArgs;
30use sui_indexer_alt_consistent_store::args::TlsArgs as ConsistentTlsArgs;
31use sui_indexer_alt_consistent_store::config::ServiceConfig as ConsistentConfig;
32use sui_indexer_alt_consistent_store::start_service as start_consistent_store;
33use sui_indexer_alt_framework::IndexerArgs;
34use sui_indexer_alt_framework::ingestion::ClientArgs;
35use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientArgs;
36use sui_indexer_alt_framework::postgres::schema::watermarks;
37use sui_indexer_alt_graphql::RpcArgs as GraphQlArgs;
38use sui_indexer_alt_graphql::args::KvArgs as GraphQlKvArgs;
39use sui_indexer_alt_graphql::config::RpcConfig as GraphQlConfig;
40use sui_indexer_alt_graphql::start_rpc as start_graphql;
41use sui_indexer_alt_jsonrpc::NodeArgs as JsonRpcNodeArgs;
42use sui_indexer_alt_jsonrpc::RpcArgs as JsonRpcArgs;
43use sui_indexer_alt_jsonrpc::config::RpcConfig as JsonRpcConfig;
44use sui_indexer_alt_jsonrpc::start_rpc as start_jsonrpc;
45use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
46use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
47use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
48use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
49use sui_pg_db::Db;
50use sui_pg_db::DbArgs;
51use sui_pg_db::temp::TempDb;
52use sui_pg_db::temp::get_available_port;
53use sui_rpc::field::FieldMask;
54use sui_rpc::field::FieldMaskUtil;
55use sui_rpc::merge::Merge;
56use sui_rpc::proto::sui::rpc;
57use sui_types::base_types::ObjectRef;
58use sui_types::base_types::SuiAddress;
59use sui_types::crypto::AccountKeyPair;
60use sui_types::effects::TransactionEffects;
61use sui_types::error::ExecutionError;
62use sui_types::full_checkpoint_content::Checkpoint;
63use sui_types::messages_checkpoint::VerifiedCheckpoint;
64use sui_types::transaction::Transaction;
65use tempfile::TempDir;
66use tokio::time::error::Elapsed;
67use tokio::time::interval;
68use tokio::try_join;
69use url::Url;
70
71pub mod coin_registry;
72pub mod find;
73
74/// A simulation of the network, accompanied by off-chain services (database, indexer, RPC),
75/// connected by local data ingestion.
76pub struct FullCluster {
77    /// A simulation of the network, executing transactions and producing checkpoints.
78    executor: Simulacrum,
79
80    /// The off-chain services (database, indexer, RPC) that are ingesting data from the
81    /// simulation.
82    offchain: OffchainCluster,
83
84    /// Temporary directory to store checkpoint information in, so that the indexer can pick it up.
85    #[allow(unused)]
86    temp_dir: TempDir,
87}
88
89/// A collection of the off-chain services (an indexer, a database, and JSON-RPC/GraphQL servers
90/// that read from that database), grouped together to simplify set-up and tear-down for tests. The
91/// included RPC servers do not support transaction dry run and execution.
92///
93/// The database is temporary, and will be cleaned up when the cluster is dropped, and the RPCs are
94/// set-up to listen on a random, available port, to avoid conflicts when multiple instances are
95/// running concurrently in the same process.
96pub struct OffchainCluster {
97    /// The address the consistent store is listening on.
98    consistent_listen_address: SocketAddr,
99
100    /// The address the JSON-RPC server is listening on.
101    jsonrpc_listen_address: SocketAddr,
102
103    /// The address the GraphQL server is listening on.
104    graphql_listen_address: SocketAddr,
105
106    /// Read access to the temporary database.
107    db: Db,
108
109    /// The pipelines that the indexer is populating.
110    pipelines: Vec<&'static str>,
111
112    /// Handles to all running services. Held on to so the services are not dropped (and therefore
113    /// aborted) until the cluster is stopped.
114    #[allow(unused)]
115    services: Service,
116
117    /// Hold on to the database so it doesn't get dropped until the cluster is stopped.
118    #[allow(unused)]
119    database: TempDb,
120
121    /// Hold on to the temporary directory where the consistent store writes its data, so it
122    /// doesn't get cleaned up until the cluster is stopped.
123    #[allow(unused)]
124    dir: TempDir,
125}
126
127pub struct OffchainClusterConfig {
128    pub indexer_args: IndexerArgs,
129    pub consistent_indexer_args: IndexerArgs,
130    pub fullnode_args: FullnodeArgs,
131    pub indexer_config: IndexerConfig,
132    pub consistent_config: ConsistentConfig,
133    pub jsonrpc_config: JsonRpcConfig,
134    pub graphql_config: GraphQlConfig,
135    pub bootstrap_genesis: Option<BootstrapGenesis>,
136}
137
138impl FullCluster {
139    /// Creates a cluster with a fresh executor where the off-chain services are set up with a
140    /// default configuration.
141    pub async fn new() -> anyhow::Result<Self> {
142        Self::new_with_configs(
143            Simulacrum::new(),
144            OffchainClusterConfig::default(),
145            &prometheus::Registry::new(),
146        )
147        .await
148    }
149
150    /// Creates a new cluster executing transactions using `executor`. The indexer is configured
151    /// using `indexer_args` and `indexer_config, the JSON-RPC server is configured using
152    /// `jsonrpc_config`, and the GraphQL server is configured using `graphql_config`.
153    pub async fn new_with_configs(
154        mut executor: Simulacrum,
155        offchain_cluster_config: OffchainClusterConfig,
156        registry: &prometheus::Registry,
157    ) -> anyhow::Result<Self> {
158        let (client_args, temp_dir) = local_ingestion_client_args();
159        executor.set_data_ingestion_path(temp_dir.path().to_owned());
160
161        let offchain = OffchainCluster::new(client_args, offchain_cluster_config, registry)
162            .await
163            .context("Failed to create off-chain cluster")?;
164
165        Ok(Self {
166            executor,
167            offchain,
168            temp_dir,
169        })
170    }
171
172    /// Return the reference gas price for the current epoch
173    pub fn reference_gas_price(&self) -> u64 {
174        self.executor.reference_gas_price()
175    }
176
177    /// Create a new account and credit it with `amount` gas units from a faucet account. Returns
178    /// the account, its keypair, and a reference to the gas object it was funded with.
179    pub fn funded_account(
180        &mut self,
181        amount: u64,
182    ) -> anyhow::Result<(SuiAddress, AccountKeyPair, ObjectRef)> {
183        self.executor.funded_account(amount)
184    }
185
186    /// Request gas from the faucet, sent to `address`. Return the object reference of the gas
187    /// object that was sent.
188    pub fn request_gas(
189        &mut self,
190        address: SuiAddress,
191        amount: u64,
192    ) -> anyhow::Result<TransactionEffects> {
193        self.executor.request_gas(address, amount)
194    }
195
196    /// Execute a signed transaction, returning its effects.
197    pub fn execute_transaction(
198        &mut self,
199        tx: Transaction,
200    ) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
201        self.executor.execute_transaction(tx)
202    }
203
204    /// Execute a system transaction advancing the lock by the given `duration`.
205    pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
206        self.executor.advance_clock(duration)
207    }
208
209    /// Create a new checkpoint containing the transactions executed since the last checkpoint that
210    /// was created, and wait for the off-chain services to ingest it. Returns the checkpoint
211    /// contents.
212    pub async fn create_checkpoint(&mut self) -> VerifiedCheckpoint {
213        let checkpoint = self.executor.create_checkpoint();
214        let indexer = self
215            .offchain
216            .wait_for_indexer(checkpoint.sequence_number, Duration::from_secs(100));
217        let consistent_store = self
218            .offchain
219            .wait_for_consistent_store(checkpoint.sequence_number, Duration::from_secs(100));
220        let graphql = self
221            .offchain
222            .wait_for_graphql(checkpoint.sequence_number, Duration::from_secs(100));
223
224        try_join!(indexer, consistent_store, graphql)
225            .expect("Timed out waiting for indexer and consistent store");
226
227        checkpoint
228    }
229
230    /// The URL to talk to the database on.
231    pub fn db_url(&self) -> Url {
232        self.offchain.db_url()
233    }
234
235    /// The URL to send Consistent Store requests to.
236    pub fn consistent_store_url(&self) -> Url {
237        self.offchain.consistent_store_url()
238    }
239
240    /// The URL to send JSON-RPC requests to.
241    pub fn jsonrpc_url(&self) -> Url {
242        self.offchain.jsonrpc_url()
243    }
244
245    /// The URL to send GraphQL requests to.
246    pub fn graphql_url(&self) -> Url {
247        self.offchain.graphql_url()
248    }
249
250    /// Returns the latest checkpoint that we have all data for in the database, according to the
251    /// watermarks table. Returns `None` if any of the expected pipelines are missing data.
252    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
253        self.offchain.latest_checkpoint().await
254    }
255
256    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
257    /// reached (an error).
258    pub async fn wait_for_indexer(
259        &self,
260        checkpoint: u64,
261        timeout: Duration,
262    ) -> Result<(), Elapsed> {
263        self.offchain.wait_for_indexer(checkpoint, timeout).await
264    }
265
266    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
267    /// `pipeline`, or the `timeout` is reached (an error).
268    pub async fn wait_for_pruner(
269        &self,
270        pipeline: &str,
271        checkpoint: u64,
272        timeout: Duration,
273    ) -> Result<(), Elapsed> {
274        self.offchain
275            .wait_for_pruner(pipeline, checkpoint, timeout)
276            .await
277    }
278
279    /// Waits until GraphQL has caught up to the given `checkpoint`, or the `timeout` is
280    /// reached (an error).
281    pub async fn wait_for_graphql(
282        &self,
283        checkpoint: u64,
284        timeout: Duration,
285    ) -> Result<(), Elapsed> {
286        self.offchain.wait_for_graphql(checkpoint, timeout).await
287    }
288}
289
290impl OffchainCluster {
291    /// Construct a new off-chain cluster and spin up its constituent services.
292    ///
293    /// - `indexer_args`, `client_args`, and `indexer_config` control the indexer. In particular
294    ///   `client_args` is used to configure the client that the indexer uses to fetch checkpoints.
295    /// - `jsonrpc_config` controls the JSON-RPC server.
296    /// - `graphql_config` controls the GraphQL server.
297    /// - `registry` is used to register metrics for the indexer, JSON-RPC, and GraphQL servers.
298    pub async fn new(
299        client_args: ClientArgs,
300        OffchainClusterConfig {
301            indexer_args,
302            consistent_indexer_args,
303            fullnode_args,
304            indexer_config,
305            consistent_config,
306            jsonrpc_config,
307            graphql_config,
308            bootstrap_genesis,
309        }: OffchainClusterConfig,
310        registry: &prometheus::Registry,
311    ) -> anyhow::Result<Self> {
312        let consistent_port = get_available_port();
313        let consistent_listen_address =
314            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), consistent_port);
315
316        let jsonrpc_port = get_available_port();
317        let jsonrpc_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), jsonrpc_port);
318
319        let graphql_port = get_available_port();
320        let graphql_listen_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), graphql_port);
321
322        let database = TempDb::new().context("Failed to create database")?;
323        let database_url = database.database().url();
324
325        let dir = tempfile::tempdir().context("Failed to create temporary directory")?;
326        let rocksdb_path = dir.path().join("rocksdb");
327
328        let consistent_args = ConsistentArgs {
329            rpc_listen_address: consistent_listen_address,
330            tls: ConsistentTlsArgs::default(),
331        };
332
333        let jsonrpc_args = JsonRpcArgs {
334            rpc_listen_address: jsonrpc_listen_address,
335            ..Default::default()
336        };
337
338        let graphql_args = GraphQlArgs {
339            rpc_listen_address: graphql_listen_address,
340            no_ide: true,
341        };
342
343        let db = Db::for_read(database_url.clone(), DbArgs::default())
344            .await
345            .context("Failed to connect to database")?;
346
347        let indexer = setup_indexer(
348            database_url.clone(),
349            DbArgs::default(),
350            indexer_args,
351            client_args.clone(),
352            indexer_config,
353            bootstrap_genesis,
354            registry,
355        )
356        .await
357        .context("Failed to setup indexer")?;
358
359        let pipelines: Vec<_> = indexer.pipelines().collect();
360        let indexer = indexer.run().await.context("Failed to start indexer")?;
361
362        let consistent_store = start_consistent_store(
363            rocksdb_path,
364            consistent_indexer_args,
365            client_args,
366            consistent_args,
367            "0.0.0",
368            consistent_config,
369            registry,
370        )
371        .await
372        .context("Failed to start Consistent Store")?;
373
374        let consistent_reader_args = ConsistentReaderArgs {
375            consistent_store_url: Some(
376                Url::parse(&format!("http://{consistent_listen_address}")).unwrap(),
377            ),
378            consistent_store_statement_timeout_ms: None,
379        };
380
381        let jsonrpc = start_jsonrpc(
382            Some(database_url.clone()),
383            None,
384            DbArgs::default(),
385            BigtableArgs::default(),
386            consistent_reader_args.clone(),
387            jsonrpc_args,
388            JsonRpcNodeArgs::default(),
389            SystemPackageTaskArgs::default(),
390            jsonrpc_config,
391            registry,
392        )
393        .await
394        .context("Failed to start JSON-RPC server")?;
395
396        let graphql = start_graphql(
397            Some(database_url.clone()),
398            fullnode_args,
399            DbArgs::default(),
400            GraphQlKvArgs::default(),
401            consistent_reader_args,
402            graphql_args,
403            SystemPackageTaskArgs::default(),
404            "0.0.0",
405            graphql_config,
406            pipelines.iter().map(|p| p.to_string()).collect(),
407            registry,
408        )
409        .await
410        .context("Failed to start GraphQL server")?;
411
412        let services = indexer
413            .merge(consistent_store)
414            .merge(jsonrpc)
415            .merge(graphql);
416
417        Ok(Self {
418            consistent_listen_address,
419            jsonrpc_listen_address,
420            graphql_listen_address,
421            db,
422            pipelines,
423            services,
424            database,
425            dir,
426        })
427    }
428
429    /// The URL to talk to the database on.
430    pub fn db_url(&self) -> Url {
431        self.database.database().url().clone()
432    }
433
434    /// The URL to send Consistent Store requests to.
435    pub fn consistent_store_url(&self) -> Url {
436        Url::parse(&format!("http://{}/", self.consistent_listen_address))
437            .expect("Failed to parse RPC URL")
438    }
439
440    /// The URL to send JSON-RPC requests to.
441    pub fn jsonrpc_url(&self) -> Url {
442        Url::parse(&format!("http://{}/", self.jsonrpc_listen_address))
443            .expect("Failed to parse RPC URL")
444    }
445
446    /// The URL to send GraphQL requests to.
447    pub fn graphql_url(&self) -> Url {
448        Url::parse(&format!("http://{}/graphql", self.graphql_listen_address))
449            .expect("Failed to parse RPC URL")
450    }
451
452    /// Returns the latest checkpoint that we have all data for in the database, according to the
453    /// watermarks table. Returns `None` if any of the expected pipelines are missing data.
454    pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
455        use watermarks::dsl as w;
456
457        let mut conn = self
458            .db
459            .connect()
460            .await
461            .context("Failed to connect to database")?;
462
463        let latest: HashMap<String, i64> = w::watermarks
464            .select((w::pipeline, w::checkpoint_hi_inclusive))
465            .filter(w::pipeline.eq_any(&self.pipelines))
466            .load(&mut conn)
467            .await?
468            .into_iter()
469            .collect();
470
471        if latest.len() != self.pipelines.len() {
472            return Ok(None);
473        }
474
475        Ok(latest.into_values().min().map(|l| l as u64))
476    }
477
478    /// Returns the latest checkpoint that the pruner is willing to prune up to for the given
479    /// `pipeline`.
480    pub async fn latest_pruner_checkpoint(&self, pipeline: &str) -> anyhow::Result<Option<u64>> {
481        use watermarks::dsl as w;
482
483        let mut conn = self
484            .db
485            .connect()
486            .await
487            .context("Failed to connect to database")?;
488
489        let latest: Option<i64> = w::watermarks
490            .select(w::reader_lo)
491            .filter(w::pipeline.eq(pipeline))
492            .first(&mut conn)
493            .await
494            .optional()?;
495
496        Ok(latest.map(|l| l as u64))
497    }
498
499    /// Returns the latest checkpoint that the consistent store is aware of.
500    pub async fn latest_consistent_store_checkpoint(&self) -> anyhow::Result<u64> {
501        ConsistentServiceClient::connect(self.consistent_store_url().to_string())
502            .await
503            .context("Failed to connect to Consistent Store")?
504            .available_range(AvailableRangeRequest {})
505            .await
506            .context("Failed to fetch available range from Consistent Store")?
507            .into_inner()
508            .max_checkpoint
509            .context("Consistent Store has not started yet")
510    }
511
512    /// Returns the latest checkpoint that the GraphQL service is aware of.
513    pub async fn latest_graphql_checkpoint(&self) -> anyhow::Result<u64> {
514        let query = json!({
515            "query": "query { checkpoint { sequenceNumber } }"
516        });
517
518        let client = Client::new();
519        let request = client.post(self.graphql_url()).json(&query);
520        let response = request
521            .send()
522            .await
523            .context("Request to GraphQL server failed")?;
524
525        let body: Value = response
526            .json()
527            .await
528            .context("Failed to parse GraphQL response")?;
529
530        let sequence_number = body
531            .pointer("/data/checkpoint/sequenceNumber")
532            .context("Failed to find checkpoint sequence number in response")?;
533
534        let sequence_number: i64 = serde_json::from_value(sequence_number.clone())
535            .context("Failed to parse sequence number as i64")?;
536
537        ensure!(sequence_number != i64::MAX, "Indexer has not started yet");
538
539        Ok(sequence_number as u64)
540    }
541
542    /// Returns the latest epoch that the GraphQL service is aware of.
543    pub async fn latest_graphql_epoch(&self) -> anyhow::Result<u64> {
544        let query = json!({
545            "query": "query { epoch { epochId } }"
546        });
547
548        let client = Client::new();
549        let request = client.post(self.graphql_url()).json(&query);
550        let response = request
551            .send()
552            .await
553            .context("Request to GraphQL server failed")?;
554
555        let body: Value = response
556            .json()
557            .await
558            .context("Failed to parse GraphQL response")?;
559
560        let epoch_id = body
561            .pointer("/data/epoch/epochId")
562            .context("Failed to find epochId in response")?;
563
564        let epoch_id: i64 =
565            serde_json::from_value(epoch_id.clone()).context("Failed to parse epochId as i64")?;
566
567        ensure!(epoch_id != i64::MAX, "Indexer has not started yet");
568
569        Ok(epoch_id as u64)
570    }
571
572    /// Waits until the indexer has caught up to the given `checkpoint`, or the `timeout` is
573    /// reached (an error).
574    pub async fn wait_for_indexer(
575        &self,
576        checkpoint: u64,
577        timeout: Duration,
578    ) -> Result<(), Elapsed> {
579        tokio::time::timeout(timeout, async move {
580            let mut interval = interval(Duration::from_millis(200));
581            loop {
582                interval.tick().await;
583                if matches!(self.latest_checkpoint().await, Ok(Some(l)) if l >= checkpoint) {
584                    break;
585                }
586            }
587        })
588        .await
589    }
590
591    /// Waits until the indexer's pruner has caught up to the given `checkpoint`, for the given
592    /// `pipeline`, or the `timeout` is reached (an error).
593    pub async fn wait_for_pruner(
594        &self,
595        pipeline: &str,
596        checkpoint: u64,
597        timeout: Duration,
598    ) -> Result<(), Elapsed> {
599        tokio::time::timeout(timeout, async move {
600            let mut interval = interval(Duration::from_millis(200));
601            loop {
602                interval.tick().await;
603                if matches!(self.latest_pruner_checkpoint(pipeline).await, Ok(Some(l)) if l >= checkpoint) {
604                    break;
605                }
606            }
607        }).await
608    }
609
610    /// Waits until the Consistent Store has caught up to the given `checkpoint`, or the `timeout`
611    /// is reached (an error).
612    pub async fn wait_for_consistent_store(
613        &self,
614        checkpoint: u64,
615        timeout: Duration,
616    ) -> Result<(), Elapsed> {
617        tokio::time::timeout(timeout, async move {
618            let mut interval = interval(Duration::from_millis(200));
619            loop {
620                interval.tick().await;
621                if matches!(self.latest_consistent_store_checkpoint().await, Ok(l) if l >= checkpoint) {
622                    break;
623                }
624            }
625        })
626        .await
627    }
628
629    /// Waits until GraphQL has caught up to the given `checkpoint`, or the `timeout` is reached
630    /// (an error).
631    pub async fn wait_for_graphql(
632        &self,
633        checkpoint: u64,
634        timeout: Duration,
635    ) -> Result<(), Elapsed> {
636        tokio::time::timeout(timeout, async move {
637            let mut interval = interval(Duration::from_millis(200));
638            loop {
639                interval.tick().await;
640                if matches!(self.latest_graphql_checkpoint().await, Ok(l) if l >= checkpoint) {
641                    break;
642                }
643            }
644        })
645        .await
646    }
647}
648
649impl Default for OffchainClusterConfig {
650    fn default() -> Self {
651        Self {
652            indexer_args: Default::default(),
653            consistent_indexer_args: Default::default(),
654            fullnode_args: Default::default(),
655            indexer_config: IndexerConfig::for_test(),
656            consistent_config: ConsistentConfig::for_test(),
657            jsonrpc_config: Default::default(),
658            graphql_config: Default::default(),
659            bootstrap_genesis: None,
660        }
661    }
662}
663
664/// Returns ClientArgs that use a temporary local ingestion path and the TempDir of that path.
665pub fn local_ingestion_client_args() -> (ClientArgs, TempDir) {
666    let temp_dir = tempfile::tempdir()
667        .context("Failed to create data ingestion path")
668        .unwrap();
669    let client_args = ClientArgs {
670        ingestion: IngestionClientArgs {
671            local_ingestion_path: Some(temp_dir.path().to_owned()),
672            ..Default::default()
673        },
674        ..Default::default()
675    };
676    (client_args, temp_dir)
677}
678
679/// Writes a checkpoint file to the given path.
680pub async fn write_checkpoint(path: &Path, checkpoint: Checkpoint) -> anyhow::Result<()> {
681    let sequence_number = checkpoint.summary.sequence_number;
682
683    let mask = FieldMask::from_paths([
684        rpc::v2::Checkpoint::path_builder().sequence_number(),
685        rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
686        rpc::v2::Checkpoint::path_builder().signature().finish(),
687        rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
688        rpc::v2::Checkpoint::path_builder()
689            .transactions()
690            .transaction()
691            .bcs()
692            .value(),
693        rpc::v2::Checkpoint::path_builder()
694            .transactions()
695            .effects()
696            .bcs()
697            .value(),
698        rpc::v2::Checkpoint::path_builder()
699            .transactions()
700            .effects()
701            .unchanged_loaded_runtime_objects()
702            .finish(),
703        rpc::v2::Checkpoint::path_builder()
704            .transactions()
705            .events()
706            .bcs()
707            .value(),
708        rpc::v2::Checkpoint::path_builder()
709            .objects()
710            .objects()
711            .bcs()
712            .value(),
713    ]);
714
715    let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
716    let proto_bytes = proto_checkpoint.encode_to_vec();
717    let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
718
719    let file_name = format!("{}.binpb.zst", sequence_number);
720    let file_path = path.join(file_name);
721    fs::write(file_path, compressed)?;
722    Ok(())
723}