sui_rpc_store/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Storage backend for `sui-rpc-api`.
5//!
6//! Built on top of [`sui_consistent_store`], this crate hosts the
7//! column families that back every read the RPC service performs:
8//!
9//! - Raw chain data — objects, transactions, effects, events,
10//!   checkpoints, committees — previously served by the validator's
11//!   perpetual / checkpoint / committee stores.
12//! - Indexes — owner, dynamic-field, coin, balance, package version,
13//!   epoch info, ledger history — previously served by
14//!   `sui-core::rpc_index` and `sui-indexer-alt-consistent-store`.
15//!
16//! Values are encoded with bespoke protobuf messages defined under
17//! `proto/sui/rpc_store/`, mirroring the build setup in
18//! `sui-consistent-store`.
19
20pub mod config;
21pub mod indexer;
22pub mod proto;
23pub mod reader;
24pub mod schema;
25
26use std::path::Path;
27
28use prometheus::Registry;
29use sui_consistent_store::DbOptions;
30use sui_indexer_alt_framework::IndexerArgs;
31use sui_indexer_alt_framework::ingestion::BoxedStreamingClient;
32use sui_indexer_alt_framework::ingestion::ClientArgs;
33use sui_indexer_alt_framework::ingestion::IngestionConfig;
34use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClient;
35use sui_indexer_alt_framework::ingestion::streaming_client::GrpcStreamingClient;
36use sui_indexer_alt_framework::metrics::IngestionMetrics;
37use sui_indexer_alt_framework::pipeline::CommitterConfig;
38use sui_indexer_alt_framework::service::Service;
39
40pub use crate::config::CommitterLayer;
41pub use crate::config::ConsistencyConfig;
42pub use crate::config::PipelineLayer;
43pub use crate::config::PrunerConfig;
44pub use crate::config::RestoreLayer;
45pub use crate::config::ServiceConfig;
46pub use crate::indexer::Indexer;
47pub use crate::indexer::METRICS_PREFIX;
48pub use crate::indexer::Store;
49pub use crate::indexer::checkpoint_broadcast::CheckpointBroadcast;
50pub use crate::indexer::checkpoint_broadcast::seed_watermark_to_tip as seed_checkpoint_broadcast_watermark;
51pub use crate::indexer::pruner::prune_history_cohort;
52pub use crate::indexer::restore::HISTORY_COHORT;
53pub use crate::indexer::restore::LIVE_COHORT;
54pub use crate::indexer::restore::floor_unrestored_pipelines;
55pub use crate::indexer::restore::restore_indexes;
56pub use crate::indexer::restore::seed_current_epoch_start;
57pub use crate::indexer::restore::seed_history_cohort;
58pub use crate::reader::RpcStoreReader;
59pub use crate::schema::RpcStoreSchema;
60pub use crate::schema::default_rocksdb_config;
61
62/// Standalone-binary entry point. Opens the database at `path`,
63/// constructs an [`Indexer`] from `ClientArgs`-driven ingestion /
64/// streaming clients, registers every pipeline that is enabled in
65/// `config.pipeline`, and runs the resulting indexer.
66///
67/// The embedded-fullnode path bypasses this helper and constructs
68/// [`Indexer::from_store`] directly with its own
69/// [`IngestionClientTrait`] /
70/// [`CheckpointStreamingClient`] implementations.
71///
72/// [`IngestionClientTrait`]:
73///   sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientTrait
74/// [`CheckpointStreamingClient`]:
75///   sui_indexer_alt_framework::ingestion::streaming_client::CheckpointStreamingClient
76pub async fn start_indexer(
77    path: impl AsRef<Path>,
78    indexer_args: IndexerArgs,
79    client_args: ClientArgs,
80    db_options: DbOptions,
81    ingestion_config: IngestionConfig,
82    config: ServiceConfig,
83    registry: &Registry,
84) -> anyhow::Result<Service> {
85    let metrics_prefix = Some(METRICS_PREFIX);
86
87    // Build the metrics once; the same Arc threads through the
88    // ingestion client and (via `IngestionClient::metrics`) the
89    // ingestion service, avoiding double registration against
90    // `registry`.
91    let ingestion_metrics = IngestionMetrics::new(metrics_prefix, registry);
92    let ingestion_client = IngestionClient::new(client_args.ingestion, ingestion_metrics)?;
93    let streaming_client: Option<BoxedStreamingClient> =
94        client_args.streaming.streaming_url.map(|uri| {
95            Box::new(GrpcStreamingClient::new(
96                uri,
97                ingestion_config.streaming_connection_timeout(),
98                ingestion_config.streaming_statement_timeout(),
99            )) as BoxedStreamingClient
100        });
101
102    let mut indexer = Indexer::new(
103        path,
104        indexer_args,
105        ingestion_client,
106        streaming_client,
107        config.consistency,
108        config.pruner,
109        ingestion_config,
110        db_options,
111        registry,
112    )
113    .await?;
114
115    let committer = config.committer.finish(CommitterConfig::default());
116    indexer.add_pipelines(config.pipeline, committer).await?;
117
118    indexer.run().await
119}