1pub mod balance;
21pub mod checkpoint_contents;
22pub mod checkpoint_seq_by_digest;
23pub mod checkpoint_summary;
24pub mod effects;
25pub mod epochs;
26pub mod event_bitmap;
27pub mod events;
28pub mod object_by_owner;
29pub mod object_by_type;
30pub mod object_version_by_checkpoint;
31pub mod objects;
32pub mod package_versions;
33pub mod primitives;
34pub mod pruning_watermark;
35pub mod transaction_bitmap;
36pub mod transactions;
37pub mod tx_metadata_by_seq;
38pub mod tx_seq_by_digest;
39pub mod type_filter;
40
41use std::collections::BTreeMap;
42
43use sui_consistent_store::CfDescriptor;
44use sui_consistent_store::CfOptionsResolver;
45use sui_consistent_store::CfTuning;
46use sui_consistent_store::Compression;
47use sui_consistent_store::Db;
48use sui_consistent_store::DbMap;
49use sui_consistent_store::DbWideConfig;
50use sui_consistent_store::RocksDbConfig;
51use sui_consistent_store::Schema;
52use sui_consistent_store::SchemaAtSnapshot;
53use sui_consistent_store::Snapshot;
54use sui_consistent_store::WriteStallConfig;
55use sui_consistent_store::error::OpenError;
56use sui_consistent_store::reader::Reader;
57
58pub struct RpcStoreSchema<R: Reader = Db> {
60 pub epochs: DbMap<epochs::Key, epochs::Value, R>,
63
64 pub checkpoint_summary: DbMap<checkpoint_summary::Key, checkpoint_summary::Value, R>,
68
69 pub checkpoint_contents: DbMap<checkpoint_contents::Key, checkpoint_contents::Value, R>,
72
73 pub checkpoint_seq_by_digest:
76 DbMap<checkpoint_seq_by_digest::Key, checkpoint_seq_by_digest::Value, R>,
77
78 pub transactions: DbMap<transactions::Key, transactions::Value, R>,
80
81 pub tx_seq_by_digest: DbMap<tx_seq_by_digest::Key, tx_seq_by_digest::Value, R>,
83
84 pub tx_metadata_by_seq: DbMap<tx_metadata_by_seq::Key, tx_metadata_by_seq::Value, R>,
88
89 pub effects: DbMap<effects::Key, effects::Value, R>,
92
93 pub events: DbMap<events::Key, events::Value, R>,
95
96 pub objects: DbMap<objects::Key, objects::Value, R>,
102
103 pub object_version_by_checkpoint:
110 DbMap<object_version_by_checkpoint::Key, object_version_by_checkpoint::Value, R>,
111
112 pub object_by_owner: DbMap<object_by_owner::Key, object_by_owner::Value, R>,
117
118 pub object_by_type: DbMap<object_by_type::Key, object_by_type::Value, R>,
121
122 pub balance: DbMap<balance::Key, balance::Value, R>,
127
128 pub package_versions: DbMap<package_versions::Key, package_versions::Value, R>,
131
132 pub transaction_bitmap: DbMap<transaction_bitmap::Key, transaction_bitmap::Value, R>,
136
137 pub event_bitmap: DbMap<event_bitmap::Key, event_bitmap::Value, R>,
141
142 pub pruning_watermark: DbMap<pruning_watermark::Key, pruning_watermark::Value, R>,
147}
148
149impl Schema for RpcStoreSchema {
150 fn cfs(opts: &CfOptionsResolver) -> Vec<CfDescriptor> {
151 vec![
152 CfDescriptor::new(epochs::NAME, epochs::options(opts)),
153 CfDescriptor::new(checkpoint_summary::NAME, checkpoint_summary::options(opts)),
154 CfDescriptor::new(
155 checkpoint_contents::NAME,
156 checkpoint_contents::options(opts),
157 ),
158 CfDescriptor::new(
159 checkpoint_seq_by_digest::NAME,
160 checkpoint_seq_by_digest::options(opts),
161 ),
162 CfDescriptor::new(transactions::NAME, transactions::options(opts)),
163 CfDescriptor::new(tx_seq_by_digest::NAME, tx_seq_by_digest::options(opts)),
164 CfDescriptor::new(tx_metadata_by_seq::NAME, tx_metadata_by_seq::options(opts)),
165 CfDescriptor::new(effects::NAME, effects::options(opts)),
166 CfDescriptor::new(events::NAME, events::options(opts)),
167 CfDescriptor::new(objects::NAME, objects::options(opts)),
168 CfDescriptor::new(
169 object_version_by_checkpoint::NAME,
170 object_version_by_checkpoint::options(opts),
171 ),
172 CfDescriptor::new(object_by_owner::NAME, object_by_owner::options(opts)),
173 CfDescriptor::new(object_by_type::NAME, object_by_type::options(opts)),
174 CfDescriptor::new(balance::NAME, balance::options(opts)),
175 CfDescriptor::new(package_versions::NAME, package_versions::options(opts)),
176 CfDescriptor::new(transaction_bitmap::NAME, transaction_bitmap::options(opts)),
177 CfDescriptor::new(event_bitmap::NAME, event_bitmap::options(opts)),
178 CfDescriptor::new(pruning_watermark::NAME, pruning_watermark::options(opts)),
179 ]
180 }
181
182 fn open(db: &Db) -> Result<Self, OpenError> {
183 Ok(Self {
184 epochs: DbMap::new(db.clone(), epochs::NAME)?,
185 checkpoint_summary: DbMap::new(db.clone(), checkpoint_summary::NAME)?,
186 checkpoint_contents: DbMap::new(db.clone(), checkpoint_contents::NAME)?,
187 checkpoint_seq_by_digest: DbMap::new(db.clone(), checkpoint_seq_by_digest::NAME)?,
188 transactions: DbMap::new(db.clone(), transactions::NAME)?,
189 tx_seq_by_digest: DbMap::new(db.clone(), tx_seq_by_digest::NAME)?,
190 tx_metadata_by_seq: DbMap::new(db.clone(), tx_metadata_by_seq::NAME)?,
191 effects: DbMap::new(db.clone(), effects::NAME)?,
192 events: DbMap::new(db.clone(), events::NAME)?,
193 objects: DbMap::new(db.clone(), objects::NAME)?,
194 object_version_by_checkpoint: DbMap::new(
195 db.clone(),
196 object_version_by_checkpoint::NAME,
197 )?,
198 object_by_owner: DbMap::new(db.clone(), object_by_owner::NAME)?,
199 object_by_type: DbMap::new(db.clone(), object_by_type::NAME)?,
200 balance: DbMap::new(db.clone(), balance::NAME)?,
201 package_versions: DbMap::new(db.clone(), package_versions::NAME)?,
202 transaction_bitmap: DbMap::new(db.clone(), transaction_bitmap::NAME)?,
203 event_bitmap: DbMap::new(db.clone(), event_bitmap::NAME)?,
204 pruning_watermark: DbMap::new(db.clone(), pruning_watermark::NAME)?,
205 })
206 }
207}
208
209impl SchemaAtSnapshot for RpcStoreSchema {
210 type At = RpcStoreSchema<Snapshot>;
211 fn at(&self, snap: &Snapshot) -> Self::At {
212 RpcStoreSchema {
213 epochs: self.epochs.at(snap),
214 checkpoint_summary: self.checkpoint_summary.at(snap),
215 checkpoint_contents: self.checkpoint_contents.at(snap),
216 checkpoint_seq_by_digest: self.checkpoint_seq_by_digest.at(snap),
217 transactions: self.transactions.at(snap),
218 tx_seq_by_digest: self.tx_seq_by_digest.at(snap),
219 tx_metadata_by_seq: self.tx_metadata_by_seq.at(snap),
220 effects: self.effects.at(snap),
221 events: self.events.at(snap),
222 objects: self.objects.at(snap),
223 object_version_by_checkpoint: self.object_version_by_checkpoint.at(snap),
224 object_by_owner: self.object_by_owner.at(snap),
225 object_by_type: self.object_by_type.at(snap),
226 balance: self.balance.at(snap),
227 package_versions: self.package_versions.at(snap),
228 transaction_bitmap: self.transaction_bitmap.at(snap),
229 event_bitmap: self.event_bitmap.at(snap),
230 pruning_watermark: self.pruning_watermark.at(snap),
231 }
232 }
233}
234
235pub fn default_rocksdb_config() -> RocksDbConfig {
247 let write_stall = WriteStallConfig {
248 soft_pending_compaction_bytes_limit_mb: Some(0),
249 hard_pending_compaction_bytes_limit_mb: Some(0),
250 level0_file_num_compaction_trigger: Some(4),
251 level0_slowdown_writes_trigger: Some(512),
252 level0_stop_writes_trigger: Some(1024),
253 };
254
255 let default_cf = CfTuning {
256 write_buffer_size_mb: Some(64),
257 max_write_buffer_number: Some(6),
258 compression: Some(Compression::Lz4),
259 bottommost_compression: Some(Compression::Zstd),
260 block_size_kb: Some(16),
261 bloom_filter_bits: None,
262 memtable_prefix_bloom_ratio: Some(0.02),
263 target_file_size_mb: Some(128),
264 write_stall,
265 };
266
267 let mut column_family = BTreeMap::new();
268
269 let point_lookup = CfTuning {
272 bloom_filter_bits: Some(10.0),
273 ..Default::default()
274 };
275 for name in [tx_seq_by_digest::NAME, checkpoint_seq_by_digest::NAME] {
276 column_family.insert(name.to_string(), point_lookup.clone());
277 }
278
279 let bitmap = CfTuning {
282 write_buffer_size_mb: Some(256),
283 ..Default::default()
284 };
285 for name in [transaction_bitmap::NAME, event_bitmap::NAME] {
286 column_family.insert(name.to_string(), bitmap.clone());
287 }
288
289 RocksDbConfig {
290 db: DbWideConfig {
291 parallelism: Some(8),
292 max_background_jobs: None,
293 max_open_files: fdlimit::raise_fd_limit()
302 .map(|limit| (limit / 8).try_into().unwrap_or(i32::MAX)),
303 db_write_buffer_size_mb: Some(1024),
304 max_total_wal_size_mb: Some(1024),
305 enable_pipelined_write: Some(true),
306 table_cache_num_shard_bits: Some(10),
307 block_cache_size_mb: Some(1024),
308 },
309 default_cf,
310 column_family,
311 }
312}
313
314#[cfg(test)]
315mod tests {
316 use sui_consistent_store::Db;
317 use sui_consistent_store::DbOptions;
318 use sui_types::base_types::ObjectID;
319 use sui_types::base_types::SequenceNumber;
320
321 use super::*;
322
323 #[test]
324 fn opens_with_all_cfs() {
325 let dir = tempfile::tempdir().unwrap();
326 let (_db, schema) = Db::open::<RpcStoreSchema>(dir.path(), DbOptions::default()).unwrap();
327 assert!(
331 schema
332 .objects
333 .get(&objects::Key {
334 id: ObjectID::ZERO,
335 version: SequenceNumber::from_u64(0),
336 })
337 .unwrap()
338 .is_none()
339 );
340 assert!(
341 schema
342 .pruning_watermark
343 .get(&primitives::UnitKey)
344 .unwrap()
345 .is_none()
346 );
347 }
348
349 #[test]
350 fn default_config_validates() {
351 default_rocksdb_config()
352 .validate()
353 .expect("shipped default config must be internally consistent");
354 }
355
356 #[test]
357 fn default_config_opens_every_cf() {
358 let dir = tempfile::tempdir().unwrap();
362 let opts = DbOptions {
363 rocksdb: default_rocksdb_config(),
364 snapshot_capacity: 32,
365 };
366 let (_db, schema) = Db::open::<RpcStoreSchema>(dir.path(), opts).unwrap();
367 assert!(
368 schema
369 .pruning_watermark
370 .get(&primitives::UnitKey)
371 .unwrap()
372 .is_none()
373 );
374 }
375
376 #[test]
377 fn default_config_sets_per_cf_deviations() {
378 let cfg = default_rocksdb_config();
379 assert_eq!(
381 cfg.column_family[tx_seq_by_digest::NAME].bloom_filter_bits,
382 Some(10.0)
383 );
384 assert_eq!(
385 cfg.column_family[checkpoint_seq_by_digest::NAME].bloom_filter_bits,
386 Some(10.0)
387 );
388 assert_eq!(
390 cfg.column_family[transaction_bitmap::NAME].write_buffer_size_mb,
391 Some(256)
392 );
393 }
394}