sui_rpc_store/config.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Runtime configuration for the `sui-rpc-store` indexer.
5//!
//! The indexer is driven by [`ServiceConfig`], which groups the
//! consistency, committer, per-pipeline, and pruning settings the
//! orchestrator needs.
9//!
10//! Per-pipeline enable/disable is expressed through
11//! [`PipelineLayer`]: every pipeline maps to an
12//! `Option<CommitterLayer>` field; `Some(_)` means the pipeline is
13//! registered (with the supplied committer overrides), `None` means
14//! it is skipped. The standalone binary populates the layer from
15//! its TOML config; the embedded-fullnode caller builds it
16//! programmatically via [`PipelineLayer::embedded`] so the raw
17//! chain CFs (served by the fullnode's perpetual store) are not
18//! double-written by this indexer.
19
20use std::time::Duration;
21
22use serde::Deserialize;
23use serde::Serialize;
24use sui_indexer_alt_framework::pipeline::CommitterConfig;
25
/// Top-level configuration for the `sui-rpc-store` indexer
/// service.
///
/// Parses from TOML with kebab-case keys. Unknown top-level keys are
/// rejected. Every field falls back to its `Default` when absent,
/// which suits tests and the embedded use case where most knobs are
/// supplied programmatically.
///
/// Note that the derived default [`PipelineLayer`] registers *no*
/// pipelines (every field is `None`). Use [`PipelineLayer::all`],
/// [`PipelineLayer::embedded`], or [`ServiceConfig::example`] for a
/// populated layer.
#[derive(Default, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct ServiceConfig {
    /// Cross-pipeline consistency knobs: currently the depth of the
    /// per-pipeline write buffer. Snapshot cadence is not configured
    /// here (see [`ConsistencyConfig`]).
    pub consistency: ConsistencyConfig,

    /// Default committer settings shared by all pipelines.
    /// Per-pipeline entries in [`PipelineLayer`] can override
    /// individual fields.
    pub committer: CommitterLayer,

    /// Per-pipeline enable/disable plus optional committer
    /// overrides. The derived default enables no pipelines.
    pub pipeline: PipelineLayer,

    /// Pruning policy for the historical CFs. Absent (the default)
    /// disables pruning entirely — the store retains all history.
    pub pruner: Option<PrunerConfig>,
}
50
/// Cross-pipeline consistency knobs surfaced to operators. The
/// indexer threads these into the [`Synchronizer`] at startup.
///
/// Snapshot *retention* (how many in-memory snapshots are kept, and
/// thus how far back consistent reads can reach) is not configured
/// here. It is an open-time property of the database, set via
/// [`DbOptions::snapshot_capacity`]. Because a snapshot is taken at
/// every checkpoint boundary, the effective consistent-read window
/// is roughly `snapshot_capacity` checkpoints.
///
/// [`Synchronizer`]: sui_consistent_store::Synchronizer
/// [`DbOptions::snapshot_capacity`]: sui_consistent_store::DbOptions::snapshot_capacity
#[derive(Clone, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct ConsistencyConfig {
    /// Per-pipeline mpsc capacity, counted in batches waiting to be
    /// committed. The synchronizer's slowest pipeline gates
    /// progress; this buffer absorbs short bursts of slack between
    /// peer pipelines before back-pressure kicks in. Defaults to
    /// `5_000`.
    pub buffer_size: usize,
}
72
/// Pruning policy for the historical column families.
///
/// Retention is expressed in epochs, mirroring the validator's
/// perpetual-store pruner. The `retention_epochs` most-recent
/// epochs (including the current one) are retained in full, and
/// everything in older epochs becomes eligible for deletion. The
/// resulting floor is additionally clamped so it never advances past
/// the oldest in-memory snapshot. This keeps point-in-time reads
/// coherent even under an aggressively small retention.
///
/// The pruner advances the floor toward its target in chunks of at
/// most `max_chunk_checkpoints` checkpoints, persisting the new
/// watermark after each chunk so progress survives a restart. Each
/// tick advances the floor by at most `max_checkpoints_per_tick`
/// checkpoints, so a large backlog drains across many ticks rather
/// than in one long blocking pass.
///
/// Only the historical CFs are pruned:
/// - per-transaction (`transactions`, `effects`, `events`,
///   `tx_metadata_by_seq`);
/// - per-checkpoint (`checkpoint_summary`, `checkpoint_contents`);
/// - digest reverse indexes (`tx_seq_by_digest`,
///   `checkpoint_seq_by_digest`);
/// - superseded `objects` versions;
/// - checkpoint-pinned `object_version_by_checkpoint`;
/// - the ledger-history bitmap CFs.
///
/// The live-set-bounded indexes (`object_by_owner`, `object_by_type`,
/// `balance`, `package_versions`) and the tiny `epochs` CF are never
/// pruned.
///
/// This struct only carries values and performs no validation of its
/// own. The lower bounds documented on individual fields are checked
/// when the pruner starts. Pruning as a whole is opt-in:
/// [`ServiceConfig::pruner`] defaults to `None`.
#[derive(Clone, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct PrunerConfig {
    /// Number of most-recent epochs to retain in full. Data in
    /// epochs older than this is eligible for pruning. Must be at
    /// least `1`; the pruner refuses to start otherwise, since a
    /// value of `0` would prune the current epoch. Defaults to `30`.
    pub retention_epochs: u64,

    /// How often the pruner wakes to recompute the target floor and
    /// advance toward it, in milliseconds. Defaults to `300_000`
    /// (five minutes). [`PrunerConfig::interval`] returns this as a
    /// [`Duration`].
    pub interval_ms: u64,

    /// Maximum number of checkpoints whose data is deleted in a
    /// single write batch. Bounds the per-batch work (and the number
    /// of effects rows scanned for object/digest deletes) when a
    /// whole epoch ages out at once. Defaults to `100`.
    ///
    /// NOTE(review): unlike the other bounds, a value of `0` is not
    /// documented as rejected; confirm the pruner guards against it.
    pub max_chunk_checkpoints: u64,

    /// Maximum number of checkpoints whose history is pruned in a
    /// single tick. Bounds the per-tick (blocking) work, so a large
    /// backlog — for example when pruning is first enabled on an old
    /// database — drains across many ticks. Without it, one long pass
    /// could occupy a blocking thread for minutes. The floor still
    /// converges to its retention target over subsequent ticks;
    /// `interval_ms` and this bound together set the drain rate. Must
    /// be at least `1`; the pruner refuses to start otherwise, since a
    /// value of `0` would never make progress. Defaults to `10_000`
    /// (100 chunks at the default chunk size).
    pub max_checkpoints_per_tick: u64,
}
129
130impl Default for PrunerConfig {
131 fn default() -> Self {
132 Self {
133 retention_epochs: 30,
134 interval_ms: 300_000,
135 max_chunk_checkpoints: 100,
136 // 100 chunks per tick at the default chunk size. Far above
137 // the steady-state rate at which a single epoch ages out,
138 // so retention is honored without intervention, while a
139 // first-run backlog on an old database is still bounded
140 // per tick rather than drained in one blocking pass.
141 max_checkpoints_per_tick: 10_000,
142 }
143 }
144}
145
146impl PrunerConfig {
147 /// The pruner's wake interval as a [`Duration`].
148 pub fn interval(&self) -> Duration {
149 Duration::from_millis(self.interval_ms)
150 }
151}
152
153/// Per-pipeline registration + override map. Every pipeline that
154/// writes to a CF in [`RpcStoreSchema`] has a corresponding
155/// `Option<CommitterLayer>` field here.
156///
157/// `Some(layer)` registers the pipeline with the supplied committer
158/// overrides folded onto the shared [`CommitterLayer`] default;
159/// `None` skips the pipeline entirely (e.g. the raw chain CFs in
160/// the embedded-fullnode case, where the fullnode populates them
161/// through a separate path).
162///
163/// Grouped in the struct for documentation only — serde sees each
164/// field as a top-level key.
165///
166/// [`RpcStoreSchema`]: crate::RpcStoreSchema
167#[derive(Default, Deserialize, Serialize)]
168#[serde(default, rename_all = "kebab-case")]
169pub struct PipelineLayer {
170 // --- Raw chain data ---
171 pub epochs: Option<CommitterLayer>,
172 pub checkpoint_summary: Option<CommitterLayer>,
173 pub checkpoint_contents: Option<CommitterLayer>,
174 pub checkpoint_seq_by_digest: Option<CommitterLayer>,
175 pub transactions: Option<CommitterLayer>,
176 pub tx_seq_by_digest: Option<CommitterLayer>,
177 pub tx_metadata_by_seq: Option<CommitterLayer>,
178 pub effects: Option<CommitterLayer>,
179 pub events: Option<CommitterLayer>,
180 pub objects: Option<CommitterLayer>,
181 pub object_version_by_checkpoint: Option<CommitterLayer>,
182
183 // --- Indexes ---
184 pub object_by_owner: Option<CommitterLayer>,
185 pub object_by_type: Option<CommitterLayer>,
186 pub balance: Option<CommitterLayer>,
187 pub package_versions: Option<CommitterLayer>,
188 pub transaction_bitmap: Option<CommitterLayer>,
189 pub event_bitmap: Option<CommitterLayer>,
190}
191
/// Per-pipeline committer overrides. Every field is optional; an
/// unset field inherits from the shared committer default the
/// orchestrator passes through to
/// [`CommitterLayer::finish`](Self::finish).
///
/// The same type also expresses the shared default itself
/// ([`ServiceConfig::committer`]). The `From<CommitterConfig>` impl
/// produces a layer with every field set.
#[derive(Default, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct CommitterLayer {
    /// Override for `CommitterConfig::write_concurrency`.
    pub write_concurrency: Option<usize>,
    /// Override for `CommitterConfig::collect_interval_ms`.
    pub collect_interval_ms: Option<u64>,
    /// Override for `CommitterConfig::watermark_interval_ms`.
    pub watermark_interval_ms: Option<u64>,
}
203
204impl ServiceConfig {
205 /// Configuration matching [`Self::default()`] but with every
206 /// pipeline explicitly enabled and the committer layer
207 /// initialised from [`CommitterConfig::default()`]. Suitable
208 /// for surfacing as a TOML example.
209 pub fn example() -> Self {
210 Self {
211 consistency: ConsistencyConfig::default(),
212 committer: CommitterConfig::default().into(),
213 pipeline: PipelineLayer::all(),
214 pruner: Some(PrunerConfig::default()),
215 }
216 }
217}
218
219impl PipelineLayer {
220 /// Every pipeline enabled with default committer overrides
221 /// (`Some(CommitterLayer::default())`). The standalone-binary
222 /// default.
223 pub fn all() -> Self {
224 Self {
225 epochs: Some(CommitterLayer::default()),
226 checkpoint_summary: Some(CommitterLayer::default()),
227 checkpoint_contents: Some(CommitterLayer::default()),
228 checkpoint_seq_by_digest: Some(CommitterLayer::default()),
229 transactions: Some(CommitterLayer::default()),
230 tx_seq_by_digest: Some(CommitterLayer::default()),
231 tx_metadata_by_seq: Some(CommitterLayer::default()),
232 effects: Some(CommitterLayer::default()),
233 events: Some(CommitterLayer::default()),
234 objects: Some(CommitterLayer::default()),
235 object_version_by_checkpoint: Some(CommitterLayer::default()),
236 object_by_owner: Some(CommitterLayer::default()),
237 object_by_type: Some(CommitterLayer::default()),
238 balance: Some(CommitterLayer::default()),
239 package_versions: Some(CommitterLayer::default()),
240 transaction_bitmap: Some(CommitterLayer::default()),
241 event_bitmap: Some(CommitterLayer::default()),
242 }
243 }
244
245 /// The embedded-fullnode cohort: every pipeline this indexer owns
246 /// when it runs inside a Sui fullnode beside the validator's
247 /// perpetual store.
248 ///
249 /// The raw chain-data CFs (`transactions`, `effects`, `events`,
250 /// `objects`, `checkpoint_summary`, `checkpoint_contents`,
251 /// `checkpoint_seq_by_digest`) are left `None`: the perpetual store
252 /// already holds that data and serves it directly, so this indexer
253 /// must not double-write it.
254 ///
255 /// The enabled pipelines form two cohorts. The
256 /// [`Synchronizer`](sui_consistent_store::Synchronizer)
257 /// distinguishes them by their persisted watermark at startup,
258 /// not by this layer, so both are simply registered here:
259 ///
260 /// - **Live cohort** — restored to the fullnode's tip and
261 /// following live from there: `object_by_owner`,
262 /// `object_by_type`, `balance`.
263 /// - **History cohort** — seeded to the lowest available
264 /// checkpoint and backfilling upward: `epochs`,
265 /// `object_version_by_checkpoint`, `package_versions`,
266 /// `tx_seq_by_digest`, `tx_metadata_by_seq`, `transaction_bitmap`,
267 /// `event_bitmap`. These back the ledger-history list APIs (the
268 /// bitmaps plus the `tx_seq` <-> digest maps needed to interpret
269 /// bitmap results) and the per-epoch protocol/committee reads
270 /// (`epochs`). `object_version_by_checkpoint` and
271 /// `package_versions` are additionally restored at the tip for
272 /// their floor rows, then backfill the per-checkpoint detail over
273 /// `(L, T]` (see the cohort docs in
274 /// [`restore`](crate::indexer::restore)).
275 pub fn embedded() -> Self {
276 Self {
277 // Live cohort: restored to the tip, follows live.
278 object_by_owner: Some(CommitterLayer::default()),
279 object_by_type: Some(CommitterLayer::default()),
280 balance: Some(CommitterLayer::default()),
281 // History cohort: seeded to L, backfills upward.
282 // `object_version_by_checkpoint` and `package_versions` are
283 // additionally restored at the tip for their floor rows (see
284 // the cohort docs in `restore.rs`).
285 epochs: Some(CommitterLayer::default()),
286 object_version_by_checkpoint: Some(CommitterLayer::default()),
287 package_versions: Some(CommitterLayer::default()),
288 tx_seq_by_digest: Some(CommitterLayer::default()),
289 tx_metadata_by_seq: Some(CommitterLayer::default()),
290 transaction_bitmap: Some(CommitterLayer::default()),
291 event_bitmap: Some(CommitterLayer::default()),
292 ..Self::default()
293 }
294 }
295}
296
/// Per-pipeline registration toggles consumed by
/// [`restore_indexes`](crate::restore_indexes).
///
/// The derived-index pipelines (`object_by_owner`, `object_by_type`,
/// `balance`, `package_versions`) are restored unconditionally,
/// because nothing else can rebuild them. They therefore have no
/// toggle here. Only the raw `objects` CF is optional:
///
/// - a standalone deployment restores it so that version-keyed reads
///   are answered from the restored snapshot;
/// - an embedded-fullnode deployment skips it, because the validator's
///   perpetual store already holds every object version.
///
/// The derived [`Default`] is the same as [`RestoreLayer::indexes_only`].
#[derive(Clone, Debug, Default)]
pub struct RestoreLayer {
    /// When `true`, the `objects` pipeline is registered with the
    /// restore driver, writing one `(ObjectID, version) → StoredObject`
    /// row per live object.
    pub objects: bool,
}

impl RestoreLayer {
    /// Restores everything, the raw `objects` CF included. This is
    /// the standalone-binary default.
    pub fn all() -> Self {
        RestoreLayer { objects: true }
    }

    /// Restores the derived indexes only and leaves the `objects` CF
    /// to the fullnode's perpetual store. This is the embedded-fullnode
    /// default.
    pub fn indexes_only() -> Self {
        RestoreLayer { objects: false }
    }
}
331
332impl CommitterLayer {
333 /// Fold the override layer onto a shared default
334 /// [`CommitterConfig`]. Unset fields inherit from `base`.
335 pub fn finish(self, base: CommitterConfig) -> CommitterConfig {
336 CommitterConfig {
337 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
338 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
339 watermark_interval_ms: self
340 .watermark_interval_ms
341 .unwrap_or(base.watermark_interval_ms),
342 ..Default::default()
343 }
344 }
345}
346
347impl From<CommitterConfig> for CommitterLayer {
348 fn from(config: CommitterConfig) -> Self {
349 Self {
350 write_concurrency: Some(config.write_concurrency),
351 collect_interval_ms: Some(config.collect_interval_ms),
352 watermark_interval_ms: Some(config.watermark_interval_ms),
353 }
354 }
355}
356
357impl Default for ConsistencyConfig {
358 fn default() -> Self {
359 Self { buffer_size: 5_000 }
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn embedded_enables_only_cohort_pipelines() {
        // Destructure without `..`: adding a pipeline field is then a
        // compile error here until the new pipeline is classified.
        let PipelineLayer {
            epochs,
            checkpoint_summary,
            checkpoint_contents,
            checkpoint_seq_by_digest,
            transactions,
            tx_seq_by_digest,
            tx_metadata_by_seq,
            effects,
            events,
            objects,
            object_version_by_checkpoint,
            object_by_owner,
            object_by_type,
            balance,
            package_versions,
            transaction_bitmap,
            event_bitmap,
        } = PipelineLayer::embedded();

        // Live cohort.
        assert!(object_by_owner.is_some());
        assert!(object_by_type.is_some());
        assert!(balance.is_some());
        // History cohort (object_version_by_checkpoint and
        // package_versions are also restored).
        assert!(epochs.is_some());
        assert!(object_version_by_checkpoint.is_some());
        assert!(package_versions.is_some());
        assert!(tx_seq_by_digest.is_some());
        assert!(tx_metadata_by_seq.is_some());
        assert!(transaction_bitmap.is_some());
        assert!(event_bitmap.is_some());
        // Deactivated: served directly by the perpetual store.
        assert!(objects.is_none());
        assert!(transactions.is_none());
        assert!(effects.is_none());
        assert!(events.is_none());
        assert!(checkpoint_summary.is_none());
        assert!(checkpoint_contents.is_none());
        assert!(checkpoint_seq_by_digest.is_none());
    }

    #[test]
    fn all_enables_every_pipeline() {
        // Exhaustive destructure (no `..`): a new pipeline field must be
        // listed here, so `all()` cannot silently leave it disabled.
        let PipelineLayer {
            epochs,
            checkpoint_summary,
            checkpoint_contents,
            checkpoint_seq_by_digest,
            transactions,
            tx_seq_by_digest,
            tx_metadata_by_seq,
            effects,
            events,
            objects,
            object_version_by_checkpoint,
            object_by_owner,
            object_by_type,
            balance,
            package_versions,
            transaction_bitmap,
            event_bitmap,
        } = PipelineLayer::all();

        let pipelines = [
            ("epochs", epochs),
            ("checkpoint_summary", checkpoint_summary),
            ("checkpoint_contents", checkpoint_contents),
            ("checkpoint_seq_by_digest", checkpoint_seq_by_digest),
            ("transactions", transactions),
            ("tx_seq_by_digest", tx_seq_by_digest),
            ("tx_metadata_by_seq", tx_metadata_by_seq),
            ("effects", effects),
            ("events", events),
            ("objects", objects),
            ("object_version_by_checkpoint", object_version_by_checkpoint),
            ("object_by_owner", object_by_owner),
            ("object_by_type", object_by_type),
            ("balance", balance),
            ("package_versions", package_versions),
            ("transaction_bitmap", transaction_bitmap),
            ("event_bitmap", event_bitmap),
        ];
        for (name, layer) in pipelines {
            assert!(
                layer.is_some(),
                "`PipelineLayer::all()` leaves `{name}` disabled"
            );
        }
    }

    #[test]
    fn pruning_disabled_by_default() {
        // A default ServiceConfig (the embedded-fullnode shape)
        // leaves pruning off; `example()` surfaces it populated.
        assert!(ServiceConfig::default().pruner.is_none());
        assert!(ServiceConfig::example().pruner.is_some());
    }

    #[test]
    fn pruner_config_interval_round_trips() {
        let cfg = PrunerConfig {
            interval_ms: 1_500,
            ..PrunerConfig::default()
        };
        assert_eq!(cfg.interval(), std::time::Duration::from_millis(1_500));
    }

    #[test]
    fn pruner_defaults_satisfy_documented_minimums() {
        // The pruner refuses to start on zero values, so the shipped
        // defaults must never trip those checks.
        let cfg = PrunerConfig::default();
        assert!(cfg.retention_epochs >= 1);
        assert!(cfg.max_chunk_checkpoints >= 1);
        assert!(cfg.max_checkpoints_per_tick >= 1);
    }

    #[test]
    fn restore_layer_presets() {
        assert!(RestoreLayer::all().objects);
        assert!(!RestoreLayer::indexes_only().objects);
        // The derived default is the embedded (indexes-only) shape.
        assert!(!RestoreLayer::default().objects);
    }

    #[test]
    fn committer_layer_overrides_base() {
        let base = CommitterConfig {
            write_concurrency: 4,
            collect_interval_ms: 200,
            watermark_interval_ms: 200,
            ..Default::default()
        };
        let layer = CommitterLayer {
            write_concurrency: Some(8),
            collect_interval_ms: None,
            watermark_interval_ms: Some(500),
        };
        let merged = layer.finish(base);
        // Set fields take the override.
        assert_eq!(merged.write_concurrency, 8);
        assert_eq!(merged.watermark_interval_ms, 500);
        // Unset fields inherit from `base`.
        assert_eq!(merged.collect_interval_ms, 200);
    }
}