// typed_store/rocks/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21    iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26    traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36    properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42    borrow::Borrow,
43    marker::PhantomData,
44    ops::RangeBounds,
45    path::{Path, PathBuf},
46    sync::{Arc, OnceLock},
47    time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
56// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
57// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
/// RocksDB property name for the total blob-file size. Built with the safe,
/// const-evaluated `CStr::from_bytes_with_nul` (stable since 1.72), which
/// fails the build instead of invoking UB if the NUL terminator is missing —
/// this removes the previous `unsafe` block.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("property name must be NUL-terminated"),
    };
60
/// Lazily-initialized process-wide flag controlling synchronous writes.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns whether write sync is enabled. On first use (if not already set
/// via `init_write_sync`) the flag is derived from the `SUI_DB_SYNC_TO_DISK`
/// environment variable, treating "1" or "true" as enabled.
fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED.get_or_init(|| {
        std::env::var("SUI_DB_SYNC_TO_DISK")
            .map(|v| matches!(v.as_str(), "1" | "true"))
            .unwrap_or(false)
    })
}

/// Initialize the write sync setting from config.
/// Must be called before any database writes occur.
pub fn init_write_sync(enabled: Option<bool>) {
    let Some(value) = enabled else {
        return;
    };
    // Ignore the result: the flag may already have been initialized elsewhere.
    let _ = WRITE_SYNC_ENABLED.set(value);
}
75
76#[cfg(test)]
77mod tests;
78
/// Thin wrapper around a multi-threaded RocksDB handle.
#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        // Stop background compactions/flushes and wait for them to finish
        // so the DB shuts down cleanly before the handle is released.
        self.underlying.cancel_all_background_work(/* wait */ true);
    }
}
89
/// Identifies the column family a map operates on for each storage backend.
#[derive(Clone)]
pub enum ColumnFamily {
    /// RocksDB column family, addressed by name.
    Rocks(String),
    /// In-memory column family, addressed by name.
    InMemory(String),
    /// TideHunter key space plus an optional key prefix
    /// (applied via `transform_th_key`).
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}
97
98impl std::fmt::Debug for ColumnFamily {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        match self {
101            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
102            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
103            #[cfg(tidehunter)]
104            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
105        }
106    }
107}
108
109impl ColumnFamily {
110    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111        match &self {
112            ColumnFamily::Rocks(name) => rocks_db
113                .underlying
114                .cf_handle(name)
115                .expect("Map-keying column family should have been checked at DB creation"),
116            _ => unreachable!("invariant is checked by the caller"),
117        }
118    }
119}
120
/// The physical storage backend a `Database` is backed by.
pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}
127
128impl std::fmt::Debug for Storage {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133            #[cfg(tidehunter)]
134            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135        }
136    }
137}
138
/// A database handle pairing the storage backend with its metrics configuration.
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
    // Metrics registry id for this DB, if one was registered.
    registry_id: Option<RegistryID>,
}

impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        // Mirrors the increment done in `Database::new`.
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        if let Some(registry_id) = self.registry_id {
            // Unregister this DB's metrics registry.
            // NOTE(review): the field name `registry_serivce` looks misspelled
            // ("service") — confirm against the DBMetrics definition.
            metrics.registry_serivce.remove(registry_id);
        }
    }
}
155
/// A value read from the database; the representation depends on the backend.
enum GetResult<'a> {
    // Pinned RocksDB slice borrowing from the DB.
    Rocks(DBPinnableSlice<'a>),
    // Owned copy of the value from the in-memory store.
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}
162
163impl Deref for GetResult<'_> {
164    type Target = [u8];
165    fn deref(&self) -> &[u8] {
166        match self {
167            GetResult::Rocks(d) => d.deref(),
168            GetResult::InMemory(d) => d.deref(),
169            #[cfg(tidehunter)]
170            GetResult::TideHunter(d) => d.deref(),
171        }
172    }
173}
174
175impl Database {
    /// Create a new handle, registering it with the active-DB gauge
    /// (decremented again in `Drop`).
    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
            registry_id,
        }
    }
184
    /// Flush all memtables to SST files on disk.
    /// A no-op for backends that have no explicit flush operation.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                // InMemory databases don't need flushing
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                // TideHunter doesn't support an explicit flush.
                Ok(())
            }
        }
    }
202
    /// Point lookup of `key` in the given column family.
    /// Returns `Ok(None)` when the key is absent; errors if the storage
    /// backend and column-family variant do not match (an invariant violation).
    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),

            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }
229
230    fn multi_get<I, K>(
231        &self,
232        cf: &ColumnFamily,
233        keys: I,
234        readopts: &ReadOptions,
235    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
236    where
237        I: IntoIterator<Item = K>,
238        K: AsRef<[u8]>,
239    {
240        match (&self.storage, cf) {
241            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
242                let keys_vec: Vec<K> = keys.into_iter().collect();
243                let res = db.underlying.batched_multi_get_cf_opt(
244                    &cf.rocks_cf(db),
245                    keys_vec.iter(),
246                    /* sorted_input */ false,
247                    readopts,
248                );
249                res.into_iter()
250                    .map(|r| {
251                        r.map_err(typed_store_err_from_rocks_err)
252                            .map(|item| item.map(GetResult::Rocks))
253                    })
254                    .collect()
255            }
256            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
257                .multi_get(cf_name, keys)
258                .into_iter()
259                .map(|r| Ok(r.map(GetResult::InMemory)))
260                .collect(),
261            #[cfg(tidehunter)]
262            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
263                let res = keys.into_iter().map(|k| {
264                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
265                        .map_err(typed_store_error_from_th_error)
266                });
267                res.into_iter()
268                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
269                    .collect()
270            }
271            _ => unreachable!("typed store invariant violation"),
272        }
273    }
274
    /// Drop an entire column family by name.
    /// Not supported by the TideHunter backend (panics).
    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
            }
        }
    }
288
    /// Delete SST files whose keys fall entirely within the `from..to` range
    /// of the given column family. RocksDB-only; panics on other backends.
    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
        }
    }
300
    /// Delete a single key from the column family.
    /// The surrounding fail points let tests inject crashes around the write.
    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            // Storage backend and column-family variant must always match.
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }
324
    /// Filesystem path of the database, used by the pruner.
    /// RocksDB-only; panics on other backends.
    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for rocksdb backend"),
        }
    }
331
    /// Insert or overwrite a single key/value pair in the column family.
    /// The surrounding fail points let tests inject crashes around the write.
    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            // Storage backend and column-family variant must always match.
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }
360
    /// Cheap existence pre-check for a key: may return false positives but
    /// never false negatives, so a `false` result proves absence.
    /// Non-RocksDB backends conservatively return `true`.
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
            // but no false negatives. We use it to short-circuit the absent case
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }
378
    /// Apply a write batch with the given RocksDB write options.
    /// Errors if the batch type does not match the storage backend; the
    /// surrounding fail points let tests inject crashes around the write.
    pub(crate) fn write_opt_internal(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                // InMemory doesn't support write options
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
                // TideHunter doesn't support write options
                batch.commit().map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }
408
    /// Kick off TideHunter data relocation. No-op on other backends.
    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    /// Force TideHunter to rebuild its control region. No-op on other backends.
    #[cfg(tidehunter)]
    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.force_rebuild_control_region()
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        }
        Ok(())
    }

    /// Drop all cells in the inclusive key range of the given key space.
    /// Panics when called on a non-TideHunter backend.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }
441
    /// Compact the given key range of a column family. RocksDB-only (silently
    /// a no-op on other backends); `None` bounds extend to the keyspace ends.
    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }
454
    /// Create an on-disk checkpoint of the database at `path`.
    /// Currently a no-op for non-RocksDB backends.
    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        // TODO: implement for other storage types
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
466
    /// Fresh sampling interval for point-read metrics.
    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for multi-get metrics (shares the read config).
    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for write metrics.
    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for iterator metrics.
    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }
482
483    fn db_name(&self) -> String {
484        let name = &self.metric_conf.db_name;
485        if name.is_empty() {
486            "default".to_string()
487        } else {
488            name.clone()
489        }
490    }
491
    /// List the live SST files of the database (empty for non-RocksDB backends).
    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
498}
499
/// Look up a bound RocksDB column-family handle by name.
/// Panics if the column family was not registered when the DB was opened.
fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
    rocks_db
        .underlying
        .cf_handle(cf_name)
        .expect("Map-keying column family should have been checked at DB creation")
}
506
507fn rocks_cf_from_db<'a>(
508    db: &'a Database,
509    cf_name: &str,
510) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
511    match &db.storage {
512        Storage::Rocks(rocksdb) => Ok(rocksdb
513            .underlying
514            .cf_handle(cf_name)
515            .expect("Map-keying column family should have been checked at DB creation")),
516        _ => Err(TypedStoreError::RocksDBError(
517            "using invalid batch type for the database".to_string(),
518        )),
519    }
520}
521
/// Metrics configuration for a database: its reporting name and how often
/// each operation type is sampled for detailed metrics.
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
529
530impl MetricConf {
531    pub fn new(db_name: &str) -> Self {
532        if db_name.is_empty() {
533            error!("A meaningful db name should be used for metrics reporting.")
534        }
535        Self {
536            db_name: db_name.to_string(),
537            read_sample_interval: SamplingInterval::default(),
538            write_sample_interval: SamplingInterval::default(),
539            iter_sample_interval: SamplingInterval::default(),
540        }
541    }
542
543    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
544        Self {
545            db_name: self.db_name,
546            read_sample_interval: read_interval,
547            write_sample_interval: SamplingInterval::default(),
548            iter_sample_interval: SamplingInterval::default(),
549        }
550    }
551}
/// Period (seconds) between background reports of per-CF RocksDB metrics.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Gauge value recorded when reading a RocksDB property fails.
const METRICS_ERROR: i64 = -1;
554
/// An interface to a rocksDB database, keyed by a column family.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    // Marker tying the map to its key/value types without storing either.
    _phantom: PhantomData<fn(K) -> V>,
    // Backend-specific descriptor of the column family this map uses.
    column_family: ColumnFamily,
    // the column family under which the map is stored
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping this sender stops the background metrics-reporting task.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
571
// SAFETY: NOTE(review): asserts DBMap is Send when K and V are. The stored
// fields are shared handles (Arc) and the phantom is `fn(K) -> V`; confirm
// every field is actually Send before relying on this manual impl.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
573
574impl<K, V> DBMap<K, V> {
575    pub(crate) fn new(
576        db: Arc<Database>,
577        opts: &ReadWriteOptions,
578        opt_cf: &str,
579        column_family: ColumnFamily,
580        is_deprecated: bool,
581    ) -> Self {
582        let db_cloned = Arc::downgrade(&db.clone());
583        let db_metrics = DBMetrics::get();
584        let db_metrics_cloned = db_metrics.clone();
585        let cf = opt_cf.to_string();
586
587        let (sender, mut recv) = tokio::sync::oneshot::channel();
588        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
589            tokio::task::spawn(async move {
590                let mut interval =
591                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
592                loop {
593                    tokio::select! {
594                        _ = interval.tick() => {
595                            if let Some(db) = db_cloned.upgrade() {
596                                let cf = cf.clone();
597                                let db_metrics = db_metrics.clone();
598                                if let Err(e) = tokio::task::spawn_blocking(move || {
599                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
600                                }).await {
601                                    error!("Failed to log metrics with error: {}", e);
602                                }
603                            }
604                        }
605                        _ = &mut recv => break,
606                    }
607                }
608                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
609            });
610        }
611        DBMap {
612            db: db.clone(),
613            opts: opts.clone(),
614            _phantom: PhantomData,
615            column_family,
616            cf: opt_cf.to_string(),
617            db_metrics: db_metrics_cloned,
618            _metrics_task_cancel_handle: Arc::new(sender),
619            get_sample_interval: db.get_sampling_interval(),
620            multiget_sample_interval: db.multiget_sampling_interval(),
621            write_sample_interval: db.write_sampling_interval(),
622            iter_sample_interval: db.iter_sampling_interval(),
623        }
624    }
625
626    /// Reopens an open database as a typed map operating under a specific column family.
627    /// if no column family is passed, the default column family is used.
628    #[instrument(level = "debug", skip(db), err)]
629    pub fn reopen(
630        db: &Arc<Database>,
631        opt_cf: Option<&str>,
632        rw_options: &ReadWriteOptions,
633        is_deprecated: bool,
634    ) -> Result<Self, TypedStoreError> {
635        let cf_key = opt_cf
636            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
637            .to_owned();
638        Ok(DBMap::new(
639            db.clone(),
640            rw_options,
641            &cf_key,
642            ColumnFamily::Rocks(cf_key.to_string()),
643            is_deprecated,
644        ))
645    }
646
647    #[cfg(tidehunter)]
648    pub fn reopen_th(
649        db: Arc<Database>,
650        cf_name: &str,
651        ks: KeySpace,
652        prefix: Option<Vec<u8>>,
653    ) -> Self {
654        DBMap::new(
655            db,
656            &ReadWriteOptions::default(),
657            cf_name,
658            ColumnFamily::TideHunter((ks, prefix.clone())),
659            false,
660        )
661    }
662
663    pub fn cf_name(&self) -> &str {
664        &self.cf
665    }
666
    /// Start an empty write batch matching this map's storage backend.
    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }
681
    /// Flush the underlying database (see [`Database::flush`]).
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }

    /// Compact the key range between `start` and `end` of this map's column
    /// family. Keys are serialized with the same big-endian encoding used for
    /// storage so the range matches on-disk ordering. RocksDB-only.
    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start);
        let to_buf = be_fix_int_ser(end);
        self.db
            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
        Ok(())
    }
693
    /// Compact a raw (already-serialized) key range in the named column family.
    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }
703
    /// Drop all cells whose serialized keys fall in the inclusive range.
    /// Only acts on TideHunter column families; no-op otherwise.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    /// Raw-key variant of [`Self::drop_cells_in_range`]; keys are used as-is.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range_raw(
        &self,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> Result<(), TypedStoreError> {
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
736
    /// Returns a vector of raw values corresponding to the keys provided.
    /// Records multi-get latency/byte metrics and, when sampled, reports
    /// per-CF read metrics.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Sampled: presumably constructing RocksDBPerfContext enables
        // perf-context collection while it is alive — confirm in DBMetrics.
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        // Total bytes returned across all found entries.
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }
781
    /// Read an integer-valued RocksDB property for a column family.
    /// Missing properties read as 0; values are clamped into `i64` range
    /// before conversion, so the `unwrap_or_default` cannot actually fire.
    fn get_rocksdb_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
        }
    }
793
794    fn report_rocksdb_metrics(
795        database: &Arc<Database>,
796        cf_name: &str,
797        db_metrics: &Arc<DBMetrics>,
798    ) {
799        let Storage::Rocks(rocksdb) = &database.storage else {
800            return;
801        };
802
803        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
804            tracing::warn!(
805                "unable to report metrics for cf {cf_name:?} in db {:?}",
806                database.db_name()
807            );
808            return;
809        };
810
811        db_metrics
812            .cf_metrics
813            .rocksdb_total_sst_files_size
814            .with_label_values(&[cf_name])
815            .set(
816                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
817                    .unwrap_or(METRICS_ERROR),
818            );
819        db_metrics
820            .cf_metrics
821            .rocksdb_total_blob_files_size
822            .with_label_values(&[cf_name])
823            .set(
824                Self::get_rocksdb_int_property(
825                    rocksdb,
826                    &cf,
827                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
828                )
829                .unwrap_or(METRICS_ERROR),
830            );
831        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
832        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
833        let total_num_files: i64 = (0..=6)
834            .map(|level| {
835                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
836                    .unwrap_or(METRICS_ERROR)
837            })
838            .sum();
839        db_metrics
840            .cf_metrics
841            .rocksdb_total_num_files
842            .with_label_values(&[cf_name])
843            .set(total_num_files);
844        db_metrics
845            .cf_metrics
846            .rocksdb_num_level0_files
847            .with_label_values(&[cf_name])
848            .set(
849                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
850                    .unwrap_or(METRICS_ERROR),
851            );
852        db_metrics
853            .cf_metrics
854            .rocksdb_current_size_active_mem_tables
855            .with_label_values(&[cf_name])
856            .set(
857                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
858                    .unwrap_or(METRICS_ERROR),
859            );
860        db_metrics
861            .cf_metrics
862            .rocksdb_size_all_mem_tables
863            .with_label_values(&[cf_name])
864            .set(
865                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
866                    .unwrap_or(METRICS_ERROR),
867            );
868        db_metrics
869            .cf_metrics
870            .rocksdb_num_snapshots
871            .with_label_values(&[cf_name])
872            .set(
873                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
874                    .unwrap_or(METRICS_ERROR),
875            );
876        db_metrics
877            .cf_metrics
878            .rocksdb_oldest_snapshot_time
879            .with_label_values(&[cf_name])
880            .set(
881                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
882                    .unwrap_or(METRICS_ERROR),
883            );
884        db_metrics
885            .cf_metrics
886            .rocksdb_actual_delayed_write_rate
887            .with_label_values(&[cf_name])
888            .set(
889                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
890                    .unwrap_or(METRICS_ERROR),
891            );
892        db_metrics
893            .cf_metrics
894            .rocksdb_is_write_stopped
895            .with_label_values(&[cf_name])
896            .set(
897                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
898                    .unwrap_or(METRICS_ERROR),
899            );
900        db_metrics
901            .cf_metrics
902            .rocksdb_block_cache_capacity
903            .with_label_values(&[cf_name])
904            .set(
905                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
906                    .unwrap_or(METRICS_ERROR),
907            );
908        db_metrics
909            .cf_metrics
910            .rocksdb_block_cache_usage
911            .with_label_values(&[cf_name])
912            .set(
913                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
914                    .unwrap_or(METRICS_ERROR),
915            );
916        db_metrics
917            .cf_metrics
918            .rocksdb_block_cache_pinned_usage
919            .with_label_values(&[cf_name])
920            .set(
921                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
922                    .unwrap_or(METRICS_ERROR),
923            );
924        db_metrics
925            .cf_metrics
926            .rocksdb_estimate_table_readers_mem
927            .with_label_values(&[cf_name])
928            .set(
929                Self::get_rocksdb_int_property(
930                    rocksdb,
931                    &cf,
932                    properties::ESTIMATE_TABLE_READERS_MEM,
933                )
934                .unwrap_or(METRICS_ERROR),
935            );
936        db_metrics
937            .cf_metrics
938            .rocksdb_estimated_num_keys
939            .with_label_values(&[cf_name])
940            .set(
941                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
942                    .unwrap_or(METRICS_ERROR),
943            );
944        db_metrics
945            .cf_metrics
946            .rocksdb_num_immutable_mem_tables
947            .with_label_values(&[cf_name])
948            .set(
949                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
950                    .unwrap_or(METRICS_ERROR),
951            );
952        db_metrics
953            .cf_metrics
954            .rocksdb_mem_table_flush_pending
955            .with_label_values(&[cf_name])
956            .set(
957                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
958                    .unwrap_or(METRICS_ERROR),
959            );
960        db_metrics
961            .cf_metrics
962            .rocksdb_compaction_pending
963            .with_label_values(&[cf_name])
964            .set(
965                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
966                    .unwrap_or(METRICS_ERROR),
967            );
968        db_metrics
969            .cf_metrics
970            .rocksdb_estimate_pending_compaction_bytes
971            .with_label_values(&[cf_name])
972            .set(
973                Self::get_rocksdb_int_property(
974                    rocksdb,
975                    &cf,
976                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
977                )
978                .unwrap_or(METRICS_ERROR),
979            );
980        db_metrics
981            .cf_metrics
982            .rocksdb_num_running_compactions
983            .with_label_values(&[cf_name])
984            .set(
985                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
986                    .unwrap_or(METRICS_ERROR),
987            );
988        db_metrics
989            .cf_metrics
990            .rocksdb_num_running_flushes
991            .with_label_values(&[cf_name])
992            .set(
993                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
994                    .unwrap_or(METRICS_ERROR),
995            );
996        db_metrics
997            .cf_metrics
998            .rocksdb_estimate_oldest_key_time
999            .with_label_values(&[cf_name])
1000            .set(
1001                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1002                    .unwrap_or(METRICS_ERROR),
1003            );
1004        db_metrics
1005            .cf_metrics
1006            .rocksdb_background_errors
1007            .with_label_values(&[cf_name])
1008            .set(
1009                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1010                    .unwrap_or(METRICS_ERROR),
1011            );
1012        db_metrics
1013            .cf_metrics
1014            .rocksdb_base_level
1015            .with_label_values(&[cf_name])
1016            .set(
1017                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1018                    .unwrap_or(METRICS_ERROR),
1019            );
1020    }
1021
1022    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1023        self.db.checkpoint(path)
1024    }
1025
1026    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1027    where
1028        K: Serialize + DeserializeOwned,
1029        V: Serialize + DeserializeOwned,
1030    {
1031        let mut num_keys = 0;
1032        let mut key_bytes_total = 0;
1033        let mut value_bytes_total = 0;
1034        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1035        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1036        for item in self.safe_iter() {
1037            let (key, value) = item?;
1038            num_keys += 1;
1039            let key_len = be_fix_int_ser(key.borrow()).len();
1040            let value_len = bcs::to_bytes(value.borrow())?.len();
1041            key_bytes_total += key_len;
1042            value_bytes_total += value_len;
1043            key_hist.record(key_len as u64)?;
1044            value_hist.record(value_len as u64)?;
1045        }
1046        Ok(TableSummary {
1047            num_keys,
1048            key_bytes_total,
1049            value_bytes_total,
1050            key_hist,
1051            value_hist,
1052        })
1053    }
1054
1055    fn start_iter_timer(&self) -> HistogramTimer {
1056        self.db_metrics
1057            .op_metrics
1058            .rocksdb_iter_latency_seconds
1059            .with_label_values(&[&self.cf])
1060            .start_timer()
1061    }
1062
1063    // Creates metrics and context for tracking an iterator usage and performance.
1064    fn create_iter_context(
1065        &self,
1066    ) -> (
1067        Option<HistogramTimer>,
1068        Option<Histogram>,
1069        Option<Histogram>,
1070        Option<RocksDBPerfContext>,
1071    ) {
1072        let timer = self.start_iter_timer();
1073        let bytes_scanned = self
1074            .db_metrics
1075            .op_metrics
1076            .rocksdb_iter_bytes
1077            .with_label_values(&[&self.cf]);
1078        let keys_scanned = self
1079            .db_metrics
1080            .op_metrics
1081            .rocksdb_iter_keys
1082            .with_label_values(&[&self.cf]);
1083        let perf_ctx = if self.iter_sample_interval.sample() {
1084            Some(RocksDBPerfContext)
1085        } else {
1086            None
1087        };
1088        (
1089            Some(timer),
1090            Some(bytes_scanned),
1091            Some(keys_scanned),
1092            perf_ctx,
1093        )
1094    }
1095
    /// Creates a safe reversed iterator with optional bounds.
    /// Both upper bound and lower bound are included.
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        // Translate the inclusive Option bounds into the serialized bound pair
        // shared with the forward iterators.
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                // Serialized upper bound handed to SafeRevIter — presumably its
                // reverse starting position; confirm in safe_iter.rs.
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                // Last argument flips iteration direction (assumed reverse=true
                // from the call shape — verify against InMemoryDB::iterator).
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                // A TideHunter storage must be paired with a TideHunter column family.
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
1160}
1161
/// Backend-specific write batch. The variant must match the `Storage` variant of
/// the database it is ultimately written to.
pub enum StorageWriteBatch {
    /// Native RocksDB write batch.
    Rocks(rocksdb::WriteBatch),
    /// Batch of operations against the in-memory store.
    InMemory(InMemoryBatch),
    /// TideHunter write batch; only compiled under the `tidehunter` cfg.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1168
/// Flat-buffer entry header. All byte data (cf_name, key, value) is stored
/// contiguously in `StagedBatch::data`; this header records offsets and lengths
/// so that slices can be produced without any per-entry allocation.
struct EntryHeader {
    /// Byte offset into `StagedBatch::data` where this entry's data begins.
    offset: usize,
    /// Length of the column-family name stored at `offset`.
    cf_name_len: usize,
    /// Length of the serialized key that follows the cf name.
    key_len: usize,
    /// true => put (a value follows the key); false => delete (no value bytes).
    is_put: bool,
}
1179
/// A write batch that stores serialized operations in a flat byte buffer without
/// requiring a database reference. Can be replayed into a real `DBBatch` via
/// `DBBatch::concat`.
/// TODO: this can be deleted when we upgrade rust-rocksdb, which supports iterating
/// over write batches.
#[derive(Default)]
pub struct StagedBatch {
    /// Contiguous cf-name/key/value bytes for all staged entries, back-to-back.
    data: Vec<u8>,
    /// One header per staged operation, in insertion order; offsets index into `data`.
    entries: Vec<EntryHeader>,
}
1190
1191impl StagedBatch {
1192    pub fn new() -> Self {
1193        Self {
1194            data: Vec::with_capacity(1024),
1195            entries: Vec::with_capacity(16),
1196        }
1197    }
1198
1199    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1200        &mut self,
1201        db: &DBMap<K, V>,
1202        new_vals: impl IntoIterator<Item = (J, U)>,
1203    ) -> Result<&mut Self, TypedStoreError> {
1204        let cf_name = db.cf_name();
1205        new_vals
1206            .into_iter()
1207            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1208                let offset = self.data.len();
1209                self.data.extend_from_slice(cf_name.as_bytes());
1210                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1211                bcs::serialize_into(&mut self.data, v.borrow())
1212                    .map_err(typed_store_err_from_bcs_err)?;
1213                self.entries.push(EntryHeader {
1214                    offset,
1215                    cf_name_len: cf_name.len(),
1216                    key_len,
1217                    is_put: true,
1218                });
1219                Ok(())
1220            })?;
1221        Ok(self)
1222    }
1223
1224    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1225        &mut self,
1226        db: &DBMap<K, V>,
1227        purged_vals: impl IntoIterator<Item = J>,
1228    ) -> Result<(), TypedStoreError> {
1229        let cf_name = db.cf_name();
1230        purged_vals
1231            .into_iter()
1232            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1233                let offset = self.data.len();
1234                self.data.extend_from_slice(cf_name.as_bytes());
1235                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1236                self.entries.push(EntryHeader {
1237                    offset,
1238                    cf_name_len: cf_name.len(),
1239                    key_len,
1240                    is_put: false,
1241                });
1242                Ok(())
1243            })?;
1244        Ok(())
1245    }
1246
1247    pub fn size_in_bytes(&self) -> usize {
1248        self.data.len()
1249    }
1250}
1251
1252/// Provides a mutable struct to form a collection of database write operations, and execute them.
1253///
1254/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
1255///  ie. they are all written or none is.
1256/// This is also true of operations across column families in the same database.
1257///
1258/// Serializations / Deserialization, and naming of column families is performed by passing a DBMap<K,V>
1259/// with each operation.
1260///
1261/// ```
1262/// use typed_store::rocks::*;
1263/// use tempfile::tempdir;
1264/// use typed_store::Map;
1265/// use typed_store::metrics::DBMetrics;
1266/// use prometheus::Registry;
1267/// use core::fmt::Error;
1268/// use std::sync::Arc;
1269///
1270/// #[tokio::main]
1271/// async fn main() -> Result<(), Error> {
1272/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
1273///
1274/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
1275///     .expect("Failed to open storage");
1276/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1277///
1278/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
1279///     .expect("Failed to open storage");
1280/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1281///
1282/// let mut batch = db_cf_1.batch();
1283/// batch
1284///     .insert_batch(&db_cf_1, keys_vals_1.clone())
1285///     .expect("Failed to batch insert")
1286///     .insert_batch(&db_cf_2, keys_vals_2.clone())
1287///     .expect("Failed to batch insert");
1288///
1289/// let _ = batch.write().expect("Failed to execute batch");
1290/// for (k, v) in keys_vals_1 {
1291///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1292///     assert_eq!(Some(v), val);
1293/// }
1294///
1295/// for (k, v) in keys_vals_2 {
1296///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1297///     assert_eq!(Some(v), val);
1298/// }
1299/// Ok(())
1300/// }
1301/// ```
1302///
pub struct DBBatch {
    /// The database this batch will be written to; every staged operation must
    /// target this same database (enforced via `Arc::ptr_eq` in the mutators).
    database: Arc<Database>,
    /// Backend-specific accumulated operations.
    batch: StorageWriteBatch,
    /// Metrics registry used to report commit latency/size.
    db_metrics: Arc<DBMetrics>,
    /// Controls how often a perf context is captured for writes.
    write_sample_interval: SamplingInterval,
}
1309
1310impl DBBatch {
1311    /// Create a new batch associated with a DB reference.
1312    ///
1313    /// Use `open_cf` to get the DB reference or an existing open database.
1314    pub fn new(
1315        dbref: &Arc<Database>,
1316        batch: StorageWriteBatch,
1317        db_metrics: &Arc<DBMetrics>,
1318        write_sample_interval: &SamplingInterval,
1319    ) -> Self {
1320        DBBatch {
1321            database: dbref.clone(),
1322            batch,
1323            db_metrics: db_metrics.clone(),
1324            write_sample_interval: write_sample_interval.clone(),
1325        }
1326    }
1327
1328    /// Consume the batch and write its operations to the database
1329    #[instrument(level = "trace", skip_all, err)]
1330    pub fn write(self) -> Result<(), TypedStoreError> {
1331        let mut write_options = rocksdb::WriteOptions::default();
1332
1333        if write_sync_enabled() {
1334            write_options.set_sync(true);
1335        }
1336
1337        self.write_opt(write_options)
1338    }
1339
    /// Consume the batch and write its operations to the database with custom write options
    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        // Latency timer covering the whole commit path (write + metric reporting).
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        // Capture the size before `self.batch` is consumed by the write below.
        let batch_size = self.size_in_bytes();

        // Sampled perf context; when present, detailed write stats are reported
        // after the commit succeeds.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };

        self.database
            .write_opt_internal(self.batch, &write_options)?;

        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        // Surface pathologically slow commits (>1s) via logs and counters.
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }
1388
1389    pub fn size_in_bytes(&self) -> usize {
1390        match self.batch {
1391            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1392            StorageWriteBatch::InMemory(_) => 0,
1393            // TODO: implement size_in_bytes method
1394            #[cfg(tidehunter)]
1395            StorageWriteBatch::TideHunter(_) => 0,
1396        }
1397    }
1398
    /// Replay all operations from `StagedBatch`es into this batch.
    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                // Entries are laid out back-to-back in `data`: each entry ends where
                // the next one begins, and the last ends at the end of the buffer.
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                // Safety: cf_name was written from &str bytes in insert_batch / delete_batch.
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    // For puts the value occupies the rest of the entry after the key.
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    // Deletes carry no value bytes (see EntryHeader::is_put).
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }
1451
1452    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1453        &mut self,
1454        db: &DBMap<K, V>,
1455        purged_vals: impl IntoIterator<Item = J>,
1456    ) -> Result<(), TypedStoreError> {
1457        if !Arc::ptr_eq(&db.db, &self.database) {
1458            return Err(TypedStoreError::CrossDBBatch);
1459        }
1460
1461        purged_vals
1462            .into_iter()
1463            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1464                let k_buf = be_fix_int_ser(k.borrow());
1465                match (&mut self.batch, &db.column_family) {
1466                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1467                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1468                    }
1469                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1470                        b.delete_cf(name, k_buf)
1471                    }
1472                    #[cfg(tidehunter)]
1473                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1474                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1475                    }
1476                    _ => Err(TypedStoreError::RocksDBError(
1477                        "typed store invariant violation".to_string(),
1478                    ))?,
1479                }
1480                Ok(())
1481            })?;
1482        Ok(())
1483    }
1484
    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
    /// by writing a range delete tombstone in the db map
    /// If the DBMap is configured with ignore_range_deletions set to false,
    /// the effect of this write will be visible immediately i.e. you won't
    /// see old values when you do a lookup or scan. But if it is configured
    /// with ignore_range_deletions set to true, the old value are visible until
    /// compaction actually deletes them which will happen sometime after. By
    /// default ignore_range_deletions is set to true on a DBMap (unless it is
    /// overridden in the config), so please use this function with caution
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        // Range deletes must target the batch's own database, like the other mutators.
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        // NOTE(review): only the RocksDB backend writes a range-delete tombstone;
        // for the in-memory (and TideHunter) backends this silently does nothing —
        // confirm no caller relies on range deletion there.
        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }
1516
1517    /// inserts a range of (key, value) pairs given as an iterator
1518    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1519        &mut self,
1520        db: &DBMap<K, V>,
1521        new_vals: impl IntoIterator<Item = (J, U)>,
1522    ) -> Result<&mut Self, TypedStoreError> {
1523        if !Arc::ptr_eq(&db.db, &self.database) {
1524            return Err(TypedStoreError::CrossDBBatch);
1525        }
1526        let mut total = 0usize;
1527        new_vals
1528            .into_iter()
1529            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1530                let k_buf = be_fix_int_ser(k.borrow());
1531                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1532                total += k_buf.len() + v_buf.len();
1533                if db.opts.log_value_hash {
1534                    let key_hash = default_hash(&k_buf);
1535                    let value_hash = default_hash(&v_buf);
1536                    debug!(
1537                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1538                        db.cf_name(),
1539                        key_hash,
1540                        value_hash
1541                    );
1542                }
1543                match (&mut self.batch, &db.column_family) {
1544                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1545                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1546                    }
1547                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1548                        b.put_cf(name, k_buf, v_buf)
1549                    }
1550                    #[cfg(tidehunter)]
1551                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1552                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1553                    }
1554                    _ => Err(TypedStoreError::RocksDBError(
1555                        "typed store invariant violation".to_string(),
1556                    ))?,
1557                }
1558                Ok(())
1559            })?;
1560        self.db_metrics
1561            .op_metrics
1562            .rocksdb_batch_put_bytes
1563            .with_label_values(&[&db.cf])
1564            .observe(total as f64);
1565        Ok(self)
1566    }
1567
1568    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1569        &mut self,
1570        db: &DBMap<K, V>,
1571        new_vals: impl IntoIterator<Item = (J, U)>,
1572    ) -> Result<&mut Self, TypedStoreError> {
1573        if !Arc::ptr_eq(&db.db, &self.database) {
1574            return Err(TypedStoreError::CrossDBBatch);
1575        }
1576        new_vals
1577            .into_iter()
1578            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1579                let k_buf = be_fix_int_ser(k.borrow());
1580                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1581                match &mut self.batch {
1582                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1583                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1584                        k_buf,
1585                        v_buf,
1586                    ),
1587                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1588                }
1589                Ok(())
1590            })?;
1591        Ok(self)
1592    }
1593}
1594
1595impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1596where
1597    K: Serialize + DeserializeOwned,
1598    V: Serialize + DeserializeOwned,
1599{
1600    type Error = TypedStoreError;
1601
1602    #[instrument(level = "trace", skip_all, err)]
1603    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1604        let key_buf = be_fix_int_ser(key);
1605        let readopts = self.opts.readopts();
1606        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1607            && self
1608                .db
1609                .get(&self.column_family, &key_buf, &readopts)?
1610                .is_some())
1611    }
1612
1613    #[instrument(level = "trace", skip_all, err)]
1614    fn multi_contains_keys<J>(
1615        &self,
1616        keys: impl IntoIterator<Item = J>,
1617    ) -> Result<Vec<bool>, Self::Error>
1618    where
1619        J: Borrow<K>,
1620    {
1621        let values = self.multi_get_pinned(keys)?;
1622        Ok(values.into_iter().map(|v| v.is_some()).collect())
1623    }
1624
1625    #[instrument(level = "trace", skip_all, err)]
1626    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1627        let _timer = self
1628            .db_metrics
1629            .op_metrics
1630            .rocksdb_get_latency_seconds
1631            .with_label_values(&[&self.cf])
1632            .start_timer();
1633        let perf_ctx = if self.get_sample_interval.sample() {
1634            Some(RocksDBPerfContext)
1635        } else {
1636            None
1637        };
1638        let key_buf = be_fix_int_ser(key);
1639        let res = self
1640            .db
1641            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1642        self.db_metrics
1643            .op_metrics
1644            .rocksdb_get_bytes
1645            .with_label_values(&[&self.cf])
1646            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1647        if perf_ctx.is_some() {
1648            self.db_metrics
1649                .read_perf_ctx_metrics
1650                .report_metrics(&self.cf);
1651        }
1652        match res {
1653            Some(data) => {
1654                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1655                if value.is_err() {
1656                    let key_hash = default_hash(&key_buf);
1657                    let value_hash = default_hash(&data);
1658                    debug_fatal!(
1659                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1660                        self.cf_name(),
1661                        key_hash,
1662                        value_hash,
1663                        value.as_ref().err().unwrap()
1664                    );
1665                }
1666                Ok(Some(value?))
1667            }
1668            None => Ok(None),
1669        }
1670    }
1671
1672    #[instrument(level = "trace", skip_all, err)]
1673    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1674        let timer = self
1675            .db_metrics
1676            .op_metrics
1677            .rocksdb_put_latency_seconds
1678            .with_label_values(&[&self.cf])
1679            .start_timer();
1680        let perf_ctx = if self.write_sample_interval.sample() {
1681            Some(RocksDBPerfContext)
1682        } else {
1683            None
1684        };
1685        let key_buf = be_fix_int_ser(key);
1686        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1687        self.db_metrics
1688            .op_metrics
1689            .rocksdb_put_bytes
1690            .with_label_values(&[&self.cf])
1691            .observe((key_buf.len() + value_buf.len()) as f64);
1692        if perf_ctx.is_some() {
1693            self.db_metrics
1694                .write_perf_ctx_metrics
1695                .report_metrics(&self.cf);
1696        }
1697        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1698
1699        let elapsed = timer.stop_and_record();
1700        if elapsed > 1.0 {
1701            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1702            self.db_metrics
1703                .op_metrics
1704                .rocksdb_very_slow_puts_count
1705                .with_label_values(&[&self.cf])
1706                .inc();
1707            self.db_metrics
1708                .op_metrics
1709                .rocksdb_very_slow_puts_duration_ms
1710                .with_label_values(&[&self.cf])
1711                .inc_by((elapsed * 1000.0) as u64);
1712        }
1713
1714        Ok(())
1715    }
1716
1717    #[instrument(level = "trace", skip_all, err)]
1718    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1719        let _timer = self
1720            .db_metrics
1721            .op_metrics
1722            .rocksdb_delete_latency_seconds
1723            .with_label_values(&[&self.cf])
1724            .start_timer();
1725        let perf_ctx = if self.write_sample_interval.sample() {
1726            Some(RocksDBPerfContext)
1727        } else {
1728            None
1729        };
1730        let key_buf = be_fix_int_ser(key);
1731        self.db.delete_cf(&self.column_family, key_buf)?;
1732        self.db_metrics
1733            .op_metrics
1734            .rocksdb_deletes
1735            .with_label_values(&[&self.cf])
1736            .inc();
1737        if perf_ctx.is_some() {
1738            self.db_metrics
1739                .write_perf_ctx_metrics
1740                .report_metrics(&self.cf);
1741        }
1742        Ok(())
1743    }
1744
1745    /// Writes a range delete tombstone to delete all entries in the db map
1746    /// If the DBMap is configured with ignore_range_deletions set to false,
1747    /// the effect of this write will be visible immediately i.e. you won't
1748    /// see old values when you do a lookup or scan. But if it is configured
1749    /// with ignore_range_deletions set to true, the old value are visible until
1750    /// compaction actually deletes them which will happen sometime after. By
1751    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1752    /// overridden in the config), so please use this function with caution
1753    #[instrument(level = "trace", skip_all, err)]
1754    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1755        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1756        let last_key = self
1757            .reversed_safe_iter_with_bounds(None, None)?
1758            .next()
1759            .transpose()?
1760            .map(|(k, _v)| k);
1761        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1762            let mut batch = self.batch();
1763            batch.schedule_delete_range(self, &first_key, &last_key)?;
1764            batch.write()?;
1765        }
1766        Ok(())
1767    }
1768
1769    fn is_empty(&self) -> bool {
1770        self.safe_iter().next().is_none()
1771    }
1772
1773    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1774        match &self.db.storage {
1775            Storage::Rocks(db) => {
1776                let db_iter = db
1777                    .underlying
1778                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1779                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1780                Box::new(SafeIter::new(
1781                    self.cf.clone(),
1782                    db_iter,
1783                    _timer,
1784                    _perf_ctx,
1785                    bytes_scanned,
1786                    keys_scanned,
1787                    Some(self.db_metrics.clone()),
1788                ))
1789            }
1790            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1791            #[cfg(tidehunter)]
1792            Storage::TideHunter(db) => match &self.column_family {
1793                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1794                    db.iterator(*ks),
1795                    prefix,
1796                    self.start_iter_timer(),
1797                )),
1798                _ => unreachable!("storage backend invariant violation"),
1799            },
1800        }
1801    }
1802
1803    fn safe_iter_with_bounds(
1804        &'a self,
1805        lower_bound: Option<K>,
1806        upper_bound: Option<K>,
1807    ) -> DbIterator<'a, (K, V)> {
1808        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1809        match &self.db.storage {
1810            Storage::Rocks(db) => {
1811                let readopts =
1812                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1813                let db_iter = db
1814                    .underlying
1815                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1816                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1817                Box::new(SafeIter::new(
1818                    self.cf.clone(),
1819                    db_iter,
1820                    _timer,
1821                    _perf_ctx,
1822                    bytes_scanned,
1823                    keys_scanned,
1824                    Some(self.db_metrics.clone()),
1825                ))
1826            }
1827            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1828            #[cfg(tidehunter)]
1829            Storage::TideHunter(db) => match &self.column_family {
1830                ColumnFamily::TideHunter((ks, prefix)) => {
1831                    let mut iter = db.iterator(*ks);
1832                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1833                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1834                }
1835                _ => unreachable!("storage backend invariant violation"),
1836            },
1837        }
1838    }
1839
1840    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1841        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1842        match &self.db.storage {
1843            Storage::Rocks(db) => {
1844                let readopts =
1845                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1846                let db_iter = db
1847                    .underlying
1848                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1849                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1850                Box::new(SafeIter::new(
1851                    self.cf.clone(),
1852                    db_iter,
1853                    _timer,
1854                    _perf_ctx,
1855                    bytes_scanned,
1856                    keys_scanned,
1857                    Some(self.db_metrics.clone()),
1858                ))
1859            }
1860            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1861            #[cfg(tidehunter)]
1862            Storage::TideHunter(db) => match &self.column_family {
1863                ColumnFamily::TideHunter((ks, prefix)) => {
1864                    let mut iter = db.iterator(*ks);
1865                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1866                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1867                }
1868                _ => unreachable!("storage backend invariant violation"),
1869            },
1870        }
1871    }
1872
1873    /// Returns a vector of values corresponding to the keys provided.
1874    #[instrument(level = "trace", skip_all, err)]
1875    fn multi_get<J>(
1876        &self,
1877        keys: impl IntoIterator<Item = J>,
1878    ) -> Result<Vec<Option<V>>, TypedStoreError>
1879    where
1880        J: Borrow<K>,
1881    {
1882        let results = self.multi_get_pinned(keys)?;
1883        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1884            .into_iter()
1885            .map(|value_byte| match value_byte {
1886                Some(data) => Ok(Some(
1887                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1888                )),
1889                None => Ok(None),
1890            })
1891            .collect();
1892
1893        values_parsed
1894    }
1895
1896    /// Convenience method for batch insertion
1897    #[instrument(level = "trace", skip_all, err)]
1898    fn multi_insert<J, U>(
1899        &self,
1900        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1901    ) -> Result<(), Self::Error>
1902    where
1903        J: Borrow<K>,
1904        U: Borrow<V>,
1905    {
1906        let mut batch = self.batch();
1907        batch.insert_batch(self, key_val_pairs)?;
1908        batch.write()
1909    }
1910
1911    /// Convenience method for batch removal
1912    #[instrument(level = "trace", skip_all, err)]
1913    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1914    where
1915        J: Borrow<K>,
1916    {
1917        let mut batch = self.batch();
1918        batch.delete_batch(self, keys)?;
1919        batch.write()
1920    }
1921
1922    /// Try to catch up with primary when running as secondary
1923    #[instrument(level = "trace", skip_all, err)]
1924    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1925        if let Storage::Rocks(rocks) = &self.db.storage {
1926            rocks
1927                .underlying
1928                .try_catch_up_with_primary()
1929                .map_err(typed_store_err_from_rocks_err)?;
1930        }
1931        Ok(())
1932    }
1933}
1934
1935/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
1936#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1937pub fn open_cf_opts<P: AsRef<Path>>(
1938    path: P,
1939    db_options: Option<rocksdb::Options>,
1940    metric_conf: MetricConf,
1941    opt_cfs: &[(&str, rocksdb::Options)],
1942) -> Result<Arc<Database>, TypedStoreError> {
1943    let path = path.as_ref();
1944    ensure_database_type(path, StorageType::Rocks)
1945        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1946    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1947    // because rocksdb uses the simulated clock when creating its background threads, but then
1948    // those threads see the real wall clock (because they are not the test thread), which causes
1949    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1950    // resolves the issue.
1951    //
1952    // This is a no-op in non-simulator builds.
1953
1954    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1955    nondeterministic!({
1956        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1957        options.create_if_missing(true);
1958        options.create_missing_column_families(true);
1959        let rocksdb = {
1960            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1961                &options,
1962                path,
1963                cfs.into_iter()
1964                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1965            )
1966            .map_err(typed_store_err_from_rocks_err)?
1967        };
1968        Ok(Arc::new(Database::new(
1969            Storage::Rocks(RocksDB {
1970                underlying: rocksdb,
1971            }),
1972            metric_conf,
1973            None,
1974        )))
1975    })
1976}
1977
/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
///
/// Opens the database in RocksDB *secondary* mode at `secondary_path`
/// (defaulting to a "SECONDARY" directory that is a sibling of
/// `primary_path`), then immediately attempts to catch up with the primary.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    // See comment above for explanation of why nondeterministic is necessary here.
    nondeterministic!({
        // Customize database options
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // This is a requirement by RocksDB when opening as secondary
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        // Column families present in the primary; errors (e.g. the primary
        // does not exist yet) degrade to an empty list.
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add CFs not explicitly listed
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        // Default the secondary location to a "SECONDARY" directory placed
        // next to (i.e. a sibling of) the primary path.
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        ensure_database_type(&primary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        ensure_database_type(&secondary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            // Bring the secondary up to date with the primary before handing
            // the handle out.
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}
2048
2049// Drops a database if there is no other handle to it, with retries and timeout.
2050#[cfg(not(tidehunter))]
2051pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2052    let mut backoff = backoff::ExponentialBackoff {
2053        max_elapsed_time: Some(timeout),
2054        ..Default::default()
2055    };
2056    loop {
2057        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2058            Ok(()) => return Ok(()),
2059            Err(err) => match backoff.next_backoff() {
2060                Some(duration) => tokio::time::sleep(duration).await,
2061                None => return Err(err),
2062            },
2063        }
2064    }
2065}
2066
// TideHunter variant: drops the database, retrying (with exponential backoff
// up to `timeout`) only while the db appears to still be locked.
#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        // `AlreadyExists` signals another handle still holds the db; any
        // other error is fatal immediately.
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        match backoff.next_backoff() {
            Some(delay) => tokio::time::sleep(delay).await,
            None => {
                warn!(
                    "Database at {:?} is still locked after timeout ({:?})",
                    path, timeout
                );
                return Err(err);
            }
        }
    }
}
2092
2093fn populate_missing_cfs(
2094    input_cfs: &[(&str, rocksdb::Options)],
2095    path: &Path,
2096) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2097    let mut cfs = vec![];
2098    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2099    let existing_cfs =
2100        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2101            .ok()
2102            .unwrap_or_default();
2103
2104    for cf_name in existing_cfs {
2105        if !input_cf_index.contains(&cf_name[..]) {
2106            cfs.push((cf_name, rocksdb::Options::default()));
2107        }
2108    }
2109    cfs.extend(
2110        input_cfs
2111            .iter()
2112            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2113    );
2114    Ok(cfs)
2115}
2116
2117fn default_hash(value: &[u8]) -> Digest<32> {
2118    let mut hasher = fastcrypto::hash::Blake2b256::default();
2119    hasher.update(value);
2120    hasher.finalize()
2121}