typed_store/rocks/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21    iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26    traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36    properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42    borrow::Borrow,
43    marker::PhantomData,
44    ops::RangeBounds,
45    path::{Path, PathBuf},
46    sync::{Arc, OnceLock},
47    time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
56// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
57// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
58const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
59    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
60
61static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();
62
63fn write_sync_enabled() -> bool {
64    *WRITE_SYNC_ENABLED
65        .get_or_init(|| std::env::var("SUI_DB_SYNC_TO_DISK").is_ok_and(|v| v == "1" || v == "true"))
66}
67
68/// Initialize the write sync setting from config.
69/// Must be called before any database writes occur.
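///
/// A minimal call-site sketch; `node_config.enable_db_write_sync` is an illustrative
/// `Option<bool>` config field, not an actual field of any config struct:
/// ```ignore
/// // Passing Some(true) makes every batch write request a sync to disk.
/// init_write_sync(node_config.enable_db_write_sync);
/// ```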
70pub fn init_write_sync(enabled: Option<bool>) {
71    if let Some(value) = enabled {
72        let _ = WRITE_SYNC_ENABLED.set(value);
73    }
74}
75
76#[cfg(test)]
77mod tests;
78
79#[derive(Debug)]
80pub struct RocksDB {
81    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
82}
83
84impl Drop for RocksDB {
85    fn drop(&mut self) {
86        self.underlying.cancel_all_background_work(/* wait */ true);
87    }
88}
89
90#[derive(Clone)]
91pub enum ColumnFamily {
92    Rocks(String),
93    InMemory(String),
94    #[cfg(tidehunter)]
95    TideHunter((KeySpace, Option<Vec<u8>>)),
96}
97
98impl std::fmt::Debug for ColumnFamily {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        match self {
101            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
102            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
103            #[cfg(tidehunter)]
104            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
105        }
106    }
107}
108
109impl ColumnFamily {
110    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111        match &self {
112            ColumnFamily::Rocks(name) => rocks_db
113                .underlying
114                .cf_handle(name)
115                .expect("Map-keying column family should have been checked at DB creation"),
116            _ => unreachable!("invariant is checked by the caller"),
117        }
118    }
119}
120
121pub enum Storage {
122    Rocks(RocksDB),
123    InMemory(InMemoryDB),
124    #[cfg(tidehunter)]
125    TideHunter(Arc<TideHunterDb>),
126}
127
128impl std::fmt::Debug for Storage {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133            #[cfg(tidehunter)]
134            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135        }
136    }
137}
138
139#[derive(Debug)]
140pub struct Database {
141    storage: Storage,
142    metric_conf: MetricConf,
143    registry_id: Option<RegistryID>,
144}
145
146impl Drop for Database {
147    fn drop(&mut self) {
148        let metrics = DBMetrics::get();
149        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
150        if let Some(registry_id) = self.registry_id {
151            metrics.registry_serivce.remove(registry_id);
152        }
153    }
154}
155
156enum GetResult<'a> {
157    Rocks(DBPinnableSlice<'a>),
158    InMemory(Vec<u8>),
159    #[cfg(tidehunter)]
160    TideHunter(tidehunter::minibytes::Bytes),
161}
162
163impl Deref for GetResult<'_> {
164    type Target = [u8];
165    fn deref(&self) -> &[u8] {
166        match self {
167            GetResult::Rocks(d) => d.deref(),
168            GetResult::InMemory(d) => d.deref(),
169            #[cfg(tidehunter)]
170            GetResult::TideHunter(d) => d.deref(),
171        }
172    }
173}
174
175impl Database {
176    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178        Self {
179            storage,
180            metric_conf,
181            registry_id,
182        }
183    }
184
185    /// Flush all memtables to SST files on disk.
186    pub fn flush(&self) -> Result<(), TypedStoreError> {
187        match &self.storage {
188            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
189                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
190            }),
191            Storage::InMemory(_) => {
192                // InMemory databases don't need flushing
193                Ok(())
194            }
195            #[cfg(tidehunter)]
196            Storage::TideHunter(_) => {
197                // TideHunter doesn't support an explicit flush.
198                Ok(())
199            }
200        }
201    }
202
203    fn get<K: AsRef<[u8]>>(
204        &self,
205        cf: &ColumnFamily,
206        key: K,
207        readopts: &ReadOptions,
208    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
209        match (&self.storage, cf) {
210            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
211                .underlying
212                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
213                .map_err(typed_store_err_from_rocks_err)?
214                .map(GetResult::Rocks)),
215            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
216                Ok(db.get(cf_name, key).map(GetResult::InMemory))
217            }
218            #[cfg(tidehunter)]
219            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
220                .get(*ks, &transform_th_key(key.as_ref(), prefix))
221                .map_err(typed_store_error_from_th_error)?
222                .map(GetResult::TideHunter)),
223
224            _ => Err(TypedStoreError::RocksDBError(
225                "typed store invariant violation".to_string(),
226            )),
227        }
228    }
229
230    fn multi_get<I, K>(
231        &self,
232        cf: &ColumnFamily,
233        keys: I,
234        readopts: &ReadOptions,
235    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
236    where
237        I: IntoIterator<Item = K>,
238        K: AsRef<[u8]>,
239    {
240        match (&self.storage, cf) {
241            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
242                let keys_vec: Vec<K> = keys.into_iter().collect();
243                let res = db.underlying.batched_multi_get_cf_opt(
244                    &cf.rocks_cf(db),
245                    keys_vec.iter(),
246                    /* sorted_input */ false,
247                    readopts,
248                );
249                res.into_iter()
250                    .map(|r| {
251                        r.map_err(typed_store_err_from_rocks_err)
252                            .map(|item| item.map(GetResult::Rocks))
253                    })
254                    .collect()
255            }
256            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
257                .multi_get(cf_name, keys)
258                .into_iter()
259                .map(|r| Ok(r.map(GetResult::InMemory)))
260                .collect(),
261            #[cfg(tidehunter)]
262            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
263                let res = keys.into_iter().map(|k| {
264                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
265                        .map_err(typed_store_error_from_th_error)
266                });
267                res.into_iter()
268                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
269                    .collect()
270            }
271            _ => unreachable!("typed store invariant violation"),
272        }
273    }
274
275    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
276        match &self.storage {
277            Storage::Rocks(db) => db.underlying.drop_cf(name),
278            Storage::InMemory(db) => {
279                db.drop_cf(name);
280                Ok(())
281            }
282            #[cfg(tidehunter)]
283            Storage::TideHunter(_) => {
284                unimplemented!("TideHunter: deletion of a column family on the fly is not implemented")
285            }
286        }
287    }
288
289    pub fn delete_file_in_range<K: AsRef<[u8]>>(
290        &self,
291        cf: &impl AsColumnFamilyRef,
292        from: K,
293        to: K,
294    ) -> Result<(), rocksdb::Error> {
295        match &self.storage {
296            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
297            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
298        }
299    }
300
301    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
302        fail_point!("delete-cf-before");
303        let ret = match (&self.storage, cf) {
304            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
305                .underlying
306                .delete_cf(&cf.rocks_cf(db), key)
307                .map_err(typed_store_err_from_rocks_err),
308            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
309                db.delete(cf_name, key.as_ref());
310                Ok(())
311            }
312            #[cfg(tidehunter)]
313            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
314                .remove(*ks, transform_th_key(key.as_ref(), prefix))
315                .map_err(typed_store_error_from_th_error),
316            _ => Err(TypedStoreError::RocksDBError(
317                "typed store invariant violation".to_string(),
318            )),
319        };
320        fail_point!("delete-cf-after");
321        #[allow(clippy::let_and_return)]
322        ret
323    }
324
325    pub fn path_for_pruning(&self) -> &Path {
326        match &self.storage {
327            Storage::Rocks(rocks) => rocks.underlying.path(),
328            _ => unimplemented!("method is only supported for rocksdb backend"),
329        }
330    }
331
332    fn put_cf(
333        &self,
334        cf: &ColumnFamily,
335        key: Vec<u8>,
336        value: Vec<u8>,
337    ) -> Result<(), TypedStoreError> {
338        fail_point!("put-cf-before");
339        let ret = match (&self.storage, cf) {
340            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
341                .underlying
342                .put_cf(&cf.rocks_cf(db), key, value)
343                .map_err(typed_store_err_from_rocks_err),
344            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
345                db.put(cf_name, key, value);
346                Ok(())
347            }
348            #[cfg(tidehunter)]
349            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
350                .insert(*ks, transform_th_key(&key, prefix), value)
351                .map_err(typed_store_error_from_th_error),
352            _ => Err(TypedStoreError::RocksDBError(
353                "typed store invariant violation".to_string(),
354            )),
355        };
356        fail_point!("put-cf-after");
357        #[allow(clippy::let_and_return)]
358        ret
359    }
360
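    /// Best-effort existence check: for the RocksDB backend it may return false
    /// positives but never false negatives; non-RocksDB backends always return true.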
361    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
362        &self,
363        cf_name: &str,
364        key: K,
365        readopts: &ReadOptions,
366    ) -> bool {
367        match &self.storage {
368            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
369            // but no false negatives. We use it to short-circuit the absent case
370            Storage::Rocks(rocks) => {
371                rocks
372                    .underlying
373                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
374            }
375            _ => true,
376        }
377    }
378
379    pub(crate) fn write_opt_internal(
380        &self,
381        batch: StorageWriteBatch,
382        write_options: &rocksdb::WriteOptions,
383    ) -> Result<(), TypedStoreError> {
384        fail_point!("batch-write-before");
385        let ret = match (&self.storage, batch) {
386            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
387                .underlying
388                .write_opt(batch, write_options)
389                .map_err(typed_store_err_from_rocks_err),
390            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
391                // InMemory doesn't support write options
392                db.write(batch);
393                Ok(())
394            }
395            #[cfg(tidehunter)]
396            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
397                // TideHunter doesn't support write options
398                db.write_batch(batch)
399                    .map_err(typed_store_error_from_th_error)
400            }
401            _ => Err(TypedStoreError::RocksDBError(
402                "using invalid batch type for the database".to_string(),
403            )),
404        };
405        fail_point!("batch-write-after");
406        #[allow(clippy::let_and_return)]
407        ret
408    }
409
410    #[cfg(tidehunter)]
411    pub fn start_relocation(&self) -> anyhow::Result<()> {
412        if let Storage::TideHunter(db) = &self.storage {
413            db.start_relocation()?;
414        }
415        Ok(())
416    }
417
418    #[cfg(tidehunter)]
419    pub fn drop_cells_in_range(
420        &self,
421        ks: KeySpace,
422        from_inclusive: &[u8],
423        to_inclusive: &[u8],
424    ) -> anyhow::Result<()> {
425        if let Storage::TideHunter(db) = &self.storage {
426            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
427                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
428        } else {
429            panic!("drop_cells_in_range called on non-TideHunter storage");
430        }
431        Ok(())
432    }
433
434    pub fn compact_range_cf<K: AsRef<[u8]>>(
435        &self,
436        cf_name: &str,
437        start: Option<K>,
438        end: Option<K>,
439    ) {
440        if let Storage::Rocks(rocksdb) = &self.storage {
441            rocksdb
442                .underlying
443                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
444        }
445    }
446
447    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
448        // TODO: implement for other storage types
449        if let Storage::Rocks(rocks) = &self.storage {
450            let checkpoint =
451                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
452            checkpoint
453                .create_checkpoint(path)
454                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
455        }
456        Ok(())
457    }
458
459    pub fn get_sampling_interval(&self) -> SamplingInterval {
460        self.metric_conf.read_sample_interval.new_from_self()
461    }
462
463    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
464        self.metric_conf.read_sample_interval.new_from_self()
465    }
466
467    pub fn write_sampling_interval(&self) -> SamplingInterval {
468        self.metric_conf.write_sample_interval.new_from_self()
469    }
470
471    pub fn iter_sampling_interval(&self) -> SamplingInterval {
472        self.metric_conf.iter_sample_interval.new_from_self()
473    }
474
475    fn db_name(&self) -> String {
476        let name = &self.metric_conf.db_name;
477        if name.is_empty() {
478            "default".to_string()
479        } else {
480            name.clone()
481        }
482    }
483
484    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
485        match &self.storage {
486            Storage::Rocks(rocks) => rocks.underlying.live_files(),
487            _ => Ok(vec![]),
488        }
489    }
490}
491
492fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
493    rocks_db
494        .underlying
495        .cf_handle(cf_name)
496        .expect("Map-keying column family should have been checked at DB creation")
497}
498
499fn rocks_cf_from_db<'a>(
500    db: &'a Database,
501    cf_name: &str,
502) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
503    match &db.storage {
504        Storage::Rocks(rocksdb) => Ok(rocksdb
505            .underlying
506            .cf_handle(cf_name)
507            .expect("Map-keying column family should have been checked at DB creation")),
508        _ => Err(TypedStoreError::RocksDBError(
509            "using invalid batch type for the database".to_string(),
510        )),
511    }
512}
513
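/// Per-database metrics configuration: `db_name` is used as a metrics label, and the
/// sampling intervals control how often perf contexts are collected for reads, writes,
/// and iterators.
///
/// A minimal construction sketch (the database name is illustrative):
/// ```ignore
/// let conf = MetricConf::new("my_db").with_sampling(SamplingInterval::default());
/// ```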
514#[derive(Debug, Default)]
515pub struct MetricConf {
516    pub db_name: String,
517    pub read_sample_interval: SamplingInterval,
518    pub write_sample_interval: SamplingInterval,
519    pub iter_sample_interval: SamplingInterval,
520}
521
522impl MetricConf {
523    pub fn new(db_name: &str) -> Self {
524        if db_name.is_empty() {
525            error!("A meaningful db name should be used for metrics reporting.")
526        }
527        Self {
528            db_name: db_name.to_string(),
529            read_sample_interval: SamplingInterval::default(),
530            write_sample_interval: SamplingInterval::default(),
531            iter_sample_interval: SamplingInterval::default(),
532        }
533    }
534
535    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
536        Self {
537            db_name: self.db_name,
538            read_sample_interval: read_interval,
539            write_sample_interval: SamplingInterval::default(),
540            iter_sample_interval: SamplingInterval::default(),
541        }
542    }
543}
544const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
545const METRICS_ERROR: i64 = -1;
546
547/// An interface to a RocksDB database, keyed by a column family.
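///
/// A minimal open-and-read sketch (assuming an already opened `Arc<Database>` named `db`
/// and an existing column family `"my_cf"`; key and value types are illustrative):
/// ```ignore
/// let map: DBMap<u64, String> =
///     DBMap::reopen(&db, Some("my_cf"), &ReadWriteOptions::default(), false)?;
/// map.insert(&1, &"one".to_string())?;
/// assert_eq!(map.get(&1)?, Some("one".to_string()));
/// ```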
548#[derive(Clone, Debug)]
549pub struct DBMap<K, V> {
550    pub db: Arc<Database>,
551    _phantom: PhantomData<fn(K) -> V>,
552    column_family: ColumnFamily,
553    // the name of the column family under which the map is stored
554    cf: String,
555    pub opts: ReadWriteOptions,
556    db_metrics: Arc<DBMetrics>,
557    get_sample_interval: SamplingInterval,
558    multiget_sample_interval: SamplingInterval,
559    write_sample_interval: SamplingInterval,
560    iter_sample_interval: SamplingInterval,
561    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
562}
563
564unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
565
566impl<K, V> DBMap<K, V> {
567    pub(crate) fn new(
568        db: Arc<Database>,
569        opts: &ReadWriteOptions,
570        opt_cf: &str,
571        column_family: ColumnFamily,
572        is_deprecated: bool,
573    ) -> Self {
574        let db_cloned = Arc::downgrade(&db.clone());
575        let db_metrics = DBMetrics::get();
576        let db_metrics_cloned = db_metrics.clone();
577        let cf = opt_cf.to_string();
578
579        let (sender, mut recv) = tokio::sync::oneshot::channel();
580        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
581            tokio::task::spawn(async move {
582                let mut interval =
583                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
584                loop {
585                    tokio::select! {
586                        _ = interval.tick() => {
587                            if let Some(db) = db_cloned.upgrade() {
588                                let cf = cf.clone();
589                                let db_metrics = db_metrics.clone();
590                                if let Err(e) = tokio::task::spawn_blocking(move || {
591                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
592                                }).await {
593                                    error!("Failed to log metrics with error: {}", e);
594                                }
595                            }
596                        }
597                        _ = &mut recv => break,
598                    }
599                }
600                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
601            });
602        }
603        DBMap {
604            db: db.clone(),
605            opts: opts.clone(),
606            _phantom: PhantomData,
607            column_family,
608            cf: opt_cf.to_string(),
609            db_metrics: db_metrics_cloned,
610            _metrics_task_cancel_handle: Arc::new(sender),
611            get_sample_interval: db.get_sampling_interval(),
612            multiget_sample_interval: db.multiget_sampling_interval(),
613            write_sample_interval: db.write_sampling_interval(),
614            iter_sample_interval: db.iter_sampling_interval(),
615        }
616    }
617
618    /// Reopens an open database as a typed map operating under a specific column family.
619    /// If no column family is passed, the default column family is used.
620    #[instrument(level = "debug", skip(db), err)]
621    pub fn reopen(
622        db: &Arc<Database>,
623        opt_cf: Option<&str>,
624        rw_options: &ReadWriteOptions,
625        is_deprecated: bool,
626    ) -> Result<Self, TypedStoreError> {
627        let cf_key = opt_cf
628            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
629            .to_owned();
630        Ok(DBMap::new(
631            db.clone(),
632            rw_options,
633            &cf_key,
634            ColumnFamily::Rocks(cf_key.to_string()),
635            is_deprecated,
636        ))
637    }
638
639    #[cfg(tidehunter)]
640    pub fn reopen_th(
641        db: Arc<Database>,
642        cf_name: &str,
643        ks: KeySpace,
644        prefix: Option<Vec<u8>>,
645    ) -> Self {
646        DBMap::new(
647            db,
648            &ReadWriteOptions::default(),
649            cf_name,
650            ColumnFamily::TideHunter((ks, prefix.clone())),
651            false,
652        )
653    }
654
655    pub fn cf_name(&self) -> &str {
656        &self.cf
657    }
658
659    pub fn batch(&self) -> DBBatch {
660        let batch = match &self.db.storage {
661            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
662            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
663            #[cfg(tidehunter)]
664            Storage::TideHunter(_) => {
665                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
666            }
667        };
668        DBBatch::new(
669            &self.db,
670            batch,
671            &self.db_metrics,
672            &self.write_sample_interval,
673        )
674    }
675
676    pub fn flush(&self) -> Result<(), TypedStoreError> {
677        self.db.flush()
678    }
679
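    /// Serializes `start` and `end` with the same big-endian fixed-int encoding used for
    /// keys and triggers a manual compaction over that key range (a no-op for
    /// non-RocksDB backends).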
680    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
681        let from_buf = be_fix_int_ser(start);
682        let to_buf = be_fix_int_ser(end);
683        self.db
684            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
685        Ok(())
686    }
687
688    pub fn compact_range_raw(
689        &self,
690        cf_name: &str,
691        start: Vec<u8>,
692        end: Vec<u8>,
693    ) -> Result<(), TypedStoreError> {
694        self.db.compact_range_cf(cf_name, Some(start), Some(end));
695        Ok(())
696    }
697
698    #[cfg(tidehunter)]
699    pub fn drop_cells_in_range<J: Serialize>(
700        &self,
701        from_inclusive: &J,
702        to_inclusive: &J,
703    ) -> Result<(), TypedStoreError>
704    where
705        K: Serialize,
706    {
707        let from_buf = be_fix_int_ser(from_inclusive);
708        let to_buf = be_fix_int_ser(to_inclusive);
709        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
710            self.db
711                .drop_cells_in_range(*ks, &from_buf, &to_buf)
712                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
713        }
714        Ok(())
715    }
716
717    /// Returns a vector of raw values corresponding to the keys provided.
718    fn multi_get_pinned<J>(
719        &self,
720        keys: impl IntoIterator<Item = J>,
721    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
722    where
723        J: Borrow<K>,
724        K: Serialize,
725    {
726        let _timer = self
727            .db_metrics
728            .op_metrics
729            .rocksdb_multiget_latency_seconds
730            .with_label_values(&[&self.cf])
731            .start_timer();
732        let perf_ctx = if self.multiget_sample_interval.sample() {
733            Some(RocksDBPerfContext)
734        } else {
735            None
736        };
737        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
738        let results: Result<Vec<_>, TypedStoreError> = self
739            .db
740            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
741            .into_iter()
742            .collect();
743        let entries = results?;
744        let entry_size = entries
745            .iter()
746            .flatten()
747            .map(|entry| entry.len())
748            .sum::<usize>();
749        self.db_metrics
750            .op_metrics
751            .rocksdb_multiget_bytes
752            .with_label_values(&[&self.cf])
753            .observe(entry_size as f64);
754        if perf_ctx.is_some() {
755            self.db_metrics
756                .read_perf_ctx_metrics
757                .report_metrics(&self.cf);
758        }
759        Ok(entries)
760    }
761
762    fn get_rocksdb_int_property(
763        rocksdb: &RocksDB,
764        cf: &impl AsColumnFamilyRef,
765        property_name: &std::ffi::CStr,
766    ) -> Result<i64, TypedStoreError> {
767        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
768            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
769            Ok(None) => Ok(0),
770            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
771        }
772    }
773
774    fn report_rocksdb_metrics(
775        database: &Arc<Database>,
776        cf_name: &str,
777        db_metrics: &Arc<DBMetrics>,
778    ) {
779        let Storage::Rocks(rocksdb) = &database.storage else {
780            return;
781        };
782
783        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
784            tracing::warn!(
785                "unable to report metrics for cf {cf_name:?} in db {:?}",
786                database.db_name()
787            );
788            return;
789        };
790
791        db_metrics
792            .cf_metrics
793            .rocksdb_total_sst_files_size
794            .with_label_values(&[cf_name])
795            .set(
796                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
797                    .unwrap_or(METRICS_ERROR),
798            );
799        db_metrics
800            .cf_metrics
801            .rocksdb_total_blob_files_size
802            .with_label_values(&[cf_name])
803            .set(
804                Self::get_rocksdb_int_property(
805                    rocksdb,
806                    &cf,
807                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
808                )
809                .unwrap_or(METRICS_ERROR),
810            );
811        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
812        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
813        let total_num_files: i64 = (0..=6)
814            .map(|level| {
815                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
816                    .unwrap_or(METRICS_ERROR)
817            })
818            .sum();
819        db_metrics
820            .cf_metrics
821            .rocksdb_total_num_files
822            .with_label_values(&[cf_name])
823            .set(total_num_files);
824        db_metrics
825            .cf_metrics
826            .rocksdb_num_level0_files
827            .with_label_values(&[cf_name])
828            .set(
829                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
830                    .unwrap_or(METRICS_ERROR),
831            );
832        db_metrics
833            .cf_metrics
834            .rocksdb_current_size_active_mem_tables
835            .with_label_values(&[cf_name])
836            .set(
837                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
838                    .unwrap_or(METRICS_ERROR),
839            );
840        db_metrics
841            .cf_metrics
842            .rocksdb_size_all_mem_tables
843            .with_label_values(&[cf_name])
844            .set(
845                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
846                    .unwrap_or(METRICS_ERROR),
847            );
848        db_metrics
849            .cf_metrics
850            .rocksdb_num_snapshots
851            .with_label_values(&[cf_name])
852            .set(
853                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
854                    .unwrap_or(METRICS_ERROR),
855            );
856        db_metrics
857            .cf_metrics
858            .rocksdb_oldest_snapshot_time
859            .with_label_values(&[cf_name])
860            .set(
861                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
862                    .unwrap_or(METRICS_ERROR),
863            );
864        db_metrics
865            .cf_metrics
866            .rocksdb_actual_delayed_write_rate
867            .with_label_values(&[cf_name])
868            .set(
869                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
870                    .unwrap_or(METRICS_ERROR),
871            );
872        db_metrics
873            .cf_metrics
874            .rocksdb_is_write_stopped
875            .with_label_values(&[cf_name])
876            .set(
877                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
878                    .unwrap_or(METRICS_ERROR),
879            );
880        db_metrics
881            .cf_metrics
882            .rocksdb_block_cache_capacity
883            .with_label_values(&[cf_name])
884            .set(
885                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
886                    .unwrap_or(METRICS_ERROR),
887            );
888        db_metrics
889            .cf_metrics
890            .rocksdb_block_cache_usage
891            .with_label_values(&[cf_name])
892            .set(
893                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
894                    .unwrap_or(METRICS_ERROR),
895            );
896        db_metrics
897            .cf_metrics
898            .rocksdb_block_cache_pinned_usage
899            .with_label_values(&[cf_name])
900            .set(
901                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
902                    .unwrap_or(METRICS_ERROR),
903            );
904        db_metrics
905            .cf_metrics
906            .rocksdb_estimate_table_readers_mem
907            .with_label_values(&[cf_name])
908            .set(
909                Self::get_rocksdb_int_property(
910                    rocksdb,
911                    &cf,
912                    properties::ESTIMATE_TABLE_READERS_MEM,
913                )
914                .unwrap_or(METRICS_ERROR),
915            );
916        db_metrics
917            .cf_metrics
918            .rocksdb_estimated_num_keys
919            .with_label_values(&[cf_name])
920            .set(
921                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
922                    .unwrap_or(METRICS_ERROR),
923            );
924        db_metrics
925            .cf_metrics
926            .rocksdb_num_immutable_mem_tables
927            .with_label_values(&[cf_name])
928            .set(
929                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
930                    .unwrap_or(METRICS_ERROR),
931            );
932        db_metrics
933            .cf_metrics
934            .rocksdb_mem_table_flush_pending
935            .with_label_values(&[cf_name])
936            .set(
937                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
938                    .unwrap_or(METRICS_ERROR),
939            );
940        db_metrics
941            .cf_metrics
942            .rocksdb_compaction_pending
943            .with_label_values(&[cf_name])
944            .set(
945                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
946                    .unwrap_or(METRICS_ERROR),
947            );
948        db_metrics
949            .cf_metrics
950            .rocksdb_estimate_pending_compaction_bytes
951            .with_label_values(&[cf_name])
952            .set(
953                Self::get_rocksdb_int_property(
954                    rocksdb,
955                    &cf,
956                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
957                )
958                .unwrap_or(METRICS_ERROR),
959            );
960        db_metrics
961            .cf_metrics
962            .rocksdb_num_running_compactions
963            .with_label_values(&[cf_name])
964            .set(
965                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
966                    .unwrap_or(METRICS_ERROR),
967            );
968        db_metrics
969            .cf_metrics
970            .rocksdb_num_running_flushes
971            .with_label_values(&[cf_name])
972            .set(
973                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
974                    .unwrap_or(METRICS_ERROR),
975            );
976        db_metrics
977            .cf_metrics
978            .rocksdb_estimate_oldest_key_time
979            .with_label_values(&[cf_name])
980            .set(
981                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
982                    .unwrap_or(METRICS_ERROR),
983            );
984        db_metrics
985            .cf_metrics
986            .rocksdb_background_errors
987            .with_label_values(&[cf_name])
988            .set(
989                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
990                    .unwrap_or(METRICS_ERROR),
991            );
992        db_metrics
993            .cf_metrics
994            .rocksdb_base_level
995            .with_label_values(&[cf_name])
996            .set(
997                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
998                    .unwrap_or(METRICS_ERROR),
999            );
1000    }
1001
1002    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1003        self.db.checkpoint(path)
1004    }
1005
1006    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1007    where
1008        K: Serialize + DeserializeOwned,
1009        V: Serialize + DeserializeOwned,
1010    {
1011        let mut num_keys = 0;
1012        let mut key_bytes_total = 0;
1013        let mut value_bytes_total = 0;
1014        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1015        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1016        for item in self.safe_iter() {
1017            let (key, value) = item?;
1018            num_keys += 1;
1019            let key_len = be_fix_int_ser(key.borrow()).len();
1020            let value_len = bcs::to_bytes(value.borrow())?.len();
1021            key_bytes_total += key_len;
1022            value_bytes_total += value_len;
1023            key_hist.record(key_len as u64)?;
1024            value_hist.record(value_len as u64)?;
1025        }
1026        Ok(TableSummary {
1027            num_keys,
1028            key_bytes_total,
1029            value_bytes_total,
1030            key_hist,
1031            value_hist,
1032        })
1033    }
1034
1035    fn start_iter_timer(&self) -> HistogramTimer {
1036        self.db_metrics
1037            .op_metrics
1038            .rocksdb_iter_latency_seconds
1039            .with_label_values(&[&self.cf])
1040            .start_timer()
1041    }
1042
1043    // Creates metrics and context for tracking iterator usage and performance.
1044    fn create_iter_context(
1045        &self,
1046    ) -> (
1047        Option<HistogramTimer>,
1048        Option<Histogram>,
1049        Option<Histogram>,
1050        Option<RocksDBPerfContext>,
1051    ) {
1052        let timer = self.start_iter_timer();
1053        let bytes_scanned = self
1054            .db_metrics
1055            .op_metrics
1056            .rocksdb_iter_bytes
1057            .with_label_values(&[&self.cf]);
1058        let keys_scanned = self
1059            .db_metrics
1060            .op_metrics
1061            .rocksdb_iter_keys
1062            .with_label_values(&[&self.cf]);
1063        let perf_ctx = if self.iter_sample_interval.sample() {
1064            Some(RocksDBPerfContext)
1065        } else {
1066            None
1067        };
1068        (
1069            Some(timer),
1070            Some(bytes_scanned),
1071            Some(keys_scanned),
1072            perf_ctx,
1073        )
1074    }
1075
1076    /// Creates a safe reversed iterator with optional bounds.
1077    /// Both the lower and upper bounds are inclusive.
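    ///
    /// A usage sketch (assuming a `DBMap<u64, String>` named `map`):
    /// ```ignore
    /// // Walk keys from the largest down to 10, inclusive.
    /// for item in map.reversed_safe_iter_with_bounds(Some(10), None)? {
    ///     let (key, value) = item?;
    ///     // ...
    /// }
    /// ```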
1078    #[allow(clippy::complexity)]
1079    pub fn reversed_safe_iter_with_bounds(
1080        &self,
1081        lower_bound: Option<K>,
1082        upper_bound: Option<K>,
1083    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1084    where
1085        K: Serialize + DeserializeOwned,
1086        V: Serialize + DeserializeOwned,
1087    {
1088        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1089            lower_bound
1090                .as_ref()
1091                .map(Bound::Included)
1092                .unwrap_or(Bound::Unbounded),
1093            upper_bound
1094                .as_ref()
1095                .map(Bound::Included)
1096                .unwrap_or(Bound::Unbounded),
1097        ));
1098        match &self.db.storage {
1099            Storage::Rocks(db) => {
1100                let readopts = rocks_util::apply_range_bounds(
1101                    self.opts.readopts(),
1102                    it_lower_bound,
1103                    it_upper_bound,
1104                );
1105                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1106                let db_iter = db
1107                    .underlying
1108                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1109                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1110                let iter = SafeIter::new(
1111                    self.cf.clone(),
1112                    db_iter,
1113                    _timer,
1114                    _perf_ctx,
1115                    bytes_scanned,
1116                    keys_scanned,
1117                    Some(self.db_metrics.clone()),
1118                );
1119                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1120            }
1121            Storage::InMemory(db) => {
1122                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1123            }
1124            #[cfg(tidehunter)]
1125            Storage::TideHunter(db) => match &self.column_family {
1126                ColumnFamily::TideHunter((ks, prefix)) => {
1127                    let mut iter = db.iterator(*ks);
1128                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
1129                    iter.reverse();
1130                    Ok(Box::new(transform_th_iterator(
1131                        iter,
1132                        prefix,
1133                        self.start_iter_timer(),
1134                    )))
1135                }
1136                _ => unreachable!("storage backend invariant violation"),
1137            },
1138        }
1139    }
1140}
1141
1142pub enum StorageWriteBatch {
1143    Rocks(rocksdb::WriteBatch),
1144    InMemory(InMemoryBatch),
1145    #[cfg(tidehunter)]
1146    TideHunter(tidehunter::batch::WriteBatch),
1147}
1148
1149/// Header for one entry in the flat byte buffer. All byte data (cf_name, key, value) is stored
1150/// contiguously in `StagedBatch::data`; this header records offsets and lengths
1151/// so that slices can be produced without any per-entry allocation.
1152struct EntryHeader {
1153    /// Byte offset into `StagedBatch::data` where this entry's data begins.
1154    offset: usize,
1155    cf_name_len: usize,
1156    key_len: usize,
1157    is_put: bool,
1158}
1159
1160/// A write batch that stores serialized operations in a flat byte buffer without
1161/// requiring a database reference. Can be replayed into a real `DBBatch` via
1162/// `DBBatch::concat`.
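///
/// A staging sketch (assuming a `DBMap<u64, String>` named `map`):
/// ```ignore
/// let mut staged = StagedBatch::new();
/// staged.insert_batch(&map, [(1u64, "one".to_string())])?;
/// staged.delete_batch(&map, [2u64])?;
/// // Later, replay the staged operations into a real batch and commit atomically.
/// let mut batch = map.batch();
/// batch.concat(vec![staged])?;
/// batch.write()?;
/// ```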
1163/// TODO: this can be deleted once we upgrade rust-rocksdb, which supports iterating
1164/// over write batches.
1165#[derive(Default)]
1166pub struct StagedBatch {
1167    data: Vec<u8>,
1168    entries: Vec<EntryHeader>,
1169}
1170
1171impl StagedBatch {
1172    pub fn new() -> Self {
1173        Self {
1174            data: Vec::with_capacity(1024),
1175            entries: Vec::with_capacity(16),
1176        }
1177    }
1178
1179    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1180        &mut self,
1181        db: &DBMap<K, V>,
1182        new_vals: impl IntoIterator<Item = (J, U)>,
1183    ) -> Result<&mut Self, TypedStoreError> {
1184        let cf_name = db.cf_name();
1185        new_vals
1186            .into_iter()
1187            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1188                let offset = self.data.len();
1189                self.data.extend_from_slice(cf_name.as_bytes());
1190                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1191                bcs::serialize_into(&mut self.data, v.borrow())
1192                    .map_err(typed_store_err_from_bcs_err)?;
1193                self.entries.push(EntryHeader {
1194                    offset,
1195                    cf_name_len: cf_name.len(),
1196                    key_len,
1197                    is_put: true,
1198                });
1199                Ok(())
1200            })?;
1201        Ok(self)
1202    }
1203
1204    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1205        &mut self,
1206        db: &DBMap<K, V>,
1207        purged_vals: impl IntoIterator<Item = J>,
1208    ) -> Result<(), TypedStoreError> {
1209        let cf_name = db.cf_name();
1210        purged_vals
1211            .into_iter()
1212            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1213                let offset = self.data.len();
1214                self.data.extend_from_slice(cf_name.as_bytes());
1215                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1216                self.entries.push(EntryHeader {
1217                    offset,
1218                    cf_name_len: cf_name.len(),
1219                    key_len,
1220                    is_put: false,
1221                });
1222                Ok(())
1223            })?;
1224        Ok(())
1225    }
1226
1227    pub fn size_in_bytes(&self) -> usize {
1228        self.data.len()
1229    }
1230}
1231
1232/// Provides a mutable struct to form a collection of database write operations, and execute them.
1233///
1234/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
1235///  i.e. they are all written or none is.
1236/// This is also true of operations across column families in the same database.
1237///
1238/// Serializations / Deserialization, and naming of column families is performed by passing a DBMap<K,V>
1239/// with each operation.
1240///
1241/// ```
1242/// use typed_store::rocks::*;
1243/// use tempfile::tempdir;
1244/// use typed_store::Map;
1245/// use typed_store::metrics::DBMetrics;
1246/// use prometheus::Registry;
1247/// use core::fmt::Error;
1248/// use std::sync::Arc;
1249///
1250/// #[tokio::main]
1251/// async fn main() -> Result<(), Error> {
1252/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
1253///
1254/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
1255///     .expect("Failed to open storage");
1256/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1257///
1258/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
1259///     .expect("Failed to open storage");
1260/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1261///
1262/// let mut batch = db_cf_1.batch();
1263/// batch
1264///     .insert_batch(&db_cf_1, keys_vals_1.clone())
1265///     .expect("Failed to batch insert")
1266///     .insert_batch(&db_cf_2, keys_vals_2.clone())
1267///     .expect("Failed to batch insert");
1268///
1269/// let _ = batch.write().expect("Failed to execute batch");
1270/// for (k, v) in keys_vals_1 {
1271///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1272///     assert_eq!(Some(v), val);
1273/// }
1274///
1275/// for (k, v) in keys_vals_2 {
1276///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1277///     assert_eq!(Some(v), val);
1278/// }
1279/// Ok(())
1280/// }
1281/// ```
1282///
1283pub struct DBBatch {
1284    database: Arc<Database>,
1285    batch: StorageWriteBatch,
1286    db_metrics: Arc<DBMetrics>,
1287    write_sample_interval: SamplingInterval,
1288}
1289
1290impl DBBatch {
1291    /// Create a new batch associated with a DB reference.
1292    ///
1293    /// Use `open_cf` to get the DB reference, or pass an existing open database.
1294    pub fn new(
1295        dbref: &Arc<Database>,
1296        batch: StorageWriteBatch,
1297        db_metrics: &Arc<DBMetrics>,
1298        write_sample_interval: &SamplingInterval,
1299    ) -> Self {
1300        DBBatch {
1301            database: dbref.clone(),
1302            batch,
1303            db_metrics: db_metrics.clone(),
1304            write_sample_interval: write_sample_interval.clone(),
1305        }
1306    }
1307
1308    /// Consume the batch and write its operations to the database
1309    #[instrument(level = "trace", skip_all, err)]
1310    pub fn write(self) -> Result<(), TypedStoreError> {
1311        let mut write_options = rocksdb::WriteOptions::default();
1312
1313        if write_sync_enabled() {
1314            write_options.set_sync(true);
1315        }
1316
1317        self.write_opt(write_options)
1318    }
1319
1320    /// Consume the batch and write its operations to the database with custom write options
1321    #[instrument(level = "trace", skip_all, err)]
1322    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1323        let db_name = self.database.db_name();
1324        let timer = self
1325            .db_metrics
1326            .op_metrics
1327            .rocksdb_batch_commit_latency_seconds
1328            .with_label_values(&[&db_name])
1329            .start_timer();
1330        let batch_size = self.size_in_bytes();
1331
1332        let perf_ctx = if self.write_sample_interval.sample() {
1333            Some(RocksDBPerfContext)
1334        } else {
1335            None
1336        };
1337
1338        self.database
1339            .write_opt_internal(self.batch, &write_options)?;
1340
1341        self.db_metrics
1342            .op_metrics
1343            .rocksdb_batch_commit_bytes
1344            .with_label_values(&[&db_name])
1345            .observe(batch_size as f64);
1346
1347        if perf_ctx.is_some() {
1348            self.db_metrics
1349                .write_perf_ctx_metrics
1350                .report_metrics(&db_name);
1351        }
1352        let elapsed = timer.stop_and_record();
1353        if elapsed > 1.0 {
1354            warn!(?elapsed, ?db_name, "very slow batch write");
1355            self.db_metrics
1356                .op_metrics
1357                .rocksdb_very_slow_batch_writes_count
1358                .with_label_values(&[&db_name])
1359                .inc();
1360            self.db_metrics
1361                .op_metrics
1362                .rocksdb_very_slow_batch_writes_duration_ms
1363                .with_label_values(&[&db_name])
1364                .inc_by((elapsed * 1000.0) as u64);
1365        }
1366        Ok(())
1367    }
1368
1369    pub fn size_in_bytes(&self) -> usize {
1370        match self.batch {
1371            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1372            StorageWriteBatch::InMemory(_) => 0,
1373            // TODO: implement size_in_bytes method
1374            #[cfg(tidehunter)]
1375            StorageWriteBatch::TideHunter(_) => 0,
1376        }
1377    }
1378
1379    /// Replay all operations from `StagedBatch`es into this batch.
1380    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
1381        for raw_batch in raw_batches {
1382            let data = &raw_batch.data;
1383            for (i, hdr) in raw_batch.entries.iter().enumerate() {
1384                let end = raw_batch
1385                    .entries
1386                    .get(i + 1)
1387                    .map_or(data.len(), |next| next.offset);
1388                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
1389                let key_start = hdr.offset + hdr.cf_name_len;
1390                let key = &data[key_start..key_start + hdr.key_len];
1391                // Safety: cf_name was written from &str bytes in insert_batch / delete_batch.
1392                let cf_name = std::str::from_utf8(cf_bytes)
1393                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;
1394
1395                if hdr.is_put {
1396                    let value = &data[key_start + hdr.key_len..end];
1397                    match &mut self.batch {
1398                        StorageWriteBatch::Rocks(b) => {
1399                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
1400                        }
1401                        StorageWriteBatch::InMemory(b) => {
1402                            b.put_cf(cf_name, key, value);
1403                        }
1404                        #[cfg(tidehunter)]
1405                        _ => {
1406                            return Err(TypedStoreError::RocksDBError(
1407                                "concat not supported for TideHunter".to_string(),
1408                            ));
1409                        }
1410                    }
1411                } else {
1412                    match &mut self.batch {
1413                        StorageWriteBatch::Rocks(b) => {
1414                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
1415                        }
1416                        StorageWriteBatch::InMemory(b) => {
1417                            b.delete_cf(cf_name, key);
1418                        }
1419                        #[cfg(tidehunter)]
1420                        _ => {
1421                            return Err(TypedStoreError::RocksDBError(
1422                                "concat not supported for TideHunter".to_string(),
1423                            ));
1424                        }
1425                    }
1426                }
1427            }
1428        }
1429        Ok(self)
1430    }
1431
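    /// Queues deletion of the given keys in `db`'s column family as part of this batch;
    /// returns `TypedStoreError::CrossDBBatch` if `db` belongs to a different database.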
1432    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1433        &mut self,
1434        db: &DBMap<K, V>,
1435        purged_vals: impl IntoIterator<Item = J>,
1436    ) -> Result<(), TypedStoreError> {
1437        if !Arc::ptr_eq(&db.db, &self.database) {
1438            return Err(TypedStoreError::CrossDBBatch);
1439        }
1440
1441        purged_vals
1442            .into_iter()
1443            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1444                let k_buf = be_fix_int_ser(k.borrow());
1445                match (&mut self.batch, &db.column_family) {
1446                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1447                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1448                    }
1449                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1450                        b.delete_cf(name, k_buf)
1451                    }
1452                    #[cfg(tidehunter)]
1453                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1454                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1455                    }
1456                    _ => Err(TypedStoreError::RocksDBError(
1457                        "typed store invariant violation".to_string(),
1458                    ))?,
1459                }
1460                Ok(())
1461            })?;
1462        Ok(())
1463    }
1464
1465    /// Deletes a range of keys between `from` (inclusive) and `to` (exclusive)
1466    /// by writing a range-delete tombstone into the db map.
1467    /// If the DBMap is configured with ignore_range_deletions set to false,
1468    /// the effect of this write is visible immediately, i.e. you won't
1469    /// see old values when you do a lookup or scan. But if it is configured
1470    /// with ignore_range_deletions set to true, the old values remain visible until
1471    /// compaction actually deletes them, which happens some time later. By
1472    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1473    /// overridden in the config), so please use this function with caution.
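    ///
    /// A minimal sketch (assuming a `DBMap<u64, String>` named `map`):
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.schedule_delete_range(&map, &10, &20)?; // removes keys in [10, 20)
    /// batch.write()?;
    /// ```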
1474    pub fn schedule_delete_range<K: Serialize, V>(
1475        &mut self,
1476        db: &DBMap<K, V>,
1477        from: &K,
1478        to: &K,
1479    ) -> Result<(), TypedStoreError> {
1480        if !Arc::ptr_eq(&db.db, &self.database) {
1481            return Err(TypedStoreError::CrossDBBatch);
1482        }
1483
1484        let from_buf = be_fix_int_ser(from);
1485        let to_buf = be_fix_int_ser(to);
1486
1487        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1488            b.delete_range_cf(
1489                &rocks_cf_from_db(&self.database, db.cf_name())?,
1490                from_buf,
1491                to_buf,
1492            );
1493        }
1494        Ok(())
1495    }
1496
1497    /// Inserts a range of (key, value) pairs given as an iterator.
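    ///
    /// A minimal usage sketch (illustrative only; `table` is a hypothetical
    /// `DBMap<u64, String>`):
    /// ```ignore
    /// let mut batch = table.batch();
    /// batch
    ///     .insert_batch(&table, [(1u64, "a".to_string()), (2, "b".to_string())])?
    ///     .insert_batch(&table, [(3u64, "c".to_string())])?;
    /// batch.write()?;
    /// ```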
1498    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1499        &mut self,
1500        db: &DBMap<K, V>,
1501        new_vals: impl IntoIterator<Item = (J, U)>,
1502    ) -> Result<&mut Self, TypedStoreError> {
1503        if !Arc::ptr_eq(&db.db, &self.database) {
1504            return Err(TypedStoreError::CrossDBBatch);
1505        }
1506        let mut total = 0usize;
1507        new_vals
1508            .into_iter()
1509            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1510                let k_buf = be_fix_int_ser(k.borrow());
1511                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1512                total += k_buf.len() + v_buf.len();
1513                if db.opts.log_value_hash {
1514                    let key_hash = default_hash(&k_buf);
1515                    let value_hash = default_hash(&v_buf);
1516                    debug!(
1517                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1518                        db.cf_name(),
1519                        key_hash,
1520                        value_hash
1521                    );
1522                }
1523                match (&mut self.batch, &db.column_family) {
1524                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1525                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1526                    }
1527                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1528                        b.put_cf(name, k_buf, v_buf)
1529                    }
1530                    #[cfg(tidehunter)]
1531                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1532                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1533                    }
1534                    _ => Err(TypedStoreError::RocksDBError(
1535                        "typed store invariant violation".to_string(),
1536                    ))?,
1537                }
1538                Ok(())
1539            })?;
1540        self.db_metrics
1541            .op_metrics
1542            .rocksdb_batch_put_bytes
1543            .with_label_values(&[&db.cf])
1544            .observe(total as f64);
1545        Ok(self)
1546    }
1547
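    /// Queues RocksDB merge operands for the given keys; how an operand combines with
    /// the existing value is decided by the merge operator configured on the column
    /// family. Only supported for the RocksDB backend.
    ///
    /// A minimal usage sketch (illustrative only; assumes `table` is a hypothetical
    /// `DBMap<u64, u64>` whose column family was opened with a merge operator):
    /// ```ignore
    /// let mut batch = table.batch();
    /// batch.partial_merge_batch(&table, [(1u64, 5u64)])?;
    /// batch.write()?;
    /// ```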
1548    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1549        &mut self,
1550        db: &DBMap<K, V>,
1551        new_vals: impl IntoIterator<Item = (J, U)>,
1552    ) -> Result<&mut Self, TypedStoreError> {
1553        if !Arc::ptr_eq(&db.db, &self.database) {
1554            return Err(TypedStoreError::CrossDBBatch);
1555        }
1556        new_vals
1557            .into_iter()
1558            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1559                let k_buf = be_fix_int_ser(k.borrow());
1560                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1561                match &mut self.batch {
1562                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1563                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1564                        k_buf,
1565                        v_buf,
1566                    ),
1567                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1568                }
1569                Ok(())
1570            })?;
1571        Ok(self)
1572    }
1573}
1574
1575impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1576where
1577    K: Serialize + DeserializeOwned,
1578    V: Serialize + DeserializeOwned,
1579{
1580    type Error = TypedStoreError;
1581
1582    #[instrument(level = "trace", skip_all, err)]
1583    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1584        let key_buf = be_fix_int_ser(key);
1585        let readopts = self.opts.readopts();
1586        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1587            && self
1588                .db
1589                .get(&self.column_family, &key_buf, &readopts)?
1590                .is_some())
1591    }
1592
1593    #[instrument(level = "trace", skip_all, err)]
1594    fn multi_contains_keys<J>(
1595        &self,
1596        keys: impl IntoIterator<Item = J>,
1597    ) -> Result<Vec<bool>, Self::Error>
1598    where
1599        J: Borrow<K>,
1600    {
1601        let values = self.multi_get_pinned(keys)?;
1602        Ok(values.into_iter().map(|v| v.is_some()).collect())
1603    }
1604
1605    #[instrument(level = "trace", skip_all, err)]
1606    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1607        let _timer = self
1608            .db_metrics
1609            .op_metrics
1610            .rocksdb_get_latency_seconds
1611            .with_label_values(&[&self.cf])
1612            .start_timer();
1613        let perf_ctx = if self.get_sample_interval.sample() {
1614            Some(RocksDBPerfContext)
1615        } else {
1616            None
1617        };
1618        let key_buf = be_fix_int_ser(key);
1619        let res = self
1620            .db
1621            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1622        self.db_metrics
1623            .op_metrics
1624            .rocksdb_get_bytes
1625            .with_label_values(&[&self.cf])
1626            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1627        if perf_ctx.is_some() {
1628            self.db_metrics
1629                .read_perf_ctx_metrics
1630                .report_metrics(&self.cf);
1631        }
1632        match res {
1633            Some(data) => {
1634                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1635                if value.is_err() {
1636                    let key_hash = default_hash(&key_buf);
1637                    let value_hash = default_hash(&data);
1638                    debug_fatal!(
1639                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1640                        self.cf_name(),
1641                        key_hash,
1642                        value_hash,
1643                        value.as_ref().err().unwrap()
1644                    );
1645                }
1646                Ok(Some(value?))
1647            }
1648            None => Ok(None),
1649        }
1650    }
1651
1652    #[instrument(level = "trace", skip_all, err)]
1653    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1654        let timer = self
1655            .db_metrics
1656            .op_metrics
1657            .rocksdb_put_latency_seconds
1658            .with_label_values(&[&self.cf])
1659            .start_timer();
1660        let perf_ctx = if self.write_sample_interval.sample() {
1661            Some(RocksDBPerfContext)
1662        } else {
1663            None
1664        };
1665        let key_buf = be_fix_int_ser(key);
1666        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1667        self.db_metrics
1668            .op_metrics
1669            .rocksdb_put_bytes
1670            .with_label_values(&[&self.cf])
1671            .observe((key_buf.len() + value_buf.len()) as f64);
1672        if perf_ctx.is_some() {
1673            self.db_metrics
1674                .write_perf_ctx_metrics
1675                .report_metrics(&self.cf);
1676        }
1677        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1678
1679        let elapsed = timer.stop_and_record();
1680        if elapsed > 1.0 {
1681            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1682            self.db_metrics
1683                .op_metrics
1684                .rocksdb_very_slow_puts_count
1685                .with_label_values(&[&self.cf])
1686                .inc();
1687            self.db_metrics
1688                .op_metrics
1689                .rocksdb_very_slow_puts_duration_ms
1690                .with_label_values(&[&self.cf])
1691                .inc_by((elapsed * 1000.0) as u64);
1692        }
1693
1694        Ok(())
1695    }
1696
1697    #[instrument(level = "trace", skip_all, err)]
1698    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1699        let _timer = self
1700            .db_metrics
1701            .op_metrics
1702            .rocksdb_delete_latency_seconds
1703            .with_label_values(&[&self.cf])
1704            .start_timer();
1705        let perf_ctx = if self.write_sample_interval.sample() {
1706            Some(RocksDBPerfContext)
1707        } else {
1708            None
1709        };
1710        let key_buf = be_fix_int_ser(key);
1711        self.db.delete_cf(&self.column_family, key_buf)?;
1712        self.db_metrics
1713            .op_metrics
1714            .rocksdb_deletes
1715            .with_label_values(&[&self.cf])
1716            .inc();
1717        if perf_ctx.is_some() {
1718            self.db_metrics
1719                .write_perf_ctx_metrics
1720                .report_metrics(&self.cf);
1721        }
1722        Ok(())
1723    }
1724
1725    /// Writes a range delete tombstone to delete all entries in the db map.
1726    /// If the DBMap is configured with ignore_range_deletions set to false,
1727    /// the effect of this write will be visible immediately, i.e. you won't
1728    /// see old values when you do a lookup or scan. But if it is configured
1729    /// with ignore_range_deletions set to true, the old values remain visible
1730    /// until compaction actually deletes them, which will happen some time later. By
1731    /// default, ignore_range_deletions is set to true on a DBMap (unless it is
1732    /// overridden in the config), so please use this function with caution.
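    ///
    /// A minimal usage sketch (illustrative only; `table` is a hypothetical
    /// `DBMap<u64, u64>`):
    /// ```ignore
    /// // Schedules a range delete covering the table's current key range.
    /// table.schedule_delete_all()?;
    /// ```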
1733    #[instrument(level = "trace", skip_all, err)]
1734    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1735        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1736        let last_key = self
1737            .reversed_safe_iter_with_bounds(None, None)?
1738            .next()
1739            .transpose()?
1740            .map(|(k, _v)| k);
1741        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1742            let mut batch = self.batch();
1743            batch.schedule_delete_range(self, &first_key, &last_key)?;
1744            batch.write()?;
1745        }
1746        Ok(())
1747    }
1748
1749    fn is_empty(&self) -> bool {
1750        self.safe_iter().next().is_none()
1751    }
1752
1753    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1754        match &self.db.storage {
1755            Storage::Rocks(db) => {
1756                let db_iter = db
1757                    .underlying
1758                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1759                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1760                Box::new(SafeIter::new(
1761                    self.cf.clone(),
1762                    db_iter,
1763                    _timer,
1764                    _perf_ctx,
1765                    bytes_scanned,
1766                    keys_scanned,
1767                    Some(self.db_metrics.clone()),
1768                ))
1769            }
1770            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1771            #[cfg(tidehunter)]
1772            Storage::TideHunter(db) => match &self.column_family {
1773                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1774                    db.iterator(*ks),
1775                    prefix,
1776                    self.start_iter_timer(),
1777                )),
1778                _ => unreachable!("storage backend invariant violation"),
1779            },
1780        }
1781    }
1782
1783    fn safe_iter_with_bounds(
1784        &'a self,
1785        lower_bound: Option<K>,
1786        upper_bound: Option<K>,
1787    ) -> DbIterator<'a, (K, V)> {
1788        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1789        match &self.db.storage {
1790            Storage::Rocks(db) => {
1791                let readopts =
1792                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1793                let db_iter = db
1794                    .underlying
1795                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1796                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1797                Box::new(SafeIter::new(
1798                    self.cf.clone(),
1799                    db_iter,
1800                    _timer,
1801                    _perf_ctx,
1802                    bytes_scanned,
1803                    keys_scanned,
1804                    Some(self.db_metrics.clone()),
1805                ))
1806            }
1807            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1808            #[cfg(tidehunter)]
1809            Storage::TideHunter(db) => match &self.column_family {
1810                ColumnFamily::TideHunter((ks, prefix)) => {
1811                    let mut iter = db.iterator(*ks);
1812                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1813                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1814                }
1815                _ => unreachable!("storage backend invariant violation"),
1816            },
1817        }
1818    }
1819
1820    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1821        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1822        match &self.db.storage {
1823            Storage::Rocks(db) => {
1824                let readopts =
1825                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1826                let db_iter = db
1827                    .underlying
1828                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1829                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1830                Box::new(SafeIter::new(
1831                    self.cf.clone(),
1832                    db_iter,
1833                    _timer,
1834                    _perf_ctx,
1835                    bytes_scanned,
1836                    keys_scanned,
1837                    Some(self.db_metrics.clone()),
1838                ))
1839            }
1840            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1841            #[cfg(tidehunter)]
1842            Storage::TideHunter(db) => match &self.column_family {
1843                ColumnFamily::TideHunter((ks, prefix)) => {
1844                    let mut iter = db.iterator(*ks);
1845                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1846                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1847                }
1848                _ => unreachable!("storage backend invariant violation"),
1849            },
1850        }
1851    }
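
    // A minimal iteration sketch for the `safe_iter*` methods above (illustrative only;
    // `table` is a hypothetical `DBMap<u64, u64>` and the caller returns `Result`):
    //
    //     for entry in table.safe_range_iter(10u64..20) {
    //         let (key, value) = entry?;
    //         // ... use key/value ...
    //     }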
1852
1853    /// Returns a vector of values corresponding to the keys provided.
1854    #[instrument(level = "trace", skip_all, err)]
1855    fn multi_get<J>(
1856        &self,
1857        keys: impl IntoIterator<Item = J>,
1858    ) -> Result<Vec<Option<V>>, TypedStoreError>
1859    where
1860        J: Borrow<K>,
1861    {
1862        let results = self.multi_get_pinned(keys)?;
1863        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1864            .into_iter()
1865            .map(|value_byte| match value_byte {
1866                Some(data) => Ok(Some(
1867                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1868                )),
1869                None => Ok(None),
1870            })
1871            .collect();
1872
1873        values_parsed
1874    }
1875
1876    /// Convenience method for batch insertion
1877    #[instrument(level = "trace", skip_all, err)]
1878    fn multi_insert<J, U>(
1879        &self,
1880        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1881    ) -> Result<(), Self::Error>
1882    where
1883        J: Borrow<K>,
1884        U: Borrow<V>,
1885    {
1886        let mut batch = self.batch();
1887        batch.insert_batch(self, key_val_pairs)?;
1888        batch.write()
1889    }
1890
1891    /// Convenience method for batch removal
1892    #[instrument(level = "trace", skip_all, err)]
1893    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1894    where
1895        J: Borrow<K>,
1896    {
1897        let mut batch = self.batch();
1898        batch.delete_batch(self, keys)?;
1899        batch.write()
1900    }
1901
1902    /// Try to catch up with primary when running as secondary
1903    #[instrument(level = "trace", skip_all, err)]
1904    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1905        if let Storage::Rocks(rocks) = &self.db.storage {
1906            rocks
1907                .underlying
1908                .try_catch_up_with_primary()
1909                .map_err(typed_store_err_from_rocks_err)?;
1910        }
1911        Ok(())
1912    }
1913}
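
// Point reads and writes typically go through the `Map` impl above. A minimal usage
// sketch (illustrative only; `table` is a hypothetical `DBMap<u64, String>`):
//
//     table.insert(&1, &"hello".to_string())?;
//     assert_eq!(table.get(&1)?, Some("hello".to_string()));
//     table.remove(&1)?;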
1914
1915/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
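///
/// A minimal usage sketch (illustrative only; the path and column family name are
/// hypothetical, and `metric_conf` is assumed to be a `MetricConf` built elsewhere):
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/example_db",
///     None,
///     metric_conf,
///     &[("objects", rocksdb::Options::default())],
/// )?;
/// ```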
1916#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1917pub fn open_cf_opts<P: AsRef<Path>>(
1918    path: P,
1919    db_options: Option<rocksdb::Options>,
1920    metric_conf: MetricConf,
1921    opt_cfs: &[(&str, rocksdb::Options)],
1922) -> Result<Arc<Database>, TypedStoreError> {
1923    let path = path.as_ref();
1924    ensure_database_type(path, StorageType::Rocks)
1925        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1926    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1927    // because rocksdb uses the simulated clock when creating its background threads, but then
1928    // those threads see the real wall clock (because they are not the test thread), which causes
1929    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1930    // resolves the issue.
1931    //
1932    // This is a no-op in non-simulator builds.
1933
1934    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1935    nondeterministic!({
1936        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1937        options.create_if_missing(true);
1938        options.create_missing_column_families(true);
1939        let rocksdb = {
1940            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1941                &options,
1942                path,
1943                cfs.into_iter()
1944                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1945            )
1946            .map_err(typed_store_err_from_rocks_err)?
1947        };
1948        Ok(Arc::new(Database::new(
1949            Storage::Rocks(RocksDB {
1950                underlying: rocksdb,
1951            }),
1952            metric_conf,
1953            None,
1954        )))
1955    })
1956}
1957
1958/// Opens a database as a read-only secondary instance, with options and a number of column families with individual options that are created if they do not exist.
1959pub fn open_cf_opts_secondary<P: AsRef<Path>>(
1960    primary_path: P,
1961    secondary_path: Option<P>,
1962    db_options: Option<rocksdb::Options>,
1963    metric_conf: MetricConf,
1964    opt_cfs: &[(&str, rocksdb::Options)],
1965) -> Result<Arc<Database>, TypedStoreError> {
1966    let primary_path = primary_path.as_ref();
1967    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
1968    // See comment above for explanation of why nondeterministic is necessary here.
1969    nondeterministic!({
1970        // Customize database options
1971        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1972
1973        fdlimit::raise_fd_limit();
1974        // This is a requirement by RocksDB when opening as secondary
1975        options.set_max_open_files(-1);
1976
1977        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
1978        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
1979            .ok()
1980            .unwrap_or_default();
1981
1982        let default_db_options = default_db_options();
1983        // Add CFs not explicitly listed
1984        for cf_key in cfs.iter() {
1985            if !opt_cfs.contains_key(&cf_key[..]) {
1986                opt_cfs.insert(cf_key, default_db_options.options.clone());
1987            }
1988        }
1989
1990        let primary_path = primary_path.to_path_buf();
1991        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
1992            let mut s = primary_path.clone();
1993            s.pop();
1994            s.push("SECONDARY");
1995            s.as_path().to_path_buf()
1996        });
1997
1998        ensure_database_type(&primary_path, StorageType::Rocks)
1999            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2000        ensure_database_type(&secondary_path, StorageType::Rocks)
2001            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2002
2003        let rocksdb = {
2004            options.create_if_missing(true);
2005            options.create_missing_column_families(true);
2006            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2007                &options,
2008                &primary_path,
2009                &secondary_path,
2010                opt_cfs
2011                    .iter()
2012                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2013            )
2014            .map_err(typed_store_err_from_rocks_err)?;
2015            db.try_catch_up_with_primary()
2016                .map_err(typed_store_err_from_rocks_err)?;
2017            db
2018        };
2019        Ok(Arc::new(Database::new(
2020            Storage::Rocks(RocksDB {
2021                underlying: rocksdb,
2022            }),
2023            metric_conf,
2024            None,
2025        )))
2026    })
2027}
2028
2029// Drops a database if there is no other handle to it, with retries and timeout.
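// A minimal usage sketch (illustrative only; `db_path` is a hypothetical `PathBuf`):
//
//     safe_drop_db(db_path, Duration::from_secs(30)).await?;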
2030#[cfg(not(tidehunter))]
2031pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2032    let mut backoff = backoff::ExponentialBackoff {
2033        max_elapsed_time: Some(timeout),
2034        ..Default::default()
2035    };
2036    loop {
2037        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2038            Ok(()) => return Ok(()),
2039            Err(err) => match backoff.next_backoff() {
2040                Some(duration) => tokio::time::sleep(duration).await,
2041                None => return Err(err),
2042            },
2043        }
2044    }
2045}
2046
2047#[cfg(tidehunter)]
2048pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
2049    std::fs::remove_dir_all(path)
2050}
2051
2052fn populate_missing_cfs(
2053    input_cfs: &[(&str, rocksdb::Options)],
2054    path: &Path,
2055) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2056    let mut cfs = vec![];
2057    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2058    let existing_cfs =
2059        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2060            .ok()
2061            .unwrap_or_default();
2062
2063    for cf_name in existing_cfs {
2064        if !input_cf_index.contains(&cf_name[..]) {
2065            cfs.push((cf_name, rocksdb::Options::default()));
2066        }
2067    }
2068    cfs.extend(
2069        input_cfs
2070            .iter()
2071            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2072    );
2073    Ok(cfs)
2074}
2075
2076fn default_hash(value: &[u8]) -> Digest<32> {
2077    let mut hasher = fastcrypto::hash::Blake2b256::default();
2078    hasher.update(value);
2079    hasher.finalize()
2080}