typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, ensure_database_type, iterator_bounds, iterator_bounds_with_range,
21};
22use crate::{DbIterator, StorageType, TypedStoreError};
23use crate::{
24    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
25    traits::{Map, TableSummary},
26};
27use backoff::backoff::Backoff;
28use fastcrypto::hash::{Digest, HashFunction};
29use mysten_common::debug_fatal;
30use mysten_metrics::RegistryID;
31use prometheus::{Histogram, HistogramTimer};
32use rocksdb::properties::num_files_at_level;
33use rocksdb::{
34    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
35    properties,
36};
37use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
38use serde::{Serialize, de::DeserializeOwned};
39use std::ops::{Bound, Deref};
40use std::{
41    borrow::Borrow,
42    marker::PhantomData,
43    ops::RangeBounds,
44    path::{Path, PathBuf},
45    sync::{Arc, OnceLock},
46    time::Duration,
47};
48use std::{collections::HashSet, ffi::CStr};
49use sui_macros::{fail_point, nondeterministic};
50#[cfg(tidehunter)]
51use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
52use tokio::sync::oneshot;
53use tracing::{debug, error, instrument, warn};
54
55// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
56// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
57const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
58    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
59
60static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();
61
62fn write_sync_enabled() -> bool {
63    *WRITE_SYNC_ENABLED
64        .get_or_init(|| std::env::var("SUI_DB_SYNC_TO_DISK").is_ok_and(|v| v == "1" || v == "true"))
65}
66
67/// Initialize the write sync setting from config.
68/// Must be called before any database writes occur.
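///
/// A minimal call sketch; the `Some(true)` value here is illustrative and would normally
/// come from node configuration:
/// ```ignore
/// // Apply the optional override exactly once at startup, before any database writes.
/// // Later calls are ignored because the value is stored in a `OnceLock`; if `None` is
/// // passed, the `SUI_DB_SYNC_TO_DISK` environment variable is consulted instead.
/// typed_store::rocks::init_write_sync(Some(true));
/// ```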
69pub fn init_write_sync(enabled: Option<bool>) {
70    if let Some(value) = enabled {
71        let _ = WRITE_SYNC_ENABLED.set(value);
72    }
73}
74
75#[cfg(test)]
76mod tests;
77
78#[derive(Debug)]
79pub struct RocksDB {
80    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
81}
82
83impl Drop for RocksDB {
84    fn drop(&mut self) {
85        self.underlying.cancel_all_background_work(/* wait */ true);
86    }
87}
88
89#[derive(Clone)]
90pub enum ColumnFamily {
91    Rocks(String),
92    InMemory(String),
93    #[cfg(tidehunter)]
94    TideHunter((KeySpace, Option<Vec<u8>>)),
95}
96
97impl std::fmt::Debug for ColumnFamily {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self {
100            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
101            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
102            #[cfg(tidehunter)]
103            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
104        }
105    }
106}
107
108impl ColumnFamily {
109    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
110        match &self {
111            ColumnFamily::Rocks(name) => rocks_db
112                .underlying
113                .cf_handle(name)
114                .expect("Map-keying column family should have been checked at DB creation"),
115            _ => unreachable!("invariant is checked by the caller"),
116        }
117    }
118}
119
120pub enum Storage {
121    Rocks(RocksDB),
122    InMemory(InMemoryDB),
123    #[cfg(tidehunter)]
124    TideHunter(Arc<TideHunterDb>),
125}
126
127impl std::fmt::Debug for Storage {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        match self {
130            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
131            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
132            #[cfg(tidehunter)]
133            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
134        }
135    }
136}
137
138#[derive(Debug)]
139pub struct Database {
140    storage: Storage,
141    metric_conf: MetricConf,
142    registry_id: Option<RegistryID>,
143}
144
145impl Drop for Database {
146    fn drop(&mut self) {
147        let metrics = DBMetrics::get();
148        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
149        if let Some(registry_id) = self.registry_id {
150            metrics.registry_service.remove(registry_id);
151        }
152    }
153}
154
155enum GetResult<'a> {
156    Rocks(DBPinnableSlice<'a>),
157    InMemory(Vec<u8>),
158    #[cfg(tidehunter)]
159    TideHunter(tidehunter::minibytes::Bytes),
160}
161
162impl Deref for GetResult<'_> {
163    type Target = [u8];
164    fn deref(&self) -> &[u8] {
165        match self {
166            GetResult::Rocks(d) => d.deref(),
167            GetResult::InMemory(d) => d.deref(),
168            #[cfg(tidehunter)]
169            GetResult::TideHunter(d) => d.deref(),
170        }
171    }
172}
173
174impl Database {
175    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
176        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
177        Self {
178            storage,
179            metric_conf,
180            registry_id,
181        }
182    }
183
184    /// Flush all memtables to SST files on disk.
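    /// Only the RocksDB backend has memtables to flush; the in-memory and TideHunter
    /// backends treat this as a no-op.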
185    pub fn flush(&self) -> Result<(), TypedStoreError> {
186        match &self.storage {
187            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
188                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
189            }),
190            Storage::InMemory(_) => {
191                // InMemory databases don't need flushing
192                Ok(())
193            }
194            #[cfg(tidehunter)]
195            Storage::TideHunter(_) => {
196                // TideHunter doesn't support an explicit flush.
197                Ok(())
198            }
199        }
200    }
201
202    fn get<K: AsRef<[u8]>>(
203        &self,
204        cf: &ColumnFamily,
205        key: K,
206        readopts: &ReadOptions,
207    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
208        match (&self.storage, cf) {
209            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
210                .underlying
211                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
212                .map_err(typed_store_err_from_rocks_err)?
213                .map(GetResult::Rocks)),
214            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
215                Ok(db.get(cf_name, key).map(GetResult::InMemory))
216            }
217            #[cfg(tidehunter)]
218            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
219                .get(*ks, &transform_th_key(key.as_ref(), prefix))
220                .map_err(typed_store_error_from_th_error)?
221                .map(GetResult::TideHunter)),
222
223            _ => Err(TypedStoreError::RocksDBError(
224                "typed store invariant violation".to_string(),
225            )),
226        }
227    }
228
229    fn multi_get<I, K>(
230        &self,
231        cf: &ColumnFamily,
232        keys: I,
233        readopts: &ReadOptions,
234    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
235    where
236        I: IntoIterator<Item = K>,
237        K: AsRef<[u8]>,
238    {
239        match (&self.storage, cf) {
240            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
241                let keys_vec: Vec<K> = keys.into_iter().collect();
242                let res = db.underlying.batched_multi_get_cf_opt(
243                    &cf.rocks_cf(db),
244                    keys_vec.iter(),
245                    /* sorted_input */ false,
246                    readopts,
247                );
248                res.into_iter()
249                    .map(|r| {
250                        r.map_err(typed_store_err_from_rocks_err)
251                            .map(|item| item.map(GetResult::Rocks))
252                    })
253                    .collect()
254            }
255            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
256                .multi_get(cf_name, keys)
257                .into_iter()
258                .map(|r| Ok(r.map(GetResult::InMemory)))
259                .collect(),
260            #[cfg(tidehunter)]
261            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
262                let res = keys.into_iter().map(|k| {
263                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
264                        .map_err(typed_store_error_from_th_error)
265                });
266                res.into_iter()
267                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
268                    .collect()
269            }
270            _ => unreachable!("typed store invariant violation"),
271        }
272    }
273
274    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
275        match &self.storage {
276            Storage::Rocks(db) => db.underlying.drop_cf(name),
277            Storage::InMemory(db) => {
278                db.drop_cf(name);
279                Ok(())
280            }
281            #[cfg(tidehunter)]
282            Storage::TideHunter(_) => {
283                unimplemented!("TideHunter: deletion of a column family on the fly is not implemented")
284            }
285        }
286    }
287
288    pub fn delete_file_in_range<K: AsRef<[u8]>>(
289        &self,
290        cf: &impl AsColumnFamilyRef,
291        from: K,
292        to: K,
293    ) -> Result<(), rocksdb::Error> {
294        match &self.storage {
295            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
296            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
297        }
298    }
299
300    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
301        fail_point!("delete-cf-before");
302        let ret = match (&self.storage, cf) {
303            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
304                .underlying
305                .delete_cf(&cf.rocks_cf(db), key)
306                .map_err(typed_store_err_from_rocks_err),
307            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
308                db.delete(cf_name, key.as_ref());
309                Ok(())
310            }
311            #[cfg(tidehunter)]
312            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
313                .remove(*ks, transform_th_key(key.as_ref(), prefix))
314                .map_err(typed_store_error_from_th_error),
315            _ => Err(TypedStoreError::RocksDBError(
316                "typed store invariant violation".to_string(),
317            )),
318        };
319        fail_point!("delete-cf-after");
320        #[allow(clippy::let_and_return)]
321        ret
322    }
323
324    pub fn path_for_pruning(&self) -> &Path {
325        match &self.storage {
326            Storage::Rocks(rocks) => rocks.underlying.path(),
327            _ => unimplemented!("method is only supported for rocksdb backend"),
328        }
329    }
330
331    fn put_cf(
332        &self,
333        cf: &ColumnFamily,
334        key: Vec<u8>,
335        value: Vec<u8>,
336    ) -> Result<(), TypedStoreError> {
337        fail_point!("put-cf-before");
338        let ret = match (&self.storage, cf) {
339            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
340                .underlying
341                .put_cf(&cf.rocks_cf(db), key, value)
342                .map_err(typed_store_err_from_rocks_err),
343            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
344                db.put(cf_name, key, value);
345                Ok(())
346            }
347            #[cfg(tidehunter)]
348            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
349                .insert(*ks, transform_th_key(&key, prefix), value)
350                .map_err(typed_store_error_from_th_error),
351            _ => Err(TypedStoreError::RocksDBError(
352                "typed store invariant violation".to_string(),
353            )),
354        };
355        fail_point!("put-cf-after");
356        #[allow(clippy::let_and_return)]
357        ret
358    }
359
360    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
361        &self,
362        cf_name: &str,
363        key: K,
364        readopts: &ReadOptions,
365    ) -> bool {
366        match &self.storage {
367            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
368            // but no false negatives. We use it to short-circuit the absent case
369            Storage::Rocks(rocks) => {
370                rocks
371                    .underlying
372                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
373            }
374            _ => true,
375        }
376    }
377
378    pub(crate) fn write_opt_internal(
379        &self,
380        batch: StorageWriteBatch,
381        write_options: &rocksdb::WriteOptions,
382    ) -> Result<(), TypedStoreError> {
383        fail_point!("batch-write-before");
384        let ret = match (&self.storage, batch) {
385            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
386                .underlying
387                .write_opt(batch, write_options)
388                .map_err(typed_store_err_from_rocks_err),
389            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
390                // InMemory doesn't support write options
391                db.write(batch);
392                Ok(())
393            }
394            #[cfg(tidehunter)]
395            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
396                // TideHunter doesn't support write options
397                db.write_batch(batch)
398                    .map_err(typed_store_error_from_th_error)
399            }
400            _ => Err(TypedStoreError::RocksDBError(
401                "using invalid batch type for the database".to_string(),
402            )),
403        };
404        fail_point!("batch-write-after");
405        #[allow(clippy::let_and_return)]
406        ret
407    }
408
409    #[cfg(tidehunter)]
410    pub fn start_relocation(&self) -> anyhow::Result<()> {
411        if let Storage::TideHunter(db) = &self.storage {
412            db.start_relocation()?;
413        }
414        Ok(())
415    }
416
417    #[cfg(tidehunter)]
418    pub fn drop_cells_in_range(
419        &self,
420        ks: KeySpace,
421        from_inclusive: &[u8],
422        to_inclusive: &[u8],
423    ) -> anyhow::Result<()> {
424        if let Storage::TideHunter(db) = &self.storage {
425            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
426                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
427        } else {
428            panic!("drop_cells_in_range called on non-TideHunter storage");
429        }
430        Ok(())
431    }
432
433    pub fn compact_range_cf<K: AsRef<[u8]>>(
434        &self,
435        cf_name: &str,
436        start: Option<K>,
437        end: Option<K>,
438    ) {
439        if let Storage::Rocks(rocksdb) = &self.storage {
440            rocksdb
441                .underlying
442                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
443        }
444    }
445
446    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
447        // TODO: implement for other storage types
448        if let Storage::Rocks(rocks) = &self.storage {
449            let checkpoint =
450                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
451            checkpoint
452                .create_checkpoint(path)
453                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
454        }
455        Ok(())
456    }
457
458    pub fn get_sampling_interval(&self) -> SamplingInterval {
459        self.metric_conf.read_sample_interval.new_from_self()
460    }
461
462    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
463        self.metric_conf.read_sample_interval.new_from_self()
464    }
465
466    pub fn write_sampling_interval(&self) -> SamplingInterval {
467        self.metric_conf.write_sample_interval.new_from_self()
468    }
469
470    pub fn iter_sampling_interval(&self) -> SamplingInterval {
471        self.metric_conf.iter_sample_interval.new_from_self()
472    }
473
474    fn db_name(&self) -> String {
475        let name = &self.metric_conf.db_name;
476        if name.is_empty() {
477            "default".to_string()
478        } else {
479            name.clone()
480        }
481    }
482
483    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
484        match &self.storage {
485            Storage::Rocks(rocks) => rocks.underlying.live_files(),
486            _ => Ok(vec![]),
487        }
488    }
489}
490
491fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
492    rocks_db
493        .underlying
494        .cf_handle(cf_name)
495        .expect("Map-keying column family should have been checked at DB creation")
496}
497
498fn rocks_cf_from_db<'a>(
499    db: &'a Database,
500    cf_name: &str,
501) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
502    match &db.storage {
503        Storage::Rocks(rocksdb) => Ok(rocksdb
504            .underlying
505            .cf_handle(cf_name)
506            .expect("Map-keying column family should have been checked at DB creation")),
507        _ => Err(TypedStoreError::RocksDBError(
508            "using invalid batch type for the database".to_string(),
509        )),
510    }
511}
512
513#[derive(Debug, Default)]
514pub struct MetricConf {
515    pub db_name: String,
516    pub read_sample_interval: SamplingInterval,
517    pub write_sample_interval: SamplingInterval,
518    pub iter_sample_interval: SamplingInterval,
519}
520
521impl MetricConf {
522    pub fn new(db_name: &str) -> Self {
523        if db_name.is_empty() {
524            error!("A meaningful db name should be used for metrics reporting.")
525        }
526        Self {
527            db_name: db_name.to_string(),
528            read_sample_interval: SamplingInterval::default(),
529            write_sample_interval: SamplingInterval::default(),
530            iter_sample_interval: SamplingInterval::default(),
531        }
532    }
533
534    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
535        Self {
536            db_name: self.db_name,
537            read_sample_interval: read_interval,
538            write_sample_interval: SamplingInterval::default(),
539            iter_sample_interval: SamplingInterval::default(),
540        }
541    }
542}
543const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
544const METRICS_ERROR: i64 = -1;
545
546/// An interface to a RocksDB database, keyed by a column family.
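///
/// A minimal usage sketch, mirroring the [`DBBatch`] example below (run inside a tokio
/// runtime, since `DBMap` spawns a metrics task; `open_cf_opts` is the helper used
/// elsewhere in this module):
/// ```ignore
/// use typed_store::rocks::*;
/// use typed_store::Map;
///
/// let db = open_cf_opts(
///     tempfile::tempdir().unwrap(),
///     None,
///     MetricConf::default(),
///     &[("Test_CF", rocksdb::Options::default())],
/// )
/// .unwrap();
/// let map: DBMap<u32, String> =
///     DBMap::reopen(&db, Some("Test_CF"), &ReadWriteOptions::default(), false)
///         .expect("Failed to open column family");
/// map.insert(&1, &"one".to_string()).expect("Failed to insert");
/// assert_eq!(map.get(&1).expect("Failed to get"), Some("one".to_string()));
/// ```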
547#[derive(Clone, Debug)]
548pub struct DBMap<K, V> {
549    pub db: Arc<Database>,
550    _phantom: PhantomData<fn(K) -> V>,
551    column_family: ColumnFamily,
552    // the name of the column family under which the map is stored
553    cf: String,
554    pub opts: ReadWriteOptions,
555    db_metrics: Arc<DBMetrics>,
556    get_sample_interval: SamplingInterval,
557    multiget_sample_interval: SamplingInterval,
558    write_sample_interval: SamplingInterval,
559    iter_sample_interval: SamplingInterval,
560    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
561}
562
563unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
564
565impl<K, V> DBMap<K, V> {
566    pub(crate) fn new(
567        db: Arc<Database>,
568        opts: &ReadWriteOptions,
569        opt_cf: &str,
570        column_family: ColumnFamily,
571        is_deprecated: bool,
572    ) -> Self {
573        let db_cloned = Arc::downgrade(&db.clone());
574        let db_metrics = DBMetrics::get();
575        let db_metrics_cloned = db_metrics.clone();
576        let cf = opt_cf.to_string();
577
578        let (sender, mut recv) = tokio::sync::oneshot::channel();
579        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
580            tokio::task::spawn(async move {
581                let mut interval =
582                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
583                loop {
584                    tokio::select! {
585                        _ = interval.tick() => {
586                            if let Some(db) = db_cloned.upgrade() {
587                                let cf = cf.clone();
588                                let db_metrics = db_metrics.clone();
589                                if let Err(e) = tokio::task::spawn_blocking(move || {
590                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
591                                }).await {
592                                    error!("Failed to log metrics with error: {}", e);
593                                }
594                            }
595                        }
596                        _ = &mut recv => break,
597                    }
598                }
599                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
600            });
601        }
602        DBMap {
603            db: db.clone(),
604            opts: opts.clone(),
605            _phantom: PhantomData,
606            column_family,
607            cf: opt_cf.to_string(),
608            db_metrics: db_metrics_cloned,
609            _metrics_task_cancel_handle: Arc::new(sender),
610            get_sample_interval: db.get_sampling_interval(),
611            multiget_sample_interval: db.multiget_sampling_interval(),
612            write_sample_interval: db.write_sampling_interval(),
613            iter_sample_interval: db.iter_sampling_interval(),
614        }
615    }
616
617    /// Reopens an open database as a typed map operating under a specific column family.
618    /// If no column family is passed, the default column family is used.
619    #[instrument(level = "debug", skip(db), err)]
620    pub fn reopen(
621        db: &Arc<Database>,
622        opt_cf: Option<&str>,
623        rw_options: &ReadWriteOptions,
624        is_deprecated: bool,
625    ) -> Result<Self, TypedStoreError> {
626        let cf_key = opt_cf
627            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
628            .to_owned();
629        Ok(DBMap::new(
630            db.clone(),
631            rw_options,
632            &cf_key,
633            ColumnFamily::Rocks(cf_key.to_string()),
634            is_deprecated,
635        ))
636    }
637
638    #[cfg(tidehunter)]
639    pub fn reopen_th(
640        db: Arc<Database>,
641        cf_name: &str,
642        ks: KeySpace,
643        prefix: Option<Vec<u8>>,
644    ) -> Self {
645        DBMap::new(
646            db,
647            &ReadWriteOptions::default(),
648            cf_name,
649            ColumnFamily::TideHunter((ks, prefix.clone())),
650            false,
651        )
652    }
653
654    pub fn cf_name(&self) -> &str {
655        &self.cf
656    }
657
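    /// Creates an empty [`DBBatch`] bound to the same underlying database as this map.
    /// Populate it with [`DBBatch::insert_batch`] / [`DBBatch::delete_batch`] and commit it
    /// atomically with [`DBBatch::write`].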
658    pub fn batch(&self) -> DBBatch {
659        let batch = match &self.db.storage {
660            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
661            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
662            #[cfg(tidehunter)]
663            Storage::TideHunter(_) => {
664                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
665            }
666        };
667        DBBatch::new(
668            &self.db,
669            batch,
670            &self.db_metrics,
671            &self.write_sample_interval,
672        )
673    }
674
675    pub fn flush(&self) -> Result<(), TypedStoreError> {
676        self.db.flush()
677    }
678
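    /// Manually compacts keys between `start` and `end` in this map's column family.
    /// Only the RocksDB backend performs compaction; other storage types treat this as a no-op.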
679    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
680        let from_buf = be_fix_int_ser(start);
681        let to_buf = be_fix_int_ser(end);
682        self.db
683            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
684        Ok(())
685    }
686
687    pub fn compact_range_raw(
688        &self,
689        cf_name: &str,
690        start: Vec<u8>,
691        end: Vec<u8>,
692    ) -> Result<(), TypedStoreError> {
693        self.db.compact_range_cf(cf_name, Some(start), Some(end));
694        Ok(())
695    }
696
697    #[cfg(tidehunter)]
698    pub fn drop_cells_in_range<J: Serialize>(
699        &self,
700        from_inclusive: &J,
701        to_inclusive: &J,
702    ) -> Result<(), TypedStoreError>
703    where
704        K: Serialize,
705    {
706        let from_buf = be_fix_int_ser(from_inclusive);
707        let to_buf = be_fix_int_ser(to_inclusive);
708        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
709            self.db
710                .drop_cells_in_range(*ks, &from_buf, &to_buf)
711                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
712        }
713        Ok(())
714    }
715
716    /// Returns a vector of raw values corresponding to the keys provided.
717    fn multi_get_pinned<J>(
718        &self,
719        keys: impl IntoIterator<Item = J>,
720    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
721    where
722        J: Borrow<K>,
723        K: Serialize,
724    {
725        let _timer = self
726            .db_metrics
727            .op_metrics
728            .rocksdb_multiget_latency_seconds
729            .with_label_values(&[&self.cf])
730            .start_timer();
731        let perf_ctx = if self.multiget_sample_interval.sample() {
732            Some(RocksDBPerfContext)
733        } else {
734            None
735        };
736        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
737        let results: Result<Vec<_>, TypedStoreError> = self
738            .db
739            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
740            .into_iter()
741            .collect();
742        let entries = results?;
743        let entry_size = entries
744            .iter()
745            .flatten()
746            .map(|entry| entry.len())
747            .sum::<usize>();
748        self.db_metrics
749            .op_metrics
750            .rocksdb_multiget_bytes
751            .with_label_values(&[&self.cf])
752            .observe(entry_size as f64);
753        if perf_ctx.is_some() {
754            self.db_metrics
755                .read_perf_ctx_metrics
756                .report_metrics(&self.cf);
757        }
758        Ok(entries)
759    }
760
761    fn get_rocksdb_int_property(
762        rocksdb: &RocksDB,
763        cf: &impl AsColumnFamilyRef,
764        property_name: &std::ffi::CStr,
765    ) -> Result<i64, TypedStoreError> {
766        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
767            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
768            Ok(None) => Ok(0),
769            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
770        }
771    }
772
773    fn report_rocksdb_metrics(
774        database: &Arc<Database>,
775        cf_name: &str,
776        db_metrics: &Arc<DBMetrics>,
777    ) {
778        let Storage::Rocks(rocksdb) = &database.storage else {
779            return;
780        };
781
782        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
783            tracing::warn!(
784                "unable to report metrics for cf {cf_name:?} in db {:?}",
785                database.db_name()
786            );
787            return;
788        };
789
790        db_metrics
791            .cf_metrics
792            .rocksdb_total_sst_files_size
793            .with_label_values(&[cf_name])
794            .set(
795                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
796                    .unwrap_or(METRICS_ERROR),
797            );
798        db_metrics
799            .cf_metrics
800            .rocksdb_total_blob_files_size
801            .with_label_values(&[cf_name])
802            .set(
803                Self::get_rocksdb_int_property(
804                    rocksdb,
805                    &cf,
806                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
807                )
808                .unwrap_or(METRICS_ERROR),
809            );
810        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
811        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
812        let total_num_files: i64 = (0..=6)
813            .map(|level| {
814                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
815                    .unwrap_or(METRICS_ERROR)
816            })
817            .sum();
818        db_metrics
819            .cf_metrics
820            .rocksdb_total_num_files
821            .with_label_values(&[cf_name])
822            .set(total_num_files);
823        db_metrics
824            .cf_metrics
825            .rocksdb_num_level0_files
826            .with_label_values(&[cf_name])
827            .set(
828                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
829                    .unwrap_or(METRICS_ERROR),
830            );
831        db_metrics
832            .cf_metrics
833            .rocksdb_current_size_active_mem_tables
834            .with_label_values(&[cf_name])
835            .set(
836                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
837                    .unwrap_or(METRICS_ERROR),
838            );
839        db_metrics
840            .cf_metrics
841            .rocksdb_size_all_mem_tables
842            .with_label_values(&[cf_name])
843            .set(
844                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
845                    .unwrap_or(METRICS_ERROR),
846            );
847        db_metrics
848            .cf_metrics
849            .rocksdb_num_snapshots
850            .with_label_values(&[cf_name])
851            .set(
852                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
853                    .unwrap_or(METRICS_ERROR),
854            );
855        db_metrics
856            .cf_metrics
857            .rocksdb_oldest_snapshot_time
858            .with_label_values(&[cf_name])
859            .set(
860                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
861                    .unwrap_or(METRICS_ERROR),
862            );
863        db_metrics
864            .cf_metrics
865            .rocksdb_actual_delayed_write_rate
866            .with_label_values(&[cf_name])
867            .set(
868                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
869                    .unwrap_or(METRICS_ERROR),
870            );
871        db_metrics
872            .cf_metrics
873            .rocksdb_is_write_stopped
874            .with_label_values(&[cf_name])
875            .set(
876                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
877                    .unwrap_or(METRICS_ERROR),
878            );
879        db_metrics
880            .cf_metrics
881            .rocksdb_block_cache_capacity
882            .with_label_values(&[cf_name])
883            .set(
884                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
885                    .unwrap_or(METRICS_ERROR),
886            );
887        db_metrics
888            .cf_metrics
889            .rocksdb_block_cache_usage
890            .with_label_values(&[cf_name])
891            .set(
892                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
893                    .unwrap_or(METRICS_ERROR),
894            );
895        db_metrics
896            .cf_metrics
897            .rocksdb_block_cache_pinned_usage
898            .with_label_values(&[cf_name])
899            .set(
900                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
901                    .unwrap_or(METRICS_ERROR),
902            );
903        db_metrics
904            .cf_metrics
905            .rocksdb_estimate_table_readers_mem
906            .with_label_values(&[cf_name])
907            .set(
908                Self::get_rocksdb_int_property(
909                    rocksdb,
910                    &cf,
911                    properties::ESTIMATE_TABLE_READERS_MEM,
912                )
913                .unwrap_or(METRICS_ERROR),
914            );
915        db_metrics
916            .cf_metrics
917            .rocksdb_estimated_num_keys
918            .with_label_values(&[cf_name])
919            .set(
920                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
921                    .unwrap_or(METRICS_ERROR),
922            );
923        db_metrics
924            .cf_metrics
925            .rocksdb_num_immutable_mem_tables
926            .with_label_values(&[cf_name])
927            .set(
928                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
929                    .unwrap_or(METRICS_ERROR),
930            );
931        db_metrics
932            .cf_metrics
933            .rocksdb_mem_table_flush_pending
934            .with_label_values(&[cf_name])
935            .set(
936                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
937                    .unwrap_or(METRICS_ERROR),
938            );
939        db_metrics
940            .cf_metrics
941            .rocksdb_compaction_pending
942            .with_label_values(&[cf_name])
943            .set(
944                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
945                    .unwrap_or(METRICS_ERROR),
946            );
947        db_metrics
948            .cf_metrics
949            .rocksdb_estimate_pending_compaction_bytes
950            .with_label_values(&[cf_name])
951            .set(
952                Self::get_rocksdb_int_property(
953                    rocksdb,
954                    &cf,
955                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
956                )
957                .unwrap_or(METRICS_ERROR),
958            );
959        db_metrics
960            .cf_metrics
961            .rocksdb_num_running_compactions
962            .with_label_values(&[cf_name])
963            .set(
964                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
965                    .unwrap_or(METRICS_ERROR),
966            );
967        db_metrics
968            .cf_metrics
969            .rocksdb_num_running_flushes
970            .with_label_values(&[cf_name])
971            .set(
972                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
973                    .unwrap_or(METRICS_ERROR),
974            );
975        db_metrics
976            .cf_metrics
977            .rocksdb_estimate_oldest_key_time
978            .with_label_values(&[cf_name])
979            .set(
980                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
981                    .unwrap_or(METRICS_ERROR),
982            );
983        db_metrics
984            .cf_metrics
985            .rocksdb_background_errors
986            .with_label_values(&[cf_name])
987            .set(
988                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
989                    .unwrap_or(METRICS_ERROR),
990            );
991        db_metrics
992            .cf_metrics
993            .rocksdb_base_level
994            .with_label_values(&[cf_name])
995            .set(
996                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
997                    .unwrap_or(METRICS_ERROR),
998            );
999    }
1000
1001    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1002        self.db.checkpoint(path)
1003    }
1004
1005    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1006    where
1007        K: Serialize + DeserializeOwned,
1008        V: Serialize + DeserializeOwned,
1009    {
1010        let mut num_keys = 0;
1011        let mut key_bytes_total = 0;
1012        let mut value_bytes_total = 0;
1013        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1014        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1015        for item in self.safe_iter() {
1016            let (key, value) = item?;
1017            num_keys += 1;
1018            let key_len = be_fix_int_ser(key.borrow()).len();
1019            let value_len = bcs::to_bytes(value.borrow())?.len();
1020            key_bytes_total += key_len;
1021            value_bytes_total += value_len;
1022            key_hist.record(key_len as u64)?;
1023            value_hist.record(value_len as u64)?;
1024        }
1025        Ok(TableSummary {
1026            num_keys,
1027            key_bytes_total,
1028            value_bytes_total,
1029            key_hist,
1030            value_hist,
1031        })
1032    }
1033
1034    fn start_iter_timer(&self) -> HistogramTimer {
1035        self.db_metrics
1036            .op_metrics
1037            .rocksdb_iter_latency_seconds
1038            .with_label_values(&[&self.cf])
1039            .start_timer()
1040    }
1041
1042    // Creates metrics and context for tracking iterator usage and performance.
1043    fn create_iter_context(
1044        &self,
1045    ) -> (
1046        Option<HistogramTimer>,
1047        Option<Histogram>,
1048        Option<Histogram>,
1049        Option<RocksDBPerfContext>,
1050    ) {
1051        let timer = self.start_iter_timer();
1052        let bytes_scanned = self
1053            .db_metrics
1054            .op_metrics
1055            .rocksdb_iter_bytes
1056            .with_label_values(&[&self.cf]);
1057        let keys_scanned = self
1058            .db_metrics
1059            .op_metrics
1060            .rocksdb_iter_keys
1061            .with_label_values(&[&self.cf]);
1062        let perf_ctx = if self.iter_sample_interval.sample() {
1063            Some(RocksDBPerfContext)
1064        } else {
1065            None
1066        };
1067        (
1068            Some(timer),
1069            Some(bytes_scanned),
1070            Some(keys_scanned),
1071            perf_ctx,
1072        )
1073    }
1074
1075    /// Creates a safe reversed iterator with optional bounds.
1076    /// Both upper bound and lower bound are included.
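    ///
    /// A usage sketch (assumes a `DBMap<u64, String>` named `map`, opened as in the
    /// [`DBBatch`] example):
    /// ```ignore
    /// // Visit keys 10..=20 from highest to lowest; both bounds are inclusive.
    /// for item in map.reversed_safe_iter_with_bounds(Some(10), Some(20))? {
    ///     let (key, value) = item?;
    ///     println!("{key} -> {value}");
    /// }
    /// ```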
1077    #[allow(clippy::complexity)]
1078    pub fn reversed_safe_iter_with_bounds(
1079        &self,
1080        lower_bound: Option<K>,
1081        upper_bound: Option<K>,
1082    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1083    where
1084        K: Serialize + DeserializeOwned,
1085        V: Serialize + DeserializeOwned,
1086    {
1087        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1088            lower_bound
1089                .as_ref()
1090                .map(Bound::Included)
1091                .unwrap_or(Bound::Unbounded),
1092            upper_bound
1093                .as_ref()
1094                .map(Bound::Included)
1095                .unwrap_or(Bound::Unbounded),
1096        ));
1097        match &self.db.storage {
1098            Storage::Rocks(db) => {
1099                let readopts = rocks_util::apply_range_bounds(
1100                    self.opts.readopts(),
1101                    it_lower_bound,
1102                    it_upper_bound,
1103                );
1104                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1105                let db_iter = db
1106                    .underlying
1107                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1108                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1109                let iter = SafeIter::new(
1110                    self.cf.clone(),
1111                    db_iter,
1112                    _timer,
1113                    _perf_ctx,
1114                    bytes_scanned,
1115                    keys_scanned,
1116                    Some(self.db_metrics.clone()),
1117                );
1118                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1119            }
1120            Storage::InMemory(db) => {
1121                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1122            }
1123            #[cfg(tidehunter)]
1124            Storage::TideHunter(db) => match &self.column_family {
1125                ColumnFamily::TideHunter((ks, prefix)) => {
1126                    let mut iter = db.iterator(*ks);
1127                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
1128                    iter.reverse();
1129                    Ok(Box::new(transform_th_iterator(
1130                        iter,
1131                        prefix,
1132                        self.start_iter_timer(),
1133                    )))
1134                }
1135                _ => unreachable!("storage backend invariant violation"),
1136            },
1137        }
1138    }
1139}
1140
1141pub enum StorageWriteBatch {
1142    Rocks(rocksdb::WriteBatch),
1143    InMemory(InMemoryBatch),
1144    #[cfg(tidehunter)]
1145    TideHunter(tidehunter::batch::WriteBatch),
1146}
1147
1148/// Provides a mutable struct for collecting database write operations and executing them as one batch.
1149///
1150/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
1151/// i.e. they are all written or none is.
1152/// This also holds for operations across column families in the same database.
1153///
1154/// Serialization / deserialization and the naming of column families are handled by passing a DBMap<K, V>
1155/// with each operation.
1156///
1157/// ```
1158/// use typed_store::rocks::*;
1159/// use tempfile::tempdir;
1160/// use typed_store::Map;
1161/// use typed_store::metrics::DBMetrics;
1162/// use prometheus::Registry;
1163/// use core::fmt::Error;
1164/// use std::sync::Arc;
1165///
1166/// #[tokio::main]
1167/// async fn main() -> Result<(), Error> {
1168/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
1169///
1170/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
1171///     .expect("Failed to open storage");
1172/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1173///
1174/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
1175///     .expect("Failed to open storage");
1176/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1177///
1178/// let mut batch = db_cf_1.batch();
1179/// batch
1180///     .insert_batch(&db_cf_1, keys_vals_1.clone())
1181///     .expect("Failed to batch insert")
1182///     .insert_batch(&db_cf_2, keys_vals_2.clone())
1183///     .expect("Failed to batch insert");
1184///
1185/// let _ = batch.write().expect("Failed to execute batch");
1186/// for (k, v) in keys_vals_1 {
1187///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1188///     assert_eq!(Some(v), val);
1189/// }
1190///
1191/// for (k, v) in keys_vals_2 {
1192///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1193///     assert_eq!(Some(v), val);
1194/// }
1195/// Ok(())
1196/// }
1197/// ```
1198///
1199pub struct DBBatch {
1200    database: Arc<Database>,
1201    batch: StorageWriteBatch,
1202    db_metrics: Arc<DBMetrics>,
1203    write_sample_interval: SamplingInterval,
1204}
1205
1206impl DBBatch {
1207    /// Create a new batch associated with a DB reference.
1208    ///
1209    /// Use `open_cf` to get a DB reference, or reuse an existing open database.
1210    pub fn new(
1211        dbref: &Arc<Database>,
1212        batch: StorageWriteBatch,
1213        db_metrics: &Arc<DBMetrics>,
1214        write_sample_interval: &SamplingInterval,
1215    ) -> Self {
1216        DBBatch {
1217            database: dbref.clone(),
1218            batch,
1219            db_metrics: db_metrics.clone(),
1220            write_sample_interval: write_sample_interval.clone(),
1221        }
1222    }
1223
1224    /// Consume the batch and write its operations to the database
1225    #[instrument(level = "trace", skip_all, err)]
1226    pub fn write(self) -> Result<(), TypedStoreError> {
1227        let mut write_options = rocksdb::WriteOptions::default();
1228
1229        if write_sync_enabled() {
1230            write_options.set_sync(true);
1231        }
1232
1233        self.write_opt(write_options)
1234    }
1235
1236    /// Consume the batch and write its operations to the database with custom write options
1237    #[instrument(level = "trace", skip_all, err)]
1238    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1239        let db_name = self.database.db_name();
1240        let timer = self
1241            .db_metrics
1242            .op_metrics
1243            .rocksdb_batch_commit_latency_seconds
1244            .with_label_values(&[&db_name])
1245            .start_timer();
1246        let batch_size = self.size_in_bytes();
1247
1248        let perf_ctx = if self.write_sample_interval.sample() {
1249            Some(RocksDBPerfContext)
1250        } else {
1251            None
1252        };
1253
1254        self.database
1255            .write_opt_internal(self.batch, &write_options)?;
1256
1257        self.db_metrics
1258            .op_metrics
1259            .rocksdb_batch_commit_bytes
1260            .with_label_values(&[&db_name])
1261            .observe(batch_size as f64);
1262
1263        if perf_ctx.is_some() {
1264            self.db_metrics
1265                .write_perf_ctx_metrics
1266                .report_metrics(&db_name);
1267        }
1268        let elapsed = timer.stop_and_record();
1269        if elapsed > 1.0 {
1270            warn!(?elapsed, ?db_name, "very slow batch write");
1271            self.db_metrics
1272                .op_metrics
1273                .rocksdb_very_slow_batch_writes_count
1274                .with_label_values(&[&db_name])
1275                .inc();
1276            self.db_metrics
1277                .op_metrics
1278                .rocksdb_very_slow_batch_writes_duration_ms
1279                .with_label_values(&[&db_name])
1280                .inc_by((elapsed * 1000.0) as u64);
1281        }
1282        Ok(())
1283    }
1284
1285    pub fn size_in_bytes(&self) -> usize {
1286        match self.batch {
1287            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1288            StorageWriteBatch::InMemory(_) => 0,
1289            // TODO: implement size_in_bytes method
1290            #[cfg(tidehunter)]
1291            StorageWriteBatch::TideHunter(_) => 0,
1292        }
1293    }
1294
1295    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1296        &mut self,
1297        db: &DBMap<K, V>,
1298        purged_vals: impl IntoIterator<Item = J>,
1299    ) -> Result<(), TypedStoreError> {
1300        if !Arc::ptr_eq(&db.db, &self.database) {
1301            return Err(TypedStoreError::CrossDBBatch);
1302        }
1303
1304        purged_vals
1305            .into_iter()
1306            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1307                let k_buf = be_fix_int_ser(k.borrow());
1308                match (&mut self.batch, &db.column_family) {
1309                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1310                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1311                    }
1312                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1313                        b.delete_cf(name, k_buf)
1314                    }
1315                    #[cfg(tidehunter)]
1316                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1317                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1318                    }
1319                    _ => Err(TypedStoreError::RocksDBError(
1320                        "typed store invariant violation".to_string(),
1321                    ))?,
1322                }
1323                Ok(())
1324            })?;
1325        Ok(())
1326    }
1327
1328    /// Deletes a range of keys between `from` (inclusive) and `to` (exclusive)
1329    /// by writing a range delete tombstone in the db map.
1330    /// If the DBMap is configured with ignore_range_deletions set to false,
1331    /// the effect of this write is visible immediately, i.e. you won't
1332    /// see old values when you do a lookup or scan. But if it is configured
1333    /// with ignore_range_deletions set to true, the old values remain visible until
1334    /// compaction actually deletes them, which happens at some later point. By
1335    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1336    /// overridden in the config), so please use this function with caution.
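    ///
    /// A usage sketch (assumes a `DBMap<u64, String>` named `map`):
    /// ```ignore
    /// let mut batch = map.batch();
    /// // Schedules a single range tombstone covering keys 100 (inclusive) to 200 (exclusive).
    /// batch.schedule_delete_range(&map, &100, &200)?;
    /// batch.write()?;
    /// ```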
1337    pub fn schedule_delete_range<K: Serialize, V>(
1338        &mut self,
1339        db: &DBMap<K, V>,
1340        from: &K,
1341        to: &K,
1342    ) -> Result<(), TypedStoreError> {
1343        if !Arc::ptr_eq(&db.db, &self.database) {
1344            return Err(TypedStoreError::CrossDBBatch);
1345        }
1346
1347        let from_buf = be_fix_int_ser(from);
1348        let to_buf = be_fix_int_ser(to);
1349
1350        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1351            b.delete_range_cf(
1352                &rocks_cf_from_db(&self.database, db.cf_name())?,
1353                from_buf,
1354                to_buf,
1355            );
1356        }
1357        Ok(())
1358    }
1359
1360    /// Inserts a range of (key, value) pairs given as an iterator.
1361    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1362        &mut self,
1363        db: &DBMap<K, V>,
1364        new_vals: impl IntoIterator<Item = (J, U)>,
1365    ) -> Result<&mut Self, TypedStoreError> {
1366        if !Arc::ptr_eq(&db.db, &self.database) {
1367            return Err(TypedStoreError::CrossDBBatch);
1368        }
1369        let mut total = 0usize;
1370        new_vals
1371            .into_iter()
1372            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1373                let k_buf = be_fix_int_ser(k.borrow());
1374                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1375                total += k_buf.len() + v_buf.len();
1376                if db.opts.log_value_hash {
1377                    let key_hash = default_hash(&k_buf);
1378                    let value_hash = default_hash(&v_buf);
1379                    debug!(
1380                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1381                        db.cf_name(),
1382                        key_hash,
1383                        value_hash
1384                    );
1385                }
1386                match (&mut self.batch, &db.column_family) {
1387                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1388                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1389                    }
1390                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1391                        b.put_cf(name, k_buf, v_buf)
1392                    }
1393                    #[cfg(tidehunter)]
1394                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1395                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1396                    }
1397                    _ => Err(TypedStoreError::RocksDBError(
1398                        "typed store invariant violation".to_string(),
1399                    ))?,
1400                }
1401                Ok(())
1402            })?;
1403        self.db_metrics
1404            .op_metrics
1405            .rocksdb_batch_put_bytes
1406            .with_label_values(&[&db.cf])
1407            .observe(total as f64);
1408        Ok(self)
1409    }
1410
1411    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1412        &mut self,
1413        db: &DBMap<K, V>,
1414        new_vals: impl IntoIterator<Item = (J, U)>,
1415    ) -> Result<&mut Self, TypedStoreError> {
1416        if !Arc::ptr_eq(&db.db, &self.database) {
1417            return Err(TypedStoreError::CrossDBBatch);
1418        }
1419        new_vals
1420            .into_iter()
1421            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1422                let k_buf = be_fix_int_ser(k.borrow());
1423                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1424                match &mut self.batch {
1425                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1426                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1427                        k_buf,
1428                        v_buf,
1429                    ),
1430                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1431                }
1432                Ok(())
1433            })?;
1434        Ok(self)
1435    }
1436}
1437
1438impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1439where
1440    K: Serialize + DeserializeOwned,
1441    V: Serialize + DeserializeOwned,
1442{
1443    type Error = TypedStoreError;
1444
1445    #[instrument(level = "trace", skip_all, err)]
1446    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1447        let key_buf = be_fix_int_ser(key);
1448        let readopts = self.opts.readopts();
1449        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1450            && self
1451                .db
1452                .get(&self.column_family, &key_buf, &readopts)?
1453                .is_some())
1454    }
1455
1456    #[instrument(level = "trace", skip_all, err)]
1457    fn multi_contains_keys<J>(
1458        &self,
1459        keys: impl IntoIterator<Item = J>,
1460    ) -> Result<Vec<bool>, Self::Error>
1461    where
1462        J: Borrow<K>,
1463    {
1464        let values = self.multi_get_pinned(keys)?;
1465        Ok(values.into_iter().map(|v| v.is_some()).collect())
1466    }
1467
1468    #[instrument(level = "trace", skip_all, err)]
1469    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1470        let _timer = self
1471            .db_metrics
1472            .op_metrics
1473            .rocksdb_get_latency_seconds
1474            .with_label_values(&[&self.cf])
1475            .start_timer();
1476        let perf_ctx = if self.get_sample_interval.sample() {
1477            Some(RocksDBPerfContext)
1478        } else {
1479            None
1480        };
1481        let key_buf = be_fix_int_ser(key);
1482        let res = self
1483            .db
1484            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1485        self.db_metrics
1486            .op_metrics
1487            .rocksdb_get_bytes
1488            .with_label_values(&[&self.cf])
1489            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1490        if perf_ctx.is_some() {
1491            self.db_metrics
1492                .read_perf_ctx_metrics
1493                .report_metrics(&self.cf);
1494        }
1495        match res {
1496            Some(data) => {
1497                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1498                if value.is_err() {
1499                    let key_hash = default_hash(&key_buf);
1500                    let value_hash = default_hash(&data);
1501                    debug_fatal!(
1502                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1503                        self.cf_name(),
1504                        key_hash,
1505                        value_hash,
1506                        value.as_ref().err().unwrap()
1507                    );
1508                }
1509                Ok(Some(value?))
1510            }
1511            None => Ok(None),
1512        }
1513    }
1514
1515    #[instrument(level = "trace", skip_all, err)]
1516    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1517        let timer = self
1518            .db_metrics
1519            .op_metrics
1520            .rocksdb_put_latency_seconds
1521            .with_label_values(&[&self.cf])
1522            .start_timer();
1523        let perf_ctx = if self.write_sample_interval.sample() {
1524            Some(RocksDBPerfContext)
1525        } else {
1526            None
1527        };
1528        let key_buf = be_fix_int_ser(key);
1529        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1530        self.db_metrics
1531            .op_metrics
1532            .rocksdb_put_bytes
1533            .with_label_values(&[&self.cf])
1534            .observe((key_buf.len() + value_buf.len()) as f64);
1535        if perf_ctx.is_some() {
1536            self.db_metrics
1537                .write_perf_ctx_metrics
1538                .report_metrics(&self.cf);
1539        }
1540        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1541
1542        let elapsed = timer.stop_and_record();
1543        if elapsed > 1.0 {
1544            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1545            self.db_metrics
1546                .op_metrics
1547                .rocksdb_very_slow_puts_count
1548                .with_label_values(&[&self.cf])
1549                .inc();
1550            self.db_metrics
1551                .op_metrics
1552                .rocksdb_very_slow_puts_duration_ms
1553                .with_label_values(&[&self.cf])
1554                .inc_by((elapsed * 1000.0) as u64);
1555        }
1556
1557        Ok(())
1558    }
1559
1560    #[instrument(level = "trace", skip_all, err)]
1561    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1562        let _timer = self
1563            .db_metrics
1564            .op_metrics
1565            .rocksdb_delete_latency_seconds
1566            .with_label_values(&[&self.cf])
1567            .start_timer();
1568        let perf_ctx = if self.write_sample_interval.sample() {
1569            Some(RocksDBPerfContext)
1570        } else {
1571            None
1572        };
1573        let key_buf = be_fix_int_ser(key);
1574        self.db.delete_cf(&self.column_family, key_buf)?;
1575        self.db_metrics
1576            .op_metrics
1577            .rocksdb_deletes
1578            .with_label_values(&[&self.cf])
1579            .inc();
1580        if perf_ctx.is_some() {
1581            self.db_metrics
1582                .write_perf_ctx_metrics
1583                .report_metrics(&self.cf);
1584        }
1585        Ok(())
1586    }
1587
1588    /// Writes a range delete tombstone to delete all entries in the db map.
1589    /// If the DBMap is configured with ignore_range_deletions set to false,
1590    /// the effect of this write is visible immediately, i.e. you won't
1591    /// see old values when you do a lookup or scan. But if it is configured
1592    /// with ignore_range_deletions set to true, the old values remain visible
1593    /// until compaction actually deletes them, which happens sometime later.
1594    /// By default ignore_range_deletions is set to true on a DBMap (unless it
1595    /// is overridden in the config), so please use this function with caution.
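    ///
    /// ```ignore
    /// // Sketch with a hypothetical `table: DBMap<K, V>`. With the default
    /// // ignore_range_deletions = true, a scan issued right after this call may
    /// // still return old entries until compaction removes them.
    /// table.schedule_delete_all()?;
    /// ```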
1596    #[instrument(level = "trace", skip_all, err)]
1597    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1598        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1599        let last_key = self
1600            .reversed_safe_iter_with_bounds(None, None)?
1601            .next()
1602            .transpose()?
1603            .map(|(k, _v)| k);
1604        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1605            let mut batch = self.batch();
1606            batch.schedule_delete_range(self, &first_key, &last_key)?;
1607            batch.write()?;
1608        }
1609        Ok(())
1610    }
1611
1612    fn is_empty(&self) -> bool {
1613        self.safe_iter().next().is_none()
1614    }
1615
1616    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1617        match &self.db.storage {
1618            Storage::Rocks(db) => {
1619                let db_iter = db
1620                    .underlying
1621                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1622                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1623                Box::new(SafeIter::new(
1624                    self.cf.clone(),
1625                    db_iter,
1626                    _timer,
1627                    _perf_ctx,
1628                    bytes_scanned,
1629                    keys_scanned,
1630                    Some(self.db_metrics.clone()),
1631                ))
1632            }
1633            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1634            #[cfg(tidehunter)]
1635            Storage::TideHunter(db) => match &self.column_family {
1636                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1637                    db.iterator(*ks),
1638                    prefix,
1639                    self.start_iter_timer(),
1640                )),
1641                _ => unreachable!("storage backend invariant violation"),
1642            },
1643        }
1644    }
1645
1646    fn safe_iter_with_bounds(
1647        &'a self,
1648        lower_bound: Option<K>,
1649        upper_bound: Option<K>,
1650    ) -> DbIterator<'a, (K, V)> {
1651        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1652        match &self.db.storage {
1653            Storage::Rocks(db) => {
1654                let readopts =
1655                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1656                let db_iter = db
1657                    .underlying
1658                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1659                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1660                Box::new(SafeIter::new(
1661                    self.cf.clone(),
1662                    db_iter,
1663                    _timer,
1664                    _perf_ctx,
1665                    bytes_scanned,
1666                    keys_scanned,
1667                    Some(self.db_metrics.clone()),
1668                ))
1669            }
1670            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1671            #[cfg(tidehunter)]
1672            Storage::TideHunter(db) => match &self.column_family {
1673                ColumnFamily::TideHunter((ks, prefix)) => {
1674                    let mut iter = db.iterator(*ks);
1675                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1676                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1677                }
1678                _ => unreachable!("storage backend invariant violation"),
1679            },
1680        }
1681    }
1682
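    /// Returns an iterator over the entries whose keys fall within `range`.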
1683    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1684        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1685        match &self.db.storage {
1686            Storage::Rocks(db) => {
1687                let readopts =
1688                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1689                let db_iter = db
1690                    .underlying
1691                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1692                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1693                Box::new(SafeIter::new(
1694                    self.cf.clone(),
1695                    db_iter,
1696                    _timer,
1697                    _perf_ctx,
1698                    bytes_scanned,
1699                    keys_scanned,
1700                    Some(self.db_metrics.clone()),
1701                ))
1702            }
1703            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1704            #[cfg(tidehunter)]
1705            Storage::TideHunter(db) => match &self.column_family {
1706                ColumnFamily::TideHunter((ks, prefix)) => {
1707                    let mut iter = db.iterator(*ks);
1708                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1709                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1710                }
1711                _ => unreachable!("storage backend invariant violation"),
1712            },
1713        }
1714    }
1715
1716    /// Returns a vector of values corresponding to the keys provided, in the same order; missing keys yield `None`.
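    ///
    /// ```ignore
    /// // Sketch with a hypothetical `table: DBMap<u64, String>`:
    /// let values = table.multi_get([1u64, 2, 3])?;
    /// assert_eq!(values.len(), 3); // one Option<String> per requested key
    /// ```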
1717    #[instrument(level = "trace", skip_all, err)]
1718    fn multi_get<J>(
1719        &self,
1720        keys: impl IntoIterator<Item = J>,
1721    ) -> Result<Vec<Option<V>>, TypedStoreError>
1722    where
1723        J: Borrow<K>,
1724    {
1725        let results = self.multi_get_pinned(keys)?;
1726        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1727            .into_iter()
1728            .map(|value_byte| match value_byte {
1729                Some(data) => Ok(Some(
1730                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1731                )),
1732                None => Ok(None),
1733            })
1734            .collect();
1735
1736        values_parsed
1737    }
1738
1739    /// Convenience method for batch insertion
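    ///
    /// ```ignore
    /// // Sketch with a hypothetical `table: DBMap<u64, String>`; equivalent to
    /// // building a batch with insert_batch and writing it in one call.
    /// table.multi_insert([(1u64, "a".to_string()), (2u64, "b".to_string())])?;
    /// ```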
1740    #[instrument(level = "trace", skip_all, err)]
1741    fn multi_insert<J, U>(
1742        &self,
1743        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1744    ) -> Result<(), Self::Error>
1745    where
1746        J: Borrow<K>,
1747        U: Borrow<V>,
1748    {
1749        let mut batch = self.batch();
1750        batch.insert_batch(self, key_val_pairs)?;
1751        batch.write()
1752    }
1753
1754    /// Convenience method for batch removal
1755    #[instrument(level = "trace", skip_all, err)]
1756    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1757    where
1758        J: Borrow<K>,
1759    {
1760        let mut batch = self.batch();
1761        batch.delete_batch(self, keys)?;
1762        batch.write()
1763    }
1764
1765    /// Try to catch up with the primary when running as a secondary instance.
1766    #[instrument(level = "trace", skip_all, err)]
1767    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1768        if let Storage::Rocks(rocks) = &self.db.storage {
1769            rocks
1770                .underlying
1771                .try_catch_up_with_primary()
1772                .map_err(typed_store_err_from_rocks_err)?;
1773        }
1774        Ok(())
1775    }
1776}
1777
1778/// Opens a database with the given options and a set of column families, each with its own options; missing column families are created.
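///
/// ```ignore
/// // Sketch; `metric_conf` is assumed to be obtained from the caller's config.
/// let db = open_cf_opts(
///     "/tmp/mydb",
///     None, // fall back to default_db_options()
///     metric_conf,
///     &[("objects", rocksdb::Options::default())],
/// )?;
/// ```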
1779#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1780pub fn open_cf_opts<P: AsRef<Path>>(
1781    path: P,
1782    db_options: Option<rocksdb::Options>,
1783    metric_conf: MetricConf,
1784    opt_cfs: &[(&str, rocksdb::Options)],
1785) -> Result<Arc<Database>, TypedStoreError> {
1786    let path = path.as_ref();
1787    ensure_database_type(path, StorageType::Rocks)
1788        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1789    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1790    // because rocksdb uses the simulated clock when creating its background threads, but then
1791    // those threads see the real wall clock (because they are not the test thread), which causes
1792    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1793    // resolves the issue.
1794    //
1795    // This is a no-op in non-simulator builds.
1796
1797    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1798    nondeterministic!({
1799        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1800        options.create_if_missing(true);
1801        options.create_missing_column_families(true);
1802        let rocksdb = {
1803            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1804                &options,
1805                path,
1806                cfs.into_iter()
1807                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1808            )
1809            .map_err(typed_store_err_from_rocks_err)?
1810        };
1811        Ok(Arc::new(Database::new(
1812            Storage::Rocks(RocksDB {
1813                underlying: rocksdb,
1814            }),
1815            metric_conf,
1816            None,
1817        )))
1818    })
1819}
1820
1821/// Opens a database as a secondary instance with a set of column families, each with its own options.
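/// If `secondary_path` is `None`, a sibling "SECONDARY" directory of the
/// primary path is used. The secondary instance catches up with the primary
/// before it is returned; call `try_catch_up_with_primary` to refresh it later.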
1822pub fn open_cf_opts_secondary<P: AsRef<Path>>(
1823    primary_path: P,
1824    secondary_path: Option<P>,
1825    db_options: Option<rocksdb::Options>,
1826    metric_conf: MetricConf,
1827    opt_cfs: &[(&str, rocksdb::Options)],
1828) -> Result<Arc<Database>, TypedStoreError> {
1829    let primary_path = primary_path.as_ref();
1830    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
1831    // See comment above for explanation of why nondeterministic is necessary here.
1832    nondeterministic!({
1833        // Customize database options
1834        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1835
1836        fdlimit::raise_fd_limit();
1837        // This is required by RocksDB when opening as a secondary instance
1838        options.set_max_open_files(-1);
1839
1840        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
1841        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
1842            .ok()
1843            .unwrap_or_default();
1844
1845        let default_db_options = default_db_options();
1846        // Add CFs not explicitly listed
1847        for cf_key in cfs.iter() {
1848            if !opt_cfs.contains_key(&cf_key[..]) {
1849                opt_cfs.insert(cf_key, default_db_options.options.clone());
1850            }
1851        }
1852
1853        let primary_path = primary_path.to_path_buf();
1854        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
1855            let mut s = primary_path.clone();
1856            s.pop();
1857            s.push("SECONDARY");
1858            s.as_path().to_path_buf()
1859        });
1860
1861        ensure_database_type(&primary_path, StorageType::Rocks)
1862            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1863        ensure_database_type(&secondary_path, StorageType::Rocks)
1864            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1865
1866        let rocksdb = {
1867            options.create_if_missing(true);
1868            options.create_missing_column_families(true);
1869            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
1870                &options,
1871                &primary_path,
1872                &secondary_path,
1873                opt_cfs
1874                    .iter()
1875                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
1876            )
1877            .map_err(typed_store_err_from_rocks_err)?;
1878            db.try_catch_up_with_primary()
1879                .map_err(typed_store_err_from_rocks_err)?;
1880            db
1881        };
1882        Ok(Arc::new(Database::new(
1883            Storage::Rocks(RocksDB {
1884                underlying: rocksdb,
1885            }),
1886            metric_conf,
1887            None,
1888        )))
1889    })
1890}
1891
1892// Destroys the database at `path` once no other handle holds it open, retrying with exponential backoff until `timeout` elapses.
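//
// Example (sketch; assumes a tokio runtime and that every other handle to the
// database has already been dropped):
//
//   safe_drop_db(db_path.clone(), Duration::from_secs(30)).await?;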
1893#[cfg(not(tidehunter))]
1894pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
1895    let mut backoff = backoff::ExponentialBackoff {
1896        max_elapsed_time: Some(timeout),
1897        ..Default::default()
1898    };
1899    loop {
1900        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
1901            Ok(()) => return Ok(()),
1902            Err(err) => match backoff.next_backoff() {
1903                Some(duration) => tokio::time::sleep(duration).await,
1904                None => return Err(err),
1905            },
1906        }
1907    }
1908}
1909
1910#[cfg(tidehunter)]
1911pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
1912    std::fs::remove_dir_all(path)
1913}
1914
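/// Returns the column families already present at `path` (with default options)
/// together with the explicitly requested ones, so that opening the database
/// accounts for every column family that already exists on disk.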
1915fn populate_missing_cfs(
1916    input_cfs: &[(&str, rocksdb::Options)],
1917    path: &Path,
1918) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
1919    let mut cfs = vec![];
1920    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
1921    let existing_cfs =
1922        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
1923            .ok()
1924            .unwrap_or_default();
1925
1926    for cf_name in existing_cfs {
1927        if !input_cf_index.contains(&cf_name[..]) {
1928            cfs.push((cf_name, rocksdb::Options::default()));
1929        }
1930    }
1931    cfs.extend(
1932        input_cfs
1933            .iter()
1934            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
1935    );
1936    Ok(cfs)
1937}
1938
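/// Blake2b-256 digest helper, used to log key and value hashes when a stored
/// value fails to deserialize (see `get` above).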
1939fn default_hash(value: &[u8]) -> Digest<32> {
1940    let mut hasher = fastcrypto::hash::Blake2b256::default();
1941    hasher.update(value);
1942    hasher.finalize()
1943}