sui_rpc_store/schema/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Column-family layout for `sui-rpc-store`.
5//!
6//! Each CF lives in its own submodule that declares:
7//!
8//! - `NAME` — the on-disk column-family name.
9//! - `Key` — the key type with `Encode` / `Decode` pinning its
10//!   on-disk layout.
11//! - `Value` — the value type, typically `Protobuf<…>`.
12//! - `options(resolver)` — per-CF `rocksdb::Options`, obtained from
13//!   the [`CfOptionsResolver`] with the CF's merge operator and
14//!   compaction filter (if any) layered on top.
15//!
16//! [`RpcStoreSchema`] aggregates these into the schema passed to
17//! [`sui_consistent_store::Db::open`]. Keys reused across multiple
18//! CFs live in [`primitives`].
19
20pub mod balance;
21pub mod checkpoint_contents;
22pub mod checkpoint_seq_by_digest;
23pub mod checkpoint_summary;
24pub mod effects;
25pub mod epochs;
26pub mod event_bitmap;
27pub mod events;
28pub mod object_by_owner;
29pub mod object_by_type;
30pub mod object_version_by_checkpoint;
31pub mod objects;
32pub mod package_versions;
33pub mod primitives;
34pub mod pruning_watermark;
35pub mod transaction_bitmap;
36pub mod transactions;
37pub mod tx_metadata_by_seq;
38pub mod tx_seq_by_digest;
39pub mod type_filter;
40
41use std::collections::BTreeMap;
42
43use sui_consistent_store::CfDescriptor;
44use sui_consistent_store::CfOptionsResolver;
45use sui_consistent_store::CfTuning;
46use sui_consistent_store::Compression;
47use sui_consistent_store::Db;
48use sui_consistent_store::DbMap;
49use sui_consistent_store::DbWideConfig;
50use sui_consistent_store::RocksDbConfig;
51use sui_consistent_store::Schema;
52use sui_consistent_store::SchemaAtSnapshot;
53use sui_consistent_store::Snapshot;
54use sui_consistent_store::WriteStallConfig;
55use sui_consistent_store::error::OpenError;
56use sui_consistent_store::reader::Reader;
57
/// Typed handles to every CF in the `sui-rpc-store` layout.
///
/// `R` is the [`Reader`] backing every handle. It defaults to the live
/// [`Db`]; [`SchemaAtSnapshot::at`] produces the same layout
/// parameterized over a [`Snapshot`] instead.
pub struct RpcStoreSchema<R: Reader = Db> {
    // --- Epochs and checkpoints ---
    /// Per-epoch metadata: protocol version, gas price, start and
    /// end timestamps, and the epoch's final checkpoint.
    pub epochs: DbMap<epochs::Key, epochs::Value, R>,

    /// Signed checkpoint headers. The lightweight metadata served
    /// by most "fetch a checkpoint" requests; the heavier contents
    /// list lives in a separate CF.
    pub checkpoint_summary: DbMap<checkpoint_summary::Key, checkpoint_summary::Value, R>,

    /// The ordered list of executed transaction digests in each
    /// checkpoint.
    pub checkpoint_contents: DbMap<checkpoint_contents::Key, checkpoint_contents::Value, R>,

    /// Resolves a checkpoint digest to its sequence number, which
    /// is then the key for every other checkpoint-keyed CF.
    pub checkpoint_seq_by_digest:
        DbMap<checkpoint_seq_by_digest::Key, checkpoint_seq_by_digest::Value, R>,

    // --- Transactions ---
    /// Signed transactions, keyed by their assigned tx_seq.
    pub transactions: DbMap<transactions::Key, transactions::Value, R>,

    /// Resolves a transaction digest to its assigned tx_seq.
    pub tx_seq_by_digest: DbMap<tx_seq_by_digest::Key, tx_seq_by_digest::Value, R>,

    /// Per-transaction metadata: digest, the containing
    /// checkpoint, position within that checkpoint, event count,
    /// and the checkpoint's timestamp.
    pub tx_metadata_by_seq: DbMap<tx_metadata_by_seq::Key, tx_metadata_by_seq::Value, R>,

    /// The effects produced by each transaction, together with the
    /// set of objects loaded but unchanged during execution.
    pub effects: DbMap<effects::Key, effects::Value, R>,

    /// The events emitted by each transaction.
    pub events: DbMap<events::Key, events::Value, R>,

    // --- Objects and object indexes ---
    /// Every version of every object that has ever existed. A
    /// prefix scan on the object id walks all versions in ascending
    /// order; a reverse prefix scan resolves the latest version (the
    /// greatest `(id, version)` row), the way the validator perpetual
    /// store serves "latest object" reads.
    pub objects: DbMap<objects::Key, objects::Value, R>,

    /// An object's version as of a checkpoint: keyed by
    /// `(object id, checkpoint)`, a reverse prefix scan resolves the
    /// version live at the end of the most recent checkpoint, at or
    /// before the one queried, in which the object changed. Backs
    /// checkpoint-pinned historical reads that the version-keyed
    /// `objects` CF cannot answer.
    pub object_version_by_checkpoint:
        DbMap<object_version_by_checkpoint::Key, object_version_by_checkpoint::Value, R>,

    /// Supports listing an owner's objects, optionally filtered by
    /// Move type. Coin-like objects sort richest-first within
    /// each `(owner, type)` group so paginating valuable holdings
    /// is a forward prefix scan.
    pub object_by_owner: DbMap<object_by_owner::Key, object_by_owner::Value, R>,

    /// Supports listing every live object of a given Move type,
    /// regardless of owner.
    pub object_by_type: DbMap<object_by_type::Key, object_by_type::Value, R>,

    // --- Balances and packages ---
    /// Tracks an account's balance per coin type, combining the
    /// coin-derived component (sum of owned `Coin<T>` balances)
    /// and the accumulator-derived component into a single row
    /// merged from independent indexer pipelines.
    pub balance: DbMap<balance::Key, balance::Value, R>,

    /// Tracks every published version of a Move package and the
    /// storage id under which each version lives.
    pub package_versions: DbMap<package_versions::Key, package_versions::Value, R>,

    // --- Bitmap indexes ---
    /// Inverted bitmap index over transaction-sequence space,
    /// supporting filtered transaction queries by indexed fields
    /// such as sender, called function, or input/changed object.
    pub transaction_bitmap: DbMap<transaction_bitmap::Key, transaction_bitmap::Value, R>,

    /// Inverted bitmap index over packed event-sequence space,
    /// supporting filtered event queries by event type, emitting
    /// module, sender, and similar indexed fields.
    pub event_bitmap: DbMap<event_bitmap::Key, event_bitmap::Value, R>,

    // --- Bookkeeping ---
    /// Singleton holding the lowest still-available `tx_seq`,
    /// `checkpoint_seq`, and object version. Drives compaction
    /// filters and feeds `available_range` responses.
    pub pruning_watermark: DbMap<pruning_watermark::Key, pruning_watermark::Value, R>,
}
148
149impl Schema for RpcStoreSchema {
150    fn cfs(opts: &CfOptionsResolver) -> Vec<CfDescriptor> {
151        vec![
152            CfDescriptor::new(epochs::NAME, epochs::options(opts)),
153            CfDescriptor::new(checkpoint_summary::NAME, checkpoint_summary::options(opts)),
154            CfDescriptor::new(
155                checkpoint_contents::NAME,
156                checkpoint_contents::options(opts),
157            ),
158            CfDescriptor::new(
159                checkpoint_seq_by_digest::NAME,
160                checkpoint_seq_by_digest::options(opts),
161            ),
162            CfDescriptor::new(transactions::NAME, transactions::options(opts)),
163            CfDescriptor::new(tx_seq_by_digest::NAME, tx_seq_by_digest::options(opts)),
164            CfDescriptor::new(tx_metadata_by_seq::NAME, tx_metadata_by_seq::options(opts)),
165            CfDescriptor::new(effects::NAME, effects::options(opts)),
166            CfDescriptor::new(events::NAME, events::options(opts)),
167            CfDescriptor::new(objects::NAME, objects::options(opts)),
168            CfDescriptor::new(
169                object_version_by_checkpoint::NAME,
170                object_version_by_checkpoint::options(opts),
171            ),
172            CfDescriptor::new(object_by_owner::NAME, object_by_owner::options(opts)),
173            CfDescriptor::new(object_by_type::NAME, object_by_type::options(opts)),
174            CfDescriptor::new(balance::NAME, balance::options(opts)),
175            CfDescriptor::new(package_versions::NAME, package_versions::options(opts)),
176            CfDescriptor::new(transaction_bitmap::NAME, transaction_bitmap::options(opts)),
177            CfDescriptor::new(event_bitmap::NAME, event_bitmap::options(opts)),
178            CfDescriptor::new(pruning_watermark::NAME, pruning_watermark::options(opts)),
179        ]
180    }
181
182    fn open(db: &Db) -> Result<Self, OpenError> {
183        Ok(Self {
184            epochs: DbMap::new(db.clone(), epochs::NAME)?,
185            checkpoint_summary: DbMap::new(db.clone(), checkpoint_summary::NAME)?,
186            checkpoint_contents: DbMap::new(db.clone(), checkpoint_contents::NAME)?,
187            checkpoint_seq_by_digest: DbMap::new(db.clone(), checkpoint_seq_by_digest::NAME)?,
188            transactions: DbMap::new(db.clone(), transactions::NAME)?,
189            tx_seq_by_digest: DbMap::new(db.clone(), tx_seq_by_digest::NAME)?,
190            tx_metadata_by_seq: DbMap::new(db.clone(), tx_metadata_by_seq::NAME)?,
191            effects: DbMap::new(db.clone(), effects::NAME)?,
192            events: DbMap::new(db.clone(), events::NAME)?,
193            objects: DbMap::new(db.clone(), objects::NAME)?,
194            object_version_by_checkpoint: DbMap::new(
195                db.clone(),
196                object_version_by_checkpoint::NAME,
197            )?,
198            object_by_owner: DbMap::new(db.clone(), object_by_owner::NAME)?,
199            object_by_type: DbMap::new(db.clone(), object_by_type::NAME)?,
200            balance: DbMap::new(db.clone(), balance::NAME)?,
201            package_versions: DbMap::new(db.clone(), package_versions::NAME)?,
202            transaction_bitmap: DbMap::new(db.clone(), transaction_bitmap::NAME)?,
203            event_bitmap: DbMap::new(db.clone(), event_bitmap::NAME)?,
204            pruning_watermark: DbMap::new(db.clone(), pruning_watermark::NAME)?,
205        })
206    }
207}
208
209impl SchemaAtSnapshot for RpcStoreSchema {
210    type At = RpcStoreSchema<Snapshot>;
211    fn at(&self, snap: &Snapshot) -> Self::At {
212        RpcStoreSchema {
213            epochs: self.epochs.at(snap),
214            checkpoint_summary: self.checkpoint_summary.at(snap),
215            checkpoint_contents: self.checkpoint_contents.at(snap),
216            checkpoint_seq_by_digest: self.checkpoint_seq_by_digest.at(snap),
217            transactions: self.transactions.at(snap),
218            tx_seq_by_digest: self.tx_seq_by_digest.at(snap),
219            tx_metadata_by_seq: self.tx_metadata_by_seq.at(snap),
220            effects: self.effects.at(snap),
221            events: self.events.at(snap),
222            objects: self.objects.at(snap),
223            object_version_by_checkpoint: self.object_version_by_checkpoint.at(snap),
224            object_by_owner: self.object_by_owner.at(snap),
225            object_by_type: self.object_by_type.at(snap),
226            balance: self.balance.at(snap),
227            package_versions: self.package_versions.at(snap),
228            transaction_bitmap: self.transaction_bitmap.at(snap),
229            event_bitmap: self.event_bitmap.at(snap),
230            pruning_watermark: self.pruning_watermark.at(snap),
231        }
232    }
233}
234
235/// The tuned [`RocksDbConfig`] this crate ships as its baseline for
236/// the `sui-rpc-store` column families.
237///
238/// Operators layer their own overrides on top via
239/// [`RocksDbConfig::merge_over`]; anything they leave unset falls back
240/// to these values. The defaults port the production-proven settings
241/// from `typed_store` and bake in a "no write stalls, generous
242/// compaction parallelism" policy: the pending-compaction stall limits
243/// are disabled and the L0 triggers raised so neither the bulk restore
244/// nor steady-state indexing throttles on compaction debt, while the
245/// L0 stop trigger still bounds a runaway backlog.
246pub fn default_rocksdb_config() -> RocksDbConfig {
247    let write_stall = WriteStallConfig {
248        soft_pending_compaction_bytes_limit_mb: Some(0),
249        hard_pending_compaction_bytes_limit_mb: Some(0),
250        level0_file_num_compaction_trigger: Some(4),
251        level0_slowdown_writes_trigger: Some(512),
252        level0_stop_writes_trigger: Some(1024),
253    };
254
255    let default_cf = CfTuning {
256        write_buffer_size_mb: Some(64),
257        max_write_buffer_number: Some(6),
258        compression: Some(Compression::Lz4),
259        bottommost_compression: Some(Compression::Zstd),
260        block_size_kb: Some(16),
261        bloom_filter_bits: None,
262        memtable_prefix_bloom_ratio: Some(0.02),
263        target_file_size_mb: Some(128),
264        write_stall,
265    };
266
267    let mut column_family = BTreeMap::new();
268
269    // Point-lookup CFs: a whole-key bloom filter lets reads skip SSTs
270    // that cannot contain the requested key.
271    let point_lookup = CfTuning {
272        bloom_filter_bits: Some(10.0),
273        ..Default::default()
274    };
275    for name in [tx_seq_by_digest::NAME, checkpoint_seq_by_digest::NAME] {
276        column_family.insert(name.to_string(), point_lookup.clone());
277    }
278
279    // Bitmap CFs accumulate large merge-blob values; a bigger memtable
280    // amortizes the merge-and-flush churn.
281    let bitmap = CfTuning {
282        write_buffer_size_mb: Some(256),
283        ..Default::default()
284    };
285    for name in [transaction_bitmap::NAME, event_bitmap::NAME] {
286        column_family.insert(name.to_string(), bitmap.clone());
287    }
288
289    RocksDbConfig {
290        db: DbWideConfig {
291            parallelism: Some(8),
292            max_background_jobs: None,
293            // RocksDB's default (`-1`) keeps every SST open, which
294            // exhausts the process file-descriptor budget on a
295            // large DB (a formal-snapshot restore writes thousands
296            // of SSTs and fails with "Too many open files"). Mirror
297            // `typed_store::default_db_options`: raise the fd limit
298            // toward the hard cap and bound the table cache to an
299            // eighth of it. `None` on platforms without the syscall
300            // (e.g. Windows), leaving the RocksDB default.
301            max_open_files: fdlimit::raise_fd_limit()
302                .map(|limit| (limit / 8).try_into().unwrap_or(i32::MAX)),
303            db_write_buffer_size_mb: Some(1024),
304            max_total_wal_size_mb: Some(1024),
305            enable_pipelined_write: Some(true),
306            table_cache_num_shard_bits: Some(10),
307            block_cache_size_mb: Some(1024),
308        },
309        default_cf,
310        column_family,
311    }
312}
313
#[cfg(test)]
mod tests {
    use sui_consistent_store::Db;
    use sui_consistent_store::DbOptions;
    use sui_types::base_types::ObjectID;
    use sui_types::base_types::SequenceNumber;

    use super::*;

    #[test]
    fn opens_with_all_cfs() {
        let dir = tempfile::tempdir().unwrap();
        let (_db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
        // Empty database — every typed handle is constructed; a
        // miss on any of them returns None instead of an open-time
        // missing-CF error.
        assert!(
            schema
                .objects
                .get(&objects::Key {
                    id: ObjectID::ZERO,
                    version: SequenceNumber::from_u64(0),
                })
                .unwrap()
                .is_none()
        );
        assert!(
            schema
                .pruning_watermark
                .get(&primitives::UnitKey)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn default_config_validates() {
        default_rocksdb_config()
            .validate()
            .expect("shipped default config must be internally consistent");
    }

    #[test]
    fn default_config_opens_every_cf() {
        // Exercises the full resolve path: validation, the shared
        // block cache, and every CF's merge operator / compaction
        // filter layered on the tuned per-CF options.
        let dir = tempfile::tempdir().unwrap();
        let opts = DbOptions {
            rocksdb: default_rocksdb_config(),
            snapshot_capacity: 32,
        };
        let (_db, schema) = Db::open::<RpcStoreSchema>(dir.path(), opts).unwrap();
        assert!(
            schema
                .pruning_watermark
                .get(&primitives::UnitKey)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn default_config_sets_per_cf_deviations() {
        let cfg = default_rocksdb_config();
        // Point-lookup CFs get a bloom filter.
        for name in [tx_seq_by_digest::NAME, checkpoint_seq_by_digest::NAME] {
            assert_eq!(
                cfg.column_family[name].bloom_filter_bits,
                Some(10.0),
                "bloom filter on {name}"
            );
        }
        // Both bitmap CFs get a larger write buffer (previously only
        // `transaction_bitmap` was checked).
        for name in [transaction_bitmap::NAME, event_bitmap::NAME] {
            assert_eq!(
                cfg.column_family[name].write_buffer_size_mb,
                Some(256),
                "write buffer on {name}"
            );
        }
    }
}