sui_rpc_store/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Runtime configuration for the `sui-rpc-store` indexer.
5//!
//! The indexer is driven by [`ServiceConfig`], which groups the
//! consistency, committer, per-pipeline, and pruner settings the
//! orchestrator needs.
9//!
10//! Per-pipeline enable/disable is expressed through
11//! [`PipelineLayer`]: every pipeline maps to an
12//! `Option<CommitterLayer>` field; `Some(_)` means the pipeline is
13//! registered (with the supplied committer overrides), `None` means
14//! it is skipped. The standalone binary populates the layer from
15//! its TOML config; the embedded-fullnode caller builds it
16//! programmatically via [`PipelineLayer::embedded`] so the raw
17//! chain CFs (served by the fullnode's perpetual store) are not
18//! double-written by this indexer.
19
20use std::time::Duration;
21
22use serde::Deserialize;
23use serde::Serialize;
24use sui_indexer_alt_framework::pipeline::CommitterConfig;
25
/// Top-level configuration for the `sui-rpc-store` indexer
/// service. Parses from TOML; every field has a sensible default
/// for tests and for the embedded use case where most knobs are
/// supplied programmatically.
///
/// Keys are kebab-case and unknown keys are rejected
/// (`deny_unknown_fields`). Any key that is omitted falls back to the
/// corresponding value of [`ServiceConfig::default()`].
#[derive(Default, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct ServiceConfig {
    /// Cross-pipeline consistency knobs. At present this is only the
    /// depth of the per-pipeline write buffer; see
    /// [`ConsistencyConfig`].
    pub consistency: ConsistencyConfig,

    /// Default committer settings shared by all pipelines.
    /// Per-pipeline entries in [`PipelineLayer`] can override
    /// individual fields.
    pub committer: CommitterLayer,

    /// Per-pipeline enable/disable plus optional committer
    /// overrides. The derived default leaves every pipeline `None`
    /// (skipped); use [`PipelineLayer::all`] or
    /// [`PipelineLayer::embedded`] for a populated layer.
    pub pipeline: PipelineLayer,

    /// Pruning policy for the historical CFs. Absent (the default)
    /// disables pruning entirely — the store retains all history.
    pub pruner: Option<PrunerConfig>,
}
50
/// Cross-pipeline consistency knobs surfaced to operators. The
/// indexer threads these into the [`Synchronizer`] at startup.
///
/// Snapshot *retention* (how many in-memory snapshots are kept, and
/// thus how far back consistent reads can reach) is not configured
/// here: it is an open-time property of the database, set via
/// [`DbOptions::snapshot_capacity`]. Because a snapshot is taken at
/// every checkpoint boundary, the effective consistent-read window
/// is roughly `snapshot_capacity` checkpoints.
///
/// Keys are kebab-case and unknown keys are rejected; an omitted key
/// takes its value from [`ConsistencyConfig::default()`].
///
/// [`Synchronizer`]: sui_consistent_store::Synchronizer
/// [`DbOptions::snapshot_capacity`]: sui_consistent_store::DbOptions::snapshot_capacity
#[derive(Clone, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct ConsistencyConfig {
    /// Per-pipeline mpsc capacity for batches waiting to be
    /// committed. The synchronizer's slowest pipeline gates
    /// progress; this buffer absorbs short bursts of slack between
    /// peer pipelines before back-pressure kicks in.
    ///
    /// TOML key: `buffer-size`. Defaults to `5_000`.
    pub buffer_size: usize,
}
72
/// Pruning policy for the historical column families.
///
/// Retention is expressed in epochs, mirroring the validator's
/// perpetual-store pruner: the `retention_epochs` most-recent
/// epochs (including the current one) are retained in full, and
/// everything in older epochs becomes eligible for deletion. The
/// resulting floor is additionally clamped so it never advances past
/// the oldest in-memory snapshot, keeping point-in-time reads
/// coherent even under an aggressively small retention.
///
/// The pruner advances the floor toward its target in chunks of at
/// most `max_chunk_checkpoints` checkpoints, persisting the new
/// watermark after each chunk so progress survives a restart. Each
/// tick advances the floor by at most `max_checkpoints_per_tick`
/// checkpoints so a large backlog drains across many ticks rather
/// than one long blocking pass.
///
/// Only the historical CFs are pruned: the per-transaction
/// (`transactions`, `effects`, `events`, `tx_metadata_by_seq`),
/// per-checkpoint (`checkpoint_summary`, `checkpoint_contents`),
/// digest-reverse-index (`tx_seq_by_digest`,
/// `checkpoint_seq_by_digest`), superseded-`objects`-version,
/// checkpoint-pinned `object_version_by_checkpoint`, and
/// ledger-history bitmap CFs. The live-set-bounded indexes
/// (`object_by_owner`, `object_by_type`, `balance`,
/// `package_versions`) and the tiny `epochs` CF are never pruned.
///
/// Keys are kebab-case and unknown keys are rejected; an omitted key
/// takes its value from [`PrunerConfig::default()`]. Nothing in this
/// type validates the "must be at least `1`" constraints described
/// on the fields below — that check is left to the pruner itself.
#[derive(Clone, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct PrunerConfig {
    /// Number of most-recent epochs to retain in full. Data in
    /// epochs older than this is eligible for pruning. Must be at
    /// least `1`; the pruner refuses to start otherwise, since a
    /// value of `0` would prune the current epoch.
    ///
    /// Defaults to `30`.
    pub retention_epochs: u64,

    /// How often the pruner wakes to recompute the target floor and
    /// advance toward it, in milliseconds.
    ///
    /// Defaults to `300_000`. Exposed as a [`Duration`] through
    /// [`PrunerConfig::interval`].
    pub interval_ms: u64,

    /// Maximum number of checkpoints whose data is deleted in a
    /// single write batch. Bounds the per-batch work (and the number
    /// of effects rows scanned for object/digest deletes) when a
    /// whole epoch ages out at once.
    ///
    /// Defaults to `100`.
    pub max_chunk_checkpoints: u64,

    /// Maximum number of checkpoints whose history is pruned in a
    /// single tick. Bounds the per-tick (blocking) work so that a
    /// large backlog — for example when pruning is first enabled on
    /// an old database — drains across many ticks rather than one
    /// long pass that occupies a blocking thread for minutes. The
    /// floor still converges to its retention target over subsequent
    /// ticks; `interval_ms` and this bound together set the drain
    /// rate. Must be at least `1`; the pruner refuses to start
    /// otherwise, since a value of `0` would never make progress.
    ///
    /// Defaults to `10_000`.
    pub max_checkpoints_per_tick: u64,
}
129
130impl Default for PrunerConfig {
131    fn default() -> Self {
132        Self {
133            retention_epochs: 30,
134            interval_ms: 300_000,
135            max_chunk_checkpoints: 100,
136            // 100 chunks per tick at the default chunk size. Far above
137            // the steady-state rate at which a single epoch ages out,
138            // so retention is honored without intervention, while a
139            // first-run backlog on an old database is still bounded
140            // per tick rather than drained in one blocking pass.
141            max_checkpoints_per_tick: 10_000,
142        }
143    }
144}
145
146impl PrunerConfig {
147    /// The pruner's wake interval as a [`Duration`].
148    pub fn interval(&self) -> Duration {
149        Duration::from_millis(self.interval_ms)
150    }
151}
152
/// Per-pipeline registration + override map. Every pipeline that
/// writes to a CF in [`RpcStoreSchema`] has a corresponding
/// `Option<CommitterLayer>` field here.
///
/// `Some(layer)` registers the pipeline with the supplied committer
/// overrides folded onto the shared [`CommitterLayer`] default;
/// `None` skips the pipeline entirely (e.g. the raw chain CFs in
/// the embedded-fullnode case, where the fullnode populates them
/// through a separate path).
///
/// Grouped in the struct for documentation only — serde sees each
/// field as a top-level key.
///
/// Keys are kebab-case (e.g. `object-by-owner`). A key that is
/// absent deserializes to `None`, so the derived `Default` — and an
/// empty table — skips every pipeline.
///
/// NOTE(review): unlike the other serde config structs in this file,
/// this one does not set `deny_unknown_fields`, so an unrecognised
/// (for example mistyped) pipeline key is silently ignored rather
/// than rejected, leaving the intended pipeline skipped — confirm
/// this leniency is intentional.
///
/// [`RpcStoreSchema`]: crate::RpcStoreSchema
#[derive(Default, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case")]
pub struct PipelineLayer {
    // --- Raw chain data ---
    // The grouping is organisational only: `PipelineLayer::embedded`
    // still enables `epochs`, `tx_seq_by_digest`,
    // `tx_metadata_by_seq` and `object_version_by_checkpoint` from
    // this group while leaving the rest `None`.
    pub epochs: Option<CommitterLayer>,
    pub checkpoint_summary: Option<CommitterLayer>,
    pub checkpoint_contents: Option<CommitterLayer>,
    pub checkpoint_seq_by_digest: Option<CommitterLayer>,
    pub transactions: Option<CommitterLayer>,
    pub tx_seq_by_digest: Option<CommitterLayer>,
    pub tx_metadata_by_seq: Option<CommitterLayer>,
    pub effects: Option<CommitterLayer>,
    pub events: Option<CommitterLayer>,
    pub objects: Option<CommitterLayer>,
    pub object_version_by_checkpoint: Option<CommitterLayer>,

    // --- Indexes ---
    // Every field in this group is enabled by both
    // `PipelineLayer::all` and `PipelineLayer::embedded`.
    pub object_by_owner: Option<CommitterLayer>,
    pub object_by_type: Option<CommitterLayer>,
    pub balance: Option<CommitterLayer>,
    pub package_versions: Option<CommitterLayer>,
    pub transaction_bitmap: Option<CommitterLayer>,
    pub event_bitmap: Option<CommitterLayer>,
}
191
/// Per-pipeline committer overrides. Every field is optional; an
/// unset field inherits from the shared committer default the
/// orchestrator passes through to
/// [`CommitterLayer::finish`](Self::finish).
///
/// Keys are kebab-case and unknown keys are rejected. The derived
/// `Default` leaves every override unset.
#[derive(Default, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct CommitterLayer {
    /// Override for `CommitterConfig::write_concurrency`; `None`
    /// keeps the base value.
    pub write_concurrency: Option<usize>,
    /// Override for `CommitterConfig::collect_interval_ms`; `None`
    /// keeps the base value.
    pub collect_interval_ms: Option<u64>,
    /// Override for `CommitterConfig::watermark_interval_ms`; `None`
    /// keeps the base value.
    pub watermark_interval_ms: Option<u64>,
}
203
204impl ServiceConfig {
205    /// Configuration matching [`Self::default()`] but with every
206    /// pipeline explicitly enabled and the committer layer
207    /// initialised from [`CommitterConfig::default()`]. Suitable
208    /// for surfacing as a TOML example.
209    pub fn example() -> Self {
210        Self {
211            consistency: ConsistencyConfig::default(),
212            committer: CommitterConfig::default().into(),
213            pipeline: PipelineLayer::all(),
214            pruner: Some(PrunerConfig::default()),
215        }
216    }
217}
218
219impl PipelineLayer {
220    /// Every pipeline enabled with default committer overrides
221    /// (`Some(CommitterLayer::default())`). The standalone-binary
222    /// default.
223    pub fn all() -> Self {
224        Self {
225            epochs: Some(CommitterLayer::default()),
226            checkpoint_summary: Some(CommitterLayer::default()),
227            checkpoint_contents: Some(CommitterLayer::default()),
228            checkpoint_seq_by_digest: Some(CommitterLayer::default()),
229            transactions: Some(CommitterLayer::default()),
230            tx_seq_by_digest: Some(CommitterLayer::default()),
231            tx_metadata_by_seq: Some(CommitterLayer::default()),
232            effects: Some(CommitterLayer::default()),
233            events: Some(CommitterLayer::default()),
234            objects: Some(CommitterLayer::default()),
235            object_version_by_checkpoint: Some(CommitterLayer::default()),
236            object_by_owner: Some(CommitterLayer::default()),
237            object_by_type: Some(CommitterLayer::default()),
238            balance: Some(CommitterLayer::default()),
239            package_versions: Some(CommitterLayer::default()),
240            transaction_bitmap: Some(CommitterLayer::default()),
241            event_bitmap: Some(CommitterLayer::default()),
242        }
243    }
244
245    /// The embedded-fullnode cohort: every pipeline this indexer owns
246    /// when it runs inside a Sui fullnode beside the validator's
247    /// perpetual store.
248    ///
249    /// The raw chain-data CFs (`transactions`, `effects`, `events`,
250    /// `objects`, `checkpoint_summary`, `checkpoint_contents`,
251    /// `checkpoint_seq_by_digest`) are left `None`: the perpetual store
252    /// already holds that data and serves it directly, so this indexer
253    /// must not double-write it.
254    ///
255    /// The enabled pipelines form two cohorts. The
256    /// [`Synchronizer`](sui_consistent_store::Synchronizer)
257    /// distinguishes them by their persisted watermark at startup,
258    /// not by this layer, so both are simply registered here:
259    ///
260    /// - **Live cohort** — restored to the fullnode's tip and
261    ///   following live from there: `object_by_owner`,
262    ///   `object_by_type`, `balance`.
263    /// - **History cohort** — seeded to the lowest available
264    ///   checkpoint and backfilling upward: `epochs`,
265    ///   `object_version_by_checkpoint`, `package_versions`,
266    ///   `tx_seq_by_digest`, `tx_metadata_by_seq`, `transaction_bitmap`,
267    ///   `event_bitmap`. These back the ledger-history list APIs (the
268    ///   bitmaps plus the `tx_seq` <-> digest maps needed to interpret
269    ///   bitmap results) and the per-epoch protocol/committee reads
270    ///   (`epochs`). `object_version_by_checkpoint` and
271    ///   `package_versions` are additionally restored at the tip for
272    ///   their floor rows, then backfill the per-checkpoint detail over
273    ///   `(L, T]` (see the cohort docs in
274    ///   [`restore`](crate::indexer::restore)).
275    pub fn embedded() -> Self {
276        Self {
277            // Live cohort: restored to the tip, follows live.
278            object_by_owner: Some(CommitterLayer::default()),
279            object_by_type: Some(CommitterLayer::default()),
280            balance: Some(CommitterLayer::default()),
281            // History cohort: seeded to L, backfills upward.
282            // `object_version_by_checkpoint` and `package_versions` are
283            // additionally restored at the tip for their floor rows (see
284            // the cohort docs in `restore.rs`).
285            epochs: Some(CommitterLayer::default()),
286            object_version_by_checkpoint: Some(CommitterLayer::default()),
287            package_versions: Some(CommitterLayer::default()),
288            tx_seq_by_digest: Some(CommitterLayer::default()),
289            tx_metadata_by_seq: Some(CommitterLayer::default()),
290            transaction_bitmap: Some(CommitterLayer::default()),
291            event_bitmap: Some(CommitterLayer::default()),
292            ..Self::default()
293        }
294    }
295}
296
/// Selects which optional pipelines
/// [`restore_indexes`](crate::restore_indexes) registers.
///
/// The derived-index pipelines (`object_by_owner`, `object_by_type`,
/// `balance`, `package_versions`) have no toggle: they are always
/// restored because they cannot be reconstructed from anywhere
/// else. Only the raw `objects` CF is optional. A standalone
/// deployment needs it so that version-keyed reads are served by the
/// restored snapshot; an embedded-fullnode deployment already has
/// every object version in the validator's perpetual store and can
/// skip the duplicate write.
#[derive(Default, Clone, Debug)]
pub struct RestoreLayer {
    /// When `true`, the `objects` pipeline is registered with the
    /// restore driver, so each live object lands as an
    /// `(ObjectID, version) → StoredObject` row.
    pub objects: bool,
}

impl RestoreLayer {
    /// Restores everything, the raw `objects` CF included. The
    /// standalone-binary default.
    pub fn all() -> Self {
        RestoreLayer { objects: true }
    }

    /// Restores the derived-index pipelines only, leaving the
    /// `objects` CF untouched. The embedded-fullnode default, since
    /// the fullnode's perpetual store already holds every object
    /// version.
    pub fn indexes_only() -> Self {
        RestoreLayer { objects: false }
    }
}
331
332impl CommitterLayer {
333    /// Fold the override layer onto a shared default
334    /// [`CommitterConfig`]. Unset fields inherit from `base`.
335    pub fn finish(self, base: CommitterConfig) -> CommitterConfig {
336        CommitterConfig {
337            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
338            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
339            watermark_interval_ms: self
340                .watermark_interval_ms
341                .unwrap_or(base.watermark_interval_ms),
342            ..Default::default()
343        }
344    }
345}
346
347impl From<CommitterConfig> for CommitterLayer {
348    fn from(config: CommitterConfig) -> Self {
349        Self {
350            write_concurrency: Some(config.write_concurrency),
351            collect_interval_ms: Some(config.collect_interval_ms),
352            watermark_interval_ms: Some(config.watermark_interval_ms),
353        }
354    }
355}
356
357impl Default for ConsistencyConfig {
358    fn default() -> Self {
359        Self { buffer_size: 5_000 }
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// `embedded()` must register exactly the live and history
    /// cohorts and leave the perpetual-store-backed CFs skipped.
    #[test]
    fn embedded_enables_only_cohort_pipelines() {
        let layer = PipelineLayer::embedded();
        // Live cohort.
        assert!(layer.object_by_owner.is_some());
        assert!(layer.object_by_type.is_some());
        assert!(layer.balance.is_some());
        // History cohort (object_version_by_checkpoint and
        // package_versions are also restored).
        assert!(layer.epochs.is_some());
        assert!(layer.object_version_by_checkpoint.is_some());
        assert!(layer.package_versions.is_some());
        assert!(layer.tx_seq_by_digest.is_some());
        assert!(layer.tx_metadata_by_seq.is_some());
        assert!(layer.transaction_bitmap.is_some());
        assert!(layer.event_bitmap.is_some());
        // Deactivated: served directly by the perpetual store.
        assert!(layer.objects.is_none());
        assert!(layer.transactions.is_none());
        assert!(layer.effects.is_none());
        assert!(layer.events.is_none());
        assert!(layer.checkpoint_summary.is_none());
        assert!(layer.checkpoint_contents.is_none());
        assert!(layer.checkpoint_seq_by_digest.is_none());
    }

    /// `all()` must register every pipeline the layer knows about.
    #[test]
    fn all_enables_every_pipeline() {
        // Destructure without `..` so that adding a pipeline field to
        // `PipelineLayer` fails to compile here until the new field
        // is covered by this test.
        let PipelineLayer {
            epochs,
            checkpoint_summary,
            checkpoint_contents,
            checkpoint_seq_by_digest,
            transactions,
            tx_seq_by_digest,
            tx_metadata_by_seq,
            effects,
            events,
            objects,
            object_version_by_checkpoint,
            object_by_owner,
            object_by_type,
            balance,
            package_versions,
            transaction_bitmap,
            event_bitmap,
        } = PipelineLayer::all();

        let pipelines = [
            ("epochs", &epochs),
            ("checkpoint_summary", &checkpoint_summary),
            ("checkpoint_contents", &checkpoint_contents),
            ("checkpoint_seq_by_digest", &checkpoint_seq_by_digest),
            ("transactions", &transactions),
            ("tx_seq_by_digest", &tx_seq_by_digest),
            ("tx_metadata_by_seq", &tx_metadata_by_seq),
            ("effects", &effects),
            ("events", &events),
            ("objects", &objects),
            (
                "object_version_by_checkpoint",
                &object_version_by_checkpoint,
            ),
            ("object_by_owner", &object_by_owner),
            ("object_by_type", &object_by_type),
            ("balance", &balance),
            ("package_versions", &package_versions),
            ("transaction_bitmap", &transaction_bitmap),
            ("event_bitmap", &event_bitmap),
        ];

        for (name, layer) in pipelines.iter() {
            assert!(
                layer.is_some(),
                "PipelineLayer::all() left `{}` disabled",
                name
            );
        }
    }

    #[test]
    fn pruning_disabled_by_default() {
        // A default ServiceConfig (the embedded-fullnode shape)
        // leaves pruning off; `example()` surfaces it populated.
        assert!(ServiceConfig::default().pruner.is_none());
        assert!(ServiceConfig::example().pruner.is_some());
    }

    #[test]
    fn pruner_config_interval_round_trips() {
        let cfg = PrunerConfig {
            interval_ms: 1_500,
            ..PrunerConfig::default()
        };
        assert_eq!(cfg.interval(), std::time::Duration::from_millis(1_500));
    }

    #[test]
    fn committer_layer_overrides_base() {
        let base = CommitterConfig {
            write_concurrency: 4,
            collect_interval_ms: 200,
            watermark_interval_ms: 200,
            ..Default::default()
        };
        let layer = CommitterLayer {
            write_concurrency: Some(8),
            collect_interval_ms: None,
            watermark_interval_ms: Some(500),
        };
        let merged = layer.finish(base);
        // Set fields win over `base`.
        assert_eq!(merged.write_concurrency, 8);
        // Unset fields inherit from `base`.
        assert_eq!(merged.collect_interval_ms, 200);
        assert_eq!(merged.watermark_interval_ms, 500);
    }
}