typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21    iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26    traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36    properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42    borrow::Borrow,
43    marker::PhantomData,
44    ops::RangeBounds,
45    path::{Path, PathBuf},
46    sync::{Arc, OnceLock},
47    time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    // Safe const construction: `from_bytes_with_nul` is const-stable and the
    // literal below is nul-terminated with no interior nul, so Err is unreachable.
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(p) => p,
        Err(_) => panic!("property name literal must be nul-terminated"),
    };
60
/// Process-wide cached flag controlling whether DB writes request fsync.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns whether database writes should sync to disk.
///
/// Resolved exactly once: a value installed via [`init_write_sync`] wins;
/// otherwise the `SUI_DB_SYNC_TO_DISK` env var enables it when set to
/// "1" or "true".
fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED.get_or_init(|| {
        matches!(
            std::env::var("SUI_DB_SYNC_TO_DISK").as_deref(),
            Ok("1") | Ok("true")
        )
    })
}

/// Initialize the write sync setting from config.
/// Must be called before any database writes occur.
pub fn init_write_sync(enabled: Option<bool>) {
    if let Some(value) = enabled {
        // First initialization wins; a second call is deliberately ignored.
        let _ = WRITE_SYNC_ENABLED.set(value);
    }
}
75
76#[cfg(test)]
77mod tests;
78
/// Thin wrapper around the multi-threaded RocksDB handle.
///
/// Exists so `Drop` can stop background work before the handle is released.
#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
83
impl Drop for RocksDB {
    fn drop(&mut self) {
        // Block until compactions/flushes are cancelled so no background
        // thread touches the DB while it is being torn down.
        self.underlying.cancel_all_background_work(/* wait */ true);
    }
}
89
/// Identifies the column family a map reads and writes, per storage backend.
#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rocks(name) => write!(f, "RocksDB cf: {name}"),
            Self::InMemory(name) => write!(f, "InMemory cf: {name}"),
            #[cfg(tidehunter)]
            Self::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}
108
109impl ColumnFamily {
110    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111        match &self {
112            ColumnFamily::Rocks(name) => rocks_db
113                .underlying
114                .cf_handle(name)
115                .expect("Map-keying column family should have been checked at DB creation"),
116            _ => unreachable!("invariant is checked by the caller"),
117        }
118    }
119}
120
/// The storage backend actually holding the data for a [`Database`].
pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}
127
128impl std::fmt::Debug for Storage {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133            #[cfg(tidehunter)]
134            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135        }
136    }
137}
138
/// A handle to one logical database plus the metric configuration it
/// reports under. Active-DB accounting is balanced between `new` and `Drop`.
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
    // Metrics registry handle to deregister on drop, when one was created.
    registry_id: Option<RegistryID>,
}
145
impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        // Mirror the increment performed in `Database::new`.
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        if let Some(registry_id) = self.registry_id {
            // Deregister this DB's metrics registry. (The `registry_serivce`
            // spelling is the field's actual name in the metrics module.)
            metrics.registry_serivce.remove(registry_id);
        }
    }
}
155
/// A value read from storage; borrowed or owned depending on the backend.
/// Dereferences to the raw value bytes via the `Deref` impl below.
enum GetResult<'a> {
    // Pinned slice borrowed from the RocksDB block cache/memtable ('a).
    Rocks(DBPinnableSlice<'a>),
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}
162
163impl Deref for GetResult<'_> {
164    type Target = [u8];
165    fn deref(&self) -> &[u8] {
166        match self {
167            GetResult::Rocks(d) => d.deref(),
168            GetResult::InMemory(d) => d.deref(),
169            #[cfg(tidehunter)]
170            GetResult::TideHunter(d) => d.deref(),
171        }
172    }
173}
174
impl Database {
    /// Wraps `storage` for use by typed maps. The active-DB gauge incremented
    /// here is decremented again in `Drop`.
    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
            registry_id,
        }
    }

    /// Flush all memtables to SST files on disk.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                // InMemory databases don't need flushing
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                // TideHunter doesn't support an explicit flush.
                Ok(())
            }
        }
    }

    /// Point lookup of `key` in column family `cf`.
    ///
    /// Returns `Ok(None)` when the key is absent. Errors if the `cf` variant
    /// does not match the storage backend (a typed-store invariant violation).
    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),

            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }

    /// Batched lookup; the result vector is position-aligned with `keys`.
    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                // Keys must be collected so the borrow passed to rocksdb lives
                // through the batched call.
                let keys_vec: Vec<K> = keys.into_iter().collect();
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    /* sorted_input */ false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                // No native multi-get: issue one point get per key.
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }

    /// Drop the column family `name` from the underlying store.
    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
            }
        }
    }

    /// Delete SST files fully contained in the `[from, to]` key range
    /// (RocksDB backend only).
    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
        }
    }

    /// Delete `key` from `cf`. Wrapped in fail points for crash testing.
    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    /// Filesystem path of the underlying RocksDB (used by the pruner).
    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for rocksdb backend"),
        }
    }

    /// Write `key -> value` into `cf`. Wrapped in fail points for crash testing.
    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    /// Cheap existence pre-check. May return false positives, never false
    /// negatives; non-Rocks backends conservatively return `true`.
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
            // but no false negatives. We use it to short-circuit the absent case
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }

    /// Apply a write batch; the batch variant must match the storage backend
    /// or an error is returned. Wrapped in fail points for crash testing.
    pub(crate) fn write_opt_internal(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                // InMemory doesn't support write options
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
                // TideHunter doesn't support write options
                batch.commit().map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    #[cfg(tidehunter)]
    /// Kick off TideHunter data relocation; a no-op for other backends.
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    #[cfg(tidehunter)]
    /// Drop TideHunter cells in `[from_inclusive, to_inclusive]`.
    /// Panics when called on a non-TideHunter backend.
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }

    /// Manually compact `cf_name` over the optional `[start, end]` bounds.
    /// Silently a no-op on non-RocksDB backends.
    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }

    /// Create an on-disk checkpoint at `path` (RocksDB only; no-op otherwise).
    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        // TODO: implement for other storage types
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    /// Fresh sampling interval for point-read metrics.
    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for multi-get metrics (shares the read interval).
    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for write metrics.
    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for iterator metrics.
    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    /// Metrics label for this database; an empty name reports as "default".
    fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            "default".to_string()
        } else {
            name.clone()
        }
    }

    /// List live SST files (RocksDB only; empty for other backends).
    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
}
490
491fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
492    rocks_db
493        .underlying
494        .cf_handle(cf_name)
495        .expect("Map-keying column family should have been checked at DB creation")
496}
497
498fn rocks_cf_from_db<'a>(
499    db: &'a Database,
500    cf_name: &str,
501) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
502    match &db.storage {
503        Storage::Rocks(rocksdb) => Ok(rocksdb
504            .underlying
505            .cf_handle(cf_name)
506            .expect("Map-keying column family should have been checked at DB creation")),
507        _ => Err(TypedStoreError::RocksDBError(
508            "using invalid batch type for the database".to_string(),
509        )),
510    }
511}
512
/// Metric-reporting configuration for a database: the label it reports
/// under and how often each operation type samples a perf context.
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
520
521impl MetricConf {
522    pub fn new(db_name: &str) -> Self {
523        if db_name.is_empty() {
524            error!("A meaningful db name should be used for metrics reporting.")
525        }
526        Self {
527            db_name: db_name.to_string(),
528            read_sample_interval: SamplingInterval::default(),
529            write_sample_interval: SamplingInterval::default(),
530            iter_sample_interval: SamplingInterval::default(),
531        }
532    }
533
534    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
535        Self {
536            db_name: self.db_name,
537            read_sample_interval: read_interval,
538            write_sample_interval: SamplingInterval::default(),
539            iter_sample_interval: SamplingInterval::default(),
540        }
541    }
542}
/// Period of the background per-cf RocksDB metrics reporting task.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Sentinel recorded in gauges when reading a DB property fails.
const METRICS_ERROR: i64 = -1;
545
/// An interface to a rocksDB database, keyed by a columnfamily
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    // Variance marker: the map relates K -> V without owning either.
    _phantom: PhantomData<fn(K) -> V>,
    // Backend-specific handle for this map's column family.
    column_family: ColumnFamily,
    // the column family under which the map is stored
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    // Per-operation sampling intervals used to gate perf-context collection.
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping the last clone drops the sender, which stops the background
    // cf-metrics reporting task spawned in `DBMap::new`.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
562
// SAFETY: DBMap stores no K or V values directly — only a
// PhantomData<fn(K) -> V> — and its shared state lives behind Arc'd handles.
// NOTE(review): `fn(K) -> V` is Send regardless of K/V, so this impl looks
// like it only overrides auto-trait inference; confirm it is still needed.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
564
565impl<K, V> DBMap<K, V> {
    /// Construct a typed map over `column_family` and, for RocksDB-backed
    /// non-deprecated maps, spawn a background task that reports per-cf
    /// metrics every `CF_METRICS_REPORT_PERIOD_SECS` seconds.
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        // Downgrade to Weak so the metrics task never keeps the DB alive.
        let db_cloned = Arc::downgrade(&db.clone());
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        // Dropping the DBMap drops `sender`, which terminates the task below.
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                // Property reads can block; keep them off the async runtime.
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }
616
617    /// Reopens an open database as a typed map operating under a specific column family.
618    /// if no column family is passed, the default column family is used.
619    #[instrument(level = "debug", skip(db), err)]
620    pub fn reopen(
621        db: &Arc<Database>,
622        opt_cf: Option<&str>,
623        rw_options: &ReadWriteOptions,
624        is_deprecated: bool,
625    ) -> Result<Self, TypedStoreError> {
626        let cf_key = opt_cf
627            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
628            .to_owned();
629        Ok(DBMap::new(
630            db.clone(),
631            rw_options,
632            &cf_key,
633            ColumnFamily::Rocks(cf_key.to_string()),
634            is_deprecated,
635        ))
636    }
637
638    #[cfg(tidehunter)]
639    pub fn reopen_th(
640        db: Arc<Database>,
641        cf_name: &str,
642        ks: KeySpace,
643        prefix: Option<Vec<u8>>,
644    ) -> Self {
645        DBMap::new(
646            db,
647            &ReadWriteOptions::default(),
648            cf_name,
649            ColumnFamily::TideHunter((ks, prefix.clone())),
650            false,
651        )
652    }
653
654    pub fn cf_name(&self) -> &str {
655        &self.cf
656    }
657
658    pub fn batch(&self) -> DBBatch {
659        let batch = match &self.db.storage {
660            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
661            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
662            #[cfg(tidehunter)]
663            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
664        };
665        DBBatch::new(
666            &self.db,
667            batch,
668            &self.db_metrics,
669            &self.write_sample_interval,
670        )
671    }
672
    /// Flush the entire underlying database (see [`Database::flush`]).
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }
676
677    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
678        let from_buf = be_fix_int_ser(start);
679        let to_buf = be_fix_int_ser(end);
680        self.db
681            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
682        Ok(())
683    }
684
685    pub fn compact_range_raw(
686        &self,
687        cf_name: &str,
688        start: Vec<u8>,
689        end: Vec<u8>,
690    ) -> Result<(), TypedStoreError> {
691        self.db.compact_range_cf(cf_name, Some(start), Some(end));
692        Ok(())
693    }
694
    #[cfg(tidehunter)]
    /// Drop all cells whose serialized keys fall in
    /// `[from_inclusive, to_inclusive]`. Only acts on TideHunter column
    /// families; a no-op for any other variant.
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        // NOTE(review): this `K: Serialize` bound appears unused here — confirm.
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
713
    /// Returns a vector of raw values corresponding to the keys provided.
    ///
    /// Results are position-aligned with the input keys (`None` = absent).
    /// Records multiget latency and byte metrics and, when sampled, reports
    /// a RocksDB perf context for this column family.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Perf context is collected for the duration of the call and
        // reported at the end only when this call was sampled.
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        // Collecting into Result short-circuits on the first backend error.
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }
758
759    fn get_rocksdb_int_property(
760        rocksdb: &RocksDB,
761        cf: &impl AsColumnFamilyRef,
762        property_name: &std::ffi::CStr,
763    ) -> Result<i64, TypedStoreError> {
764        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
765            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
766            Ok(None) => Ok(0),
767            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
768        }
769    }
770
771    fn report_rocksdb_metrics(
772        database: &Arc<Database>,
773        cf_name: &str,
774        db_metrics: &Arc<DBMetrics>,
775    ) {
776        let Storage::Rocks(rocksdb) = &database.storage else {
777            return;
778        };
779
780        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
781            tracing::warn!(
782                "unable to report metrics for cf {cf_name:?} in db {:?}",
783                database.db_name()
784            );
785            return;
786        };
787
788        db_metrics
789            .cf_metrics
790            .rocksdb_total_sst_files_size
791            .with_label_values(&[cf_name])
792            .set(
793                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
794                    .unwrap_or(METRICS_ERROR),
795            );
796        db_metrics
797            .cf_metrics
798            .rocksdb_total_blob_files_size
799            .with_label_values(&[cf_name])
800            .set(
801                Self::get_rocksdb_int_property(
802                    rocksdb,
803                    &cf,
804                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
805                )
806                .unwrap_or(METRICS_ERROR),
807            );
808        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
809        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
810        let total_num_files: i64 = (0..=6)
811            .map(|level| {
812                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
813                    .unwrap_or(METRICS_ERROR)
814            })
815            .sum();
816        db_metrics
817            .cf_metrics
818            .rocksdb_total_num_files
819            .with_label_values(&[cf_name])
820            .set(total_num_files);
821        db_metrics
822            .cf_metrics
823            .rocksdb_num_level0_files
824            .with_label_values(&[cf_name])
825            .set(
826                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
827                    .unwrap_or(METRICS_ERROR),
828            );
829        db_metrics
830            .cf_metrics
831            .rocksdb_current_size_active_mem_tables
832            .with_label_values(&[cf_name])
833            .set(
834                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
835                    .unwrap_or(METRICS_ERROR),
836            );
837        db_metrics
838            .cf_metrics
839            .rocksdb_size_all_mem_tables
840            .with_label_values(&[cf_name])
841            .set(
842                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
843                    .unwrap_or(METRICS_ERROR),
844            );
845        db_metrics
846            .cf_metrics
847            .rocksdb_num_snapshots
848            .with_label_values(&[cf_name])
849            .set(
850                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
851                    .unwrap_or(METRICS_ERROR),
852            );
853        db_metrics
854            .cf_metrics
855            .rocksdb_oldest_snapshot_time
856            .with_label_values(&[cf_name])
857            .set(
858                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
859                    .unwrap_or(METRICS_ERROR),
860            );
861        db_metrics
862            .cf_metrics
863            .rocksdb_actual_delayed_write_rate
864            .with_label_values(&[cf_name])
865            .set(
866                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
867                    .unwrap_or(METRICS_ERROR),
868            );
869        db_metrics
870            .cf_metrics
871            .rocksdb_is_write_stopped
872            .with_label_values(&[cf_name])
873            .set(
874                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
875                    .unwrap_or(METRICS_ERROR),
876            );
877        db_metrics
878            .cf_metrics
879            .rocksdb_block_cache_capacity
880            .with_label_values(&[cf_name])
881            .set(
882                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
883                    .unwrap_or(METRICS_ERROR),
884            );
885        db_metrics
886            .cf_metrics
887            .rocksdb_block_cache_usage
888            .with_label_values(&[cf_name])
889            .set(
890                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
891                    .unwrap_or(METRICS_ERROR),
892            );
893        db_metrics
894            .cf_metrics
895            .rocksdb_block_cache_pinned_usage
896            .with_label_values(&[cf_name])
897            .set(
898                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
899                    .unwrap_or(METRICS_ERROR),
900            );
901        db_metrics
902            .cf_metrics
903            .rocksdb_estimate_table_readers_mem
904            .with_label_values(&[cf_name])
905            .set(
906                Self::get_rocksdb_int_property(
907                    rocksdb,
908                    &cf,
909                    properties::ESTIMATE_TABLE_READERS_MEM,
910                )
911                .unwrap_or(METRICS_ERROR),
912            );
913        db_metrics
914            .cf_metrics
915            .rocksdb_estimated_num_keys
916            .with_label_values(&[cf_name])
917            .set(
918                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
919                    .unwrap_or(METRICS_ERROR),
920            );
921        db_metrics
922            .cf_metrics
923            .rocksdb_num_immutable_mem_tables
924            .with_label_values(&[cf_name])
925            .set(
926                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
927                    .unwrap_or(METRICS_ERROR),
928            );
929        db_metrics
930            .cf_metrics
931            .rocksdb_mem_table_flush_pending
932            .with_label_values(&[cf_name])
933            .set(
934                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
935                    .unwrap_or(METRICS_ERROR),
936            );
937        db_metrics
938            .cf_metrics
939            .rocksdb_compaction_pending
940            .with_label_values(&[cf_name])
941            .set(
942                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
943                    .unwrap_or(METRICS_ERROR),
944            );
945        db_metrics
946            .cf_metrics
947            .rocksdb_estimate_pending_compaction_bytes
948            .with_label_values(&[cf_name])
949            .set(
950                Self::get_rocksdb_int_property(
951                    rocksdb,
952                    &cf,
953                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
954                )
955                .unwrap_or(METRICS_ERROR),
956            );
957        db_metrics
958            .cf_metrics
959            .rocksdb_num_running_compactions
960            .with_label_values(&[cf_name])
961            .set(
962                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
963                    .unwrap_or(METRICS_ERROR),
964            );
965        db_metrics
966            .cf_metrics
967            .rocksdb_num_running_flushes
968            .with_label_values(&[cf_name])
969            .set(
970                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
971                    .unwrap_or(METRICS_ERROR),
972            );
973        db_metrics
974            .cf_metrics
975            .rocksdb_estimate_oldest_key_time
976            .with_label_values(&[cf_name])
977            .set(
978                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
979                    .unwrap_or(METRICS_ERROR),
980            );
981        db_metrics
982            .cf_metrics
983            .rocksdb_background_errors
984            .with_label_values(&[cf_name])
985            .set(
986                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
987                    .unwrap_or(METRICS_ERROR),
988            );
989        db_metrics
990            .cf_metrics
991            .rocksdb_base_level
992            .with_label_values(&[cf_name])
993            .set(
994                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
995                    .unwrap_or(METRICS_ERROR),
996            );
997    }
998
    /// Creates a point-in-time checkpoint of the database at `path` by
    /// delegating to the underlying database handle.
    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }
1002
1003    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1004    where
1005        K: Serialize + DeserializeOwned,
1006        V: Serialize + DeserializeOwned,
1007    {
1008        let mut num_keys = 0;
1009        let mut key_bytes_total = 0;
1010        let mut value_bytes_total = 0;
1011        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1012        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1013        for item in self.safe_iter() {
1014            let (key, value) = item?;
1015            num_keys += 1;
1016            let key_len = be_fix_int_ser(key.borrow()).len();
1017            let value_len = bcs::to_bytes(value.borrow())?.len();
1018            key_bytes_total += key_len;
1019            value_bytes_total += value_len;
1020            key_hist.record(key_len as u64)?;
1021            value_hist.record(value_len as u64)?;
1022        }
1023        Ok(TableSummary {
1024            num_keys,
1025            key_bytes_total,
1026            value_bytes_total,
1027            key_hist,
1028            value_hist,
1029        })
1030    }
1031
    /// Starts and returns a latency timer for an iterator operation on this
    /// map, labeled with the column family name; the observation is recorded
    /// into `rocksdb_iter_latency_seconds` when the timer is stopped or dropped.
    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }
1039
1040    // Creates metrics and context for tracking an iterator usage and performance.
1041    fn create_iter_context(
1042        &self,
1043    ) -> (
1044        Option<HistogramTimer>,
1045        Option<Histogram>,
1046        Option<Histogram>,
1047        Option<RocksDBPerfContext>,
1048    ) {
1049        let timer = self.start_iter_timer();
1050        let bytes_scanned = self
1051            .db_metrics
1052            .op_metrics
1053            .rocksdb_iter_bytes
1054            .with_label_values(&[&self.cf]);
1055        let keys_scanned = self
1056            .db_metrics
1057            .op_metrics
1058            .rocksdb_iter_keys
1059            .with_label_values(&[&self.cf]);
1060        let perf_ctx = if self.iter_sample_interval.sample() {
1061            Some(RocksDBPerfContext)
1062        } else {
1063            None
1064        };
1065        (
1066            Some(timer),
1067            Some(bytes_scanned),
1068            Some(keys_scanned),
1069            perf_ctx,
1070        )
1071    }
1072
    /// Creates a safe reversed iterator with optional bounds.
    /// Both upper bound and lower bound are included.
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        // Convert the optional keys into inclusive range bounds; a missing
        // bound becomes `Bound::Unbounded`.
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                // Push the bounds into the RocksDB read options so the engine
                // itself restricts the scan range.
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                // Serialized inclusive upper bound, handed to `SafeRevIter`
                // below — presumably used to position the reverse scan at its
                // starting key (see safe_iter module; confirm there).
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            // The in-memory backend supports reverse iteration natively
            // (`true` selects reverse order).
            Storage::InMemory(db) => {
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                // A TideHunter storage must always pair with a TideHunter
                // column family.
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
1137}
1138
/// Backend-specific write batch; the variant must match the `Storage`
/// backend of the database the batch is eventually written to.
pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1145
/// Flat-buffer entry header. All byte data (cf_name, key, value) is stored
/// contiguously in `StagedBatch::data`; this header records offsets and lengths
/// so that slices can be produced without any per-entry allocation.
struct EntryHeader {
    /// Byte offset into `StagedBatch::data` where this entry's data begins.
    offset: usize,
    /// Length of the column-family name, which is stored first in the entry.
    cf_name_len: usize,
    /// Length of the serialized key, stored immediately after the cf name.
    key_len: usize,
    /// `true` for a put (a value follows the key up to the next entry's
    /// offset), `false` for a delete (no value bytes).
    is_put: bool,
}
1156
/// A write batch that stores serialized operations in a flat byte buffer without
/// requiring a database reference. Can be replayed into a real `DBBatch` via
/// `DBBatch::concat`.
/// TODO: this can be deleted when we upgrade rust-rocksdb, which supports iterating
/// over write batches.
#[derive(Default)]
pub struct StagedBatch {
    /// Contiguous payload bytes: for each entry, cf name, then key, then
    /// (for puts) value.
    data: Vec<u8>,
    /// One header per staged operation, in insertion order.
    entries: Vec<EntryHeader>,
}
1167
1168impl StagedBatch {
1169    pub fn new() -> Self {
1170        Self {
1171            data: Vec::with_capacity(1024),
1172            entries: Vec::with_capacity(16),
1173        }
1174    }
1175
1176    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1177        &mut self,
1178        db: &DBMap<K, V>,
1179        new_vals: impl IntoIterator<Item = (J, U)>,
1180    ) -> Result<&mut Self, TypedStoreError> {
1181        let cf_name = db.cf_name();
1182        new_vals
1183            .into_iter()
1184            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1185                let offset = self.data.len();
1186                self.data.extend_from_slice(cf_name.as_bytes());
1187                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1188                bcs::serialize_into(&mut self.data, v.borrow())
1189                    .map_err(typed_store_err_from_bcs_err)?;
1190                self.entries.push(EntryHeader {
1191                    offset,
1192                    cf_name_len: cf_name.len(),
1193                    key_len,
1194                    is_put: true,
1195                });
1196                Ok(())
1197            })?;
1198        Ok(self)
1199    }
1200
1201    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1202        &mut self,
1203        db: &DBMap<K, V>,
1204        purged_vals: impl IntoIterator<Item = J>,
1205    ) -> Result<(), TypedStoreError> {
1206        let cf_name = db.cf_name();
1207        purged_vals
1208            .into_iter()
1209            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1210                let offset = self.data.len();
1211                self.data.extend_from_slice(cf_name.as_bytes());
1212                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1213                self.entries.push(EntryHeader {
1214                    offset,
1215                    cf_name_len: cf_name.len(),
1216                    key_len,
1217                    is_put: false,
1218                });
1219                Ok(())
1220            })?;
1221        Ok(())
1222    }
1223
1224    pub fn size_in_bytes(&self) -> usize {
1225        self.data.len()
1226    }
1227}
1228
1229/// Provides a mutable struct to form a collection of database write operations, and execute them.
1230///
1231/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
1232///  ie. they are all written or none is.
1233/// This is also true of operations across column families in the same database.
1234///
1235/// Serializations / Deserialization, and naming of column families is performed by passing a DBMap<K,V>
1236/// with each operation.
1237///
1238/// ```
1239/// use typed_store::rocks::*;
1240/// use tempfile::tempdir;
1241/// use typed_store::Map;
1242/// use typed_store::metrics::DBMetrics;
1243/// use prometheus::Registry;
1244/// use core::fmt::Error;
1245/// use std::sync::Arc;
1246///
1247/// #[tokio::main]
1248/// async fn main() -> Result<(), Error> {
1249/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
1250///
1251/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
1252///     .expect("Failed to open storage");
1253/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1254///
1255/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
1256///     .expect("Failed to open storage");
1257/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1258///
1259/// let mut batch = db_cf_1.batch();
1260/// batch
1261///     .insert_batch(&db_cf_1, keys_vals_1.clone())
1262///     .expect("Failed to batch insert")
1263///     .insert_batch(&db_cf_2, keys_vals_2.clone())
1264///     .expect("Failed to batch insert");
1265///
1266/// let _ = batch.write().expect("Failed to execute batch");
1267/// for (k, v) in keys_vals_1 {
1268///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1269///     assert_eq!(Some(v), val);
1270/// }
1271///
1272/// for (k, v) in keys_vals_2 {
1273///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1274///     assert_eq!(Some(v), val);
1275/// }
1276/// Ok(())
1277/// }
1278/// ```
1279///
pub struct DBBatch {
    /// The database this batch will be written to. Every `DBMap` passed to
    /// the batch methods must belong to it (checked via `Arc::ptr_eq`).
    database: Arc<Database>,
    /// Backend-specific accumulated write operations.
    batch: StorageWriteBatch,
    /// Metrics registry used to record commit latency/size and slow writes.
    db_metrics: Arc<DBMetrics>,
    /// Controls how often a RocksDB perf context is collected on commit.
    write_sample_interval: SamplingInterval,
}
1286
impl DBBatch {
    /// Create a new batch associated with a DB reference.
    ///
    /// Use `open_cf` to get the DB reference or an existing open database.
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consume the batch and write its operations to the database
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let mut write_options = rocksdb::WriteOptions::default();

        // Optionally request a synchronous (fsync'd) write, based on the
        // process-wide `write_sync_enabled()` switch.
        if write_sync_enabled() {
            write_options.set_sync(true);
        }

        self.write_opt(write_options)
    }

    /// Consume the batch and write its operations to the database with custom write options
    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        // Commit latency is observed when the timer is stopped below.
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        // Capture the size before `self.batch` is moved into the write call.
        let batch_size = self.size_in_bytes();

        // Only a sampled subset of writes collects a RocksDB perf context.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };

        self.database
            .write_opt_internal(self.batch, &write_options)?;

        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        // Commits above one second are pathological: log and count them.
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    /// Serialized size of the underlying write batch in bytes. Only the
    /// RocksDB backend reports a real size; other backends return 0.
    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            // TODO: implement size_in_bytes method
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    /// Replay all operations from `StagedBatch`es into this batch.
    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                // An entry's bytes run up to the next entry's offset, or to
                // the end of the buffer for the last entry.
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                // cf_name was written from `&str` bytes in insert_batch /
                // delete_batch, so this checked conversion is expected to
                // succeed; a failure is surfaced as a serialization error.
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    // For puts, everything after the key is the value.
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }

    /// Queues deletions for the given keys in `db`'s column family.
    /// Returns `TypedStoreError::CrossDBBatch` if `db` belongs to a
    /// different database than this batch.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                // The batch backend and the map's column family must come
                // from the same storage implementation; any mismatch is an
                // invariant violation.
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
    /// by writing a range delete tombstone in the db map
    /// If the DBMap is configured with ignore_range_deletions set to false,
    /// the effect of this write will be visible immediately i.e. you won't
    /// see old values when you do a lookup or scan. But if it is configured
    /// with ignore_range_deletions set to true, the old value are visible until
    /// compaction actually deletes them which will happen sometime after. By
    /// default ignore_range_deletions is set to true on a DBMap (unless it is
    /// overridden in the config), so please use this function with caution
    ///
    /// NOTE: range deletion is only applied for the RocksDB backend; for
    /// other backends this call is a silent no-op.
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    /// inserts a range of (key, value) pairs given as an iterator
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        // Accumulates serialized key+value bytes for the put-bytes metric.
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                // Optional debug aid: log hashes (not raw bytes) of each
                // inserted pair when the table opts it in.
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    /// Queues merge operations for the given (key, value) pairs using the
    /// column family's configured merge operator. RocksDB-only; other
    /// backends panic via `unimplemented!`.
    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}
1571
1572impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1573where
1574    K: Serialize + DeserializeOwned,
1575    V: Serialize + DeserializeOwned,
1576{
1577    type Error = TypedStoreError;
1578
1579    #[instrument(level = "trace", skip_all, err)]
1580    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1581        let key_buf = be_fix_int_ser(key);
1582        let readopts = self.opts.readopts();
1583        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1584            && self
1585                .db
1586                .get(&self.column_family, &key_buf, &readopts)?
1587                .is_some())
1588    }
1589
1590    #[instrument(level = "trace", skip_all, err)]
1591    fn multi_contains_keys<J>(
1592        &self,
1593        keys: impl IntoIterator<Item = J>,
1594    ) -> Result<Vec<bool>, Self::Error>
1595    where
1596        J: Borrow<K>,
1597    {
1598        let values = self.multi_get_pinned(keys)?;
1599        Ok(values.into_iter().map(|v| v.is_some()).collect())
1600    }
1601
    /// Reads and BCS-deserializes the value stored under `key`, recording
    /// read latency/size metrics along the way.
    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        // Latency is observed when `_timer` drops at the end of the call.
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Only a sampled subset of reads collects a RocksDB perf context.
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let res = self
            .db
            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
        // A miss is recorded as 0 bytes read.
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => {
                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
                // A deserialization failure suggests corruption or a schema
                // mismatch: log hashes (not raw bytes) for forensics before
                // propagating the error via `value?` below.
                if value.is_err() {
                    let key_hash = default_hash(&key_buf);
                    let value_hash = default_hash(&data);
                    debug_fatal!(
                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
                        self.cf_name(),
                        key_hash,
                        value_hash,
                        value.as_ref().err().unwrap()
                    );
                }
                Ok(Some(value?))
            }
            None => Ok(None),
        }
    }
1648
    /// Serializes and writes a single (key, value) pair, recording write
    /// latency/size metrics and flagging very slow puts.
    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        // Stopped explicitly below so the elapsed time can also be inspected.
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Only a sampled subset of writes collects a RocksDB perf context.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.db.put_cf(&self.column_family, key_buf, value_buf)?;

        let elapsed = timer.stop_and_record();
        // Puts above one second are pathological: log and count them.
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }
1693
1694    #[instrument(level = "trace", skip_all, err)]
1695    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1696        let _timer = self
1697            .db_metrics
1698            .op_metrics
1699            .rocksdb_delete_latency_seconds
1700            .with_label_values(&[&self.cf])
1701            .start_timer();
1702        let perf_ctx = if self.write_sample_interval.sample() {
1703            Some(RocksDBPerfContext)
1704        } else {
1705            None
1706        };
1707        let key_buf = be_fix_int_ser(key);
1708        self.db.delete_cf(&self.column_family, key_buf)?;
1709        self.db_metrics
1710            .op_metrics
1711            .rocksdb_deletes
1712            .with_label_values(&[&self.cf])
1713            .inc();
1714        if perf_ctx.is_some() {
1715            self.db_metrics
1716                .write_perf_ctx_metrics
1717                .report_metrics(&self.cf);
1718        }
1719        Ok(())
1720    }
1721
1722    /// Writes a range delete tombstone to delete all entries in the db map
1723    /// If the DBMap is configured with ignore_range_deletions set to false,
1724    /// the effect of this write will be visible immediately i.e. you won't
1725    /// see old values when you do a lookup or scan. But if it is configured
1726    /// with ignore_range_deletions set to true, the old value are visible until
1727    /// compaction actually deletes them which will happen sometime after. By
1728    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1729    /// overridden in the config), so please use this function with caution
1730    #[instrument(level = "trace", skip_all, err)]
1731    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1732        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1733        let last_key = self
1734            .reversed_safe_iter_with_bounds(None, None)?
1735            .next()
1736            .transpose()?
1737            .map(|(k, _v)| k);
1738        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1739            let mut batch = self.batch();
1740            batch.schedule_delete_range(self, &first_key, &last_key)?;
1741            batch.write()?;
1742        }
1743        Ok(())
1744    }
1745
1746    fn is_empty(&self) -> bool {
1747        self.safe_iter().next().is_none()
1748    }
1749
1750    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1751        match &self.db.storage {
1752            Storage::Rocks(db) => {
1753                let db_iter = db
1754                    .underlying
1755                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1756                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1757                Box::new(SafeIter::new(
1758                    self.cf.clone(),
1759                    db_iter,
1760                    _timer,
1761                    _perf_ctx,
1762                    bytes_scanned,
1763                    keys_scanned,
1764                    Some(self.db_metrics.clone()),
1765                ))
1766            }
1767            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1768            #[cfg(tidehunter)]
1769            Storage::TideHunter(db) => match &self.column_family {
1770                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1771                    db.iterator(*ks),
1772                    prefix,
1773                    self.start_iter_timer(),
1774                )),
1775                _ => unreachable!("storage backend invariant violation"),
1776            },
1777        }
1778    }
1779
1780    fn safe_iter_with_bounds(
1781        &'a self,
1782        lower_bound: Option<K>,
1783        upper_bound: Option<K>,
1784    ) -> DbIterator<'a, (K, V)> {
1785        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1786        match &self.db.storage {
1787            Storage::Rocks(db) => {
1788                let readopts =
1789                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1790                let db_iter = db
1791                    .underlying
1792                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1793                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1794                Box::new(SafeIter::new(
1795                    self.cf.clone(),
1796                    db_iter,
1797                    _timer,
1798                    _perf_ctx,
1799                    bytes_scanned,
1800                    keys_scanned,
1801                    Some(self.db_metrics.clone()),
1802                ))
1803            }
1804            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1805            #[cfg(tidehunter)]
1806            Storage::TideHunter(db) => match &self.column_family {
1807                ColumnFamily::TideHunter((ks, prefix)) => {
1808                    let mut iter = db.iterator(*ks);
1809                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1810                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1811                }
1812                _ => unreachable!("storage backend invariant violation"),
1813            },
1814        }
1815    }
1816
1817    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1818        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1819        match &self.db.storage {
1820            Storage::Rocks(db) => {
1821                let readopts =
1822                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1823                let db_iter = db
1824                    .underlying
1825                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1826                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1827                Box::new(SafeIter::new(
1828                    self.cf.clone(),
1829                    db_iter,
1830                    _timer,
1831                    _perf_ctx,
1832                    bytes_scanned,
1833                    keys_scanned,
1834                    Some(self.db_metrics.clone()),
1835                ))
1836            }
1837            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1838            #[cfg(tidehunter)]
1839            Storage::TideHunter(db) => match &self.column_family {
1840                ColumnFamily::TideHunter((ks, prefix)) => {
1841                    let mut iter = db.iterator(*ks);
1842                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1843                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1844                }
1845                _ => unreachable!("storage backend invariant violation"),
1846            },
1847        }
1848    }
1849
1850    /// Returns a vector of values corresponding to the keys provided.
1851    #[instrument(level = "trace", skip_all, err)]
1852    fn multi_get<J>(
1853        &self,
1854        keys: impl IntoIterator<Item = J>,
1855    ) -> Result<Vec<Option<V>>, TypedStoreError>
1856    where
1857        J: Borrow<K>,
1858    {
1859        let results = self.multi_get_pinned(keys)?;
1860        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1861            .into_iter()
1862            .map(|value_byte| match value_byte {
1863                Some(data) => Ok(Some(
1864                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1865                )),
1866                None => Ok(None),
1867            })
1868            .collect();
1869
1870        values_parsed
1871    }
1872
1873    /// Convenience method for batch insertion
1874    #[instrument(level = "trace", skip_all, err)]
1875    fn multi_insert<J, U>(
1876        &self,
1877        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1878    ) -> Result<(), Self::Error>
1879    where
1880        J: Borrow<K>,
1881        U: Borrow<V>,
1882    {
1883        let mut batch = self.batch();
1884        batch.insert_batch(self, key_val_pairs)?;
1885        batch.write()
1886    }
1887
1888    /// Convenience method for batch removal
1889    #[instrument(level = "trace", skip_all, err)]
1890    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1891    where
1892        J: Borrow<K>,
1893    {
1894        let mut batch = self.batch();
1895        batch.delete_batch(self, keys)?;
1896        batch.write()
1897    }
1898
1899    /// Try to catch up with primary when running as secondary
1900    #[instrument(level = "trace", skip_all, err)]
1901    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1902        if let Storage::Rocks(rocks) = &self.db.storage {
1903            rocks
1904                .underlying
1905                .try_catch_up_with_primary()
1906                .map_err(typed_store_err_from_rocks_err)?;
1907        }
1908        Ok(())
1909    }
1910}
1911
1912/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
1913#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1914pub fn open_cf_opts<P: AsRef<Path>>(
1915    path: P,
1916    db_options: Option<rocksdb::Options>,
1917    metric_conf: MetricConf,
1918    opt_cfs: &[(&str, rocksdb::Options)],
1919) -> Result<Arc<Database>, TypedStoreError> {
1920    let path = path.as_ref();
1921    ensure_database_type(path, StorageType::Rocks)
1922        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1923    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1924    // because rocksdb uses the simulated clock when creating its background threads, but then
1925    // those threads see the real wall clock (because they are not the test thread), which causes
1926    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1927    // resolves the issue.
1928    //
1929    // This is a no-op in non-simulator builds.
1930
1931    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1932    nondeterministic!({
1933        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1934        options.create_if_missing(true);
1935        options.create_missing_column_families(true);
1936        let rocksdb = {
1937            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1938                &options,
1939                path,
1940                cfs.into_iter()
1941                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1942            )
1943            .map_err(typed_store_err_from_rocks_err)?
1944        };
1945        Ok(Arc::new(Database::new(
1946            Storage::Rocks(RocksDB {
1947                underlying: rocksdb,
1948            }),
1949            metric_conf,
1950            None,
1951        )))
1952    })
1953}
1954
/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
///
/// Opens the database at `primary_path` in RocksDB *secondary* mode: a
/// read-only replica that can later be refreshed via
/// `try_catch_up_with_primary`. When `secondary_path` is `None`, a
/// `SECONDARY` sibling directory of the primary is used for the replica's
/// working files.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    // See comment above for explanation of why nondeterministic is necessary here.
    nondeterministic!({
        // Customize database options
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // This is a requirement by RocksDB when opening as secondary
        options.set_max_open_files(-1);

        // Start from the caller's explicit per-CF options, keyed by CF name.
        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        // Column families already present in the primary; empty if the DB
        // cannot be listed (e.g. it does not exist yet).
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add CFs not explicitly listed
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        // Default the secondary working directory to a `SECONDARY` sibling of
        // the primary when the caller did not supply one.
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        ensure_database_type(&primary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        ensure_database_type(&secondary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            // Sync the replica with the primary's current state before
            // handing it out.
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}
2025
2026// Drops a database if there is no other handle to it, with retries and timeout.
2027#[cfg(not(tidehunter))]
2028pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2029    let mut backoff = backoff::ExponentialBackoff {
2030        max_elapsed_time: Some(timeout),
2031        ..Default::default()
2032    };
2033    loop {
2034        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2035            Ok(()) => return Ok(()),
2036            Err(err) => match backoff.next_backoff() {
2037                Some(duration) => tokio::time::sleep(duration).await,
2038                None => return Err(err),
2039            },
2040        }
2041    }
2042}
2043
#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, _timeout: Duration) -> Result<(), std::io::Error> {
    // TideHunter variant: simply remove the database directory; the timeout
    // parameter exists only for signature parity with the RocksDB version.
    std::fs::remove_dir_all(&path)
}
2048
2049fn populate_missing_cfs(
2050    input_cfs: &[(&str, rocksdb::Options)],
2051    path: &Path,
2052) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2053    let mut cfs = vec![];
2054    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2055    let existing_cfs =
2056        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2057            .ok()
2058            .unwrap_or_default();
2059
2060    for cf_name in existing_cfs {
2061        if !input_cf_index.contains(&cf_name[..]) {
2062            cfs.push((cf_name, rocksdb::Options::default()));
2063        }
2064    }
2065    cfs.extend(
2066        input_cfs
2067            .iter()
2068            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2069    );
2070    Ok(cfs)
2071}
2072
2073fn default_hash(value: &[u8]) -> Digest<32> {
2074    let mut hasher = fastcrypto::hash::Blake2b256::default();
2075    hasher.update(value);
2076    hasher.finalize()
2077}