typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21    iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26    traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36    properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42    borrow::Borrow,
43    marker::PhantomData,
44    ops::RangeBounds,
45    path::{Path, PathBuf},
46    sync::{Arc, OnceLock},
47    time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
//
// The name is built with the checked constructor, so a missing terminator or an interior NUL
// is rejected at compile time instead of being trusted through `unsafe`.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(property_name) => property_name,
        Err(_) => panic!("invalid C string literal for RocksDB property name"),
    };
60
/// Process-wide write-sync flag; it can be initialized at most once.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns the write-sync flag.
///
/// If the flag has not been set yet, it is initialized from the `SUI_DB_SYNC_TO_DISK`
/// environment variable: the values `1` and `true` enable it, anything else (including an
/// unset or unreadable variable) disables it.
fn write_sync_enabled() -> bool {
    let from_env = || match std::env::var("SUI_DB_SYNC_TO_DISK") {
        Ok(raw) => matches!(raw.as_str(), "1" | "true"),
        Err(_) => false,
    };
    *WRITE_SYNC_ENABLED.get_or_init(from_env)
}
67
68/// Initialize the write sync setting from config.
69/// Must be called before any database writes occur.
70pub fn init_write_sync(enabled: Option<bool>) {
71    if let Some(value) = enabled {
72        let _ = WRITE_SYNC_ENABLED.set(value);
73    }
74}
75
76#[cfg(test)]
77mod tests;
78
79#[derive(Debug)]
80pub struct RocksDB {
81    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
82}
83
84impl Drop for RocksDB {
85    fn drop(&mut self) {
86        self.underlying.cancel_all_background_work(/* wait */ true);
87    }
88}
89
/// Identifies a column family in one of the supported storage backends.
#[derive(Clone)]
pub enum ColumnFamily {
    /// RocksDB column family, addressed by name.
    Rocks(String),
    /// In-memory column family, addressed by name.
    InMemory(String),
    /// TideHunter key space together with an optional byte prefix.
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    /// Prints the backend label followed by the column family name (name omitted for TideHunter).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rocks(cf_name) => f.write_fmt(format_args!("RocksDB cf: {}", cf_name)),
            Self::InMemory(cf_name) => f.write_fmt(format_args!("InMemory cf: {}", cf_name)),
            #[cfg(tidehunter)]
            Self::TideHunter(_) => f.write_str("TideHunter column family"),
        }
    }
}
108
109impl ColumnFamily {
110    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111        match &self {
112            ColumnFamily::Rocks(name) => rocks_db
113                .underlying
114                .cf_handle(name)
115                .expect("Map-keying column family should have been checked at DB creation"),
116            _ => unreachable!("invariant is checked by the caller"),
117        }
118    }
119}
120
121pub enum Storage {
122    Rocks(RocksDB),
123    InMemory(InMemoryDB),
124    #[cfg(tidehunter)]
125    TideHunter(Arc<TideHunterDb>),
126}
127
128impl std::fmt::Debug for Storage {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133            #[cfg(tidehunter)]
134            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135        }
136    }
137}
138
139#[derive(Debug)]
140pub struct Database {
141    storage: Storage,
142    metric_conf: MetricConf,
143    registry_id: Option<RegistryID>,
144}
145
146impl Drop for Database {
147    fn drop(&mut self) {
148        let metrics = DBMetrics::get();
149        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
150        if let Some(registry_id) = self.registry_id {
151            metrics.registry_serivce.remove(registry_id);
152        }
153    }
154}
155
156enum GetResult<'a> {
157    Rocks(DBPinnableSlice<'a>),
158    InMemory(Vec<u8>),
159    #[cfg(tidehunter)]
160    TideHunter(tidehunter::minibytes::Bytes),
161}
162
163impl Deref for GetResult<'_> {
164    type Target = [u8];
165    fn deref(&self) -> &[u8] {
166        match self {
167            GetResult::Rocks(d) => d.deref(),
168            GetResult::InMemory(d) => d.deref(),
169            #[cfg(tidehunter)]
170            GetResult::TideHunter(d) => d.deref(),
171        }
172    }
173}
174
175impl Database {
176    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178        Self {
179            storage,
180            metric_conf,
181            registry_id,
182        }
183    }
184
185    /// Flush all memtables to SST files on disk.
186    pub fn flush(&self) -> Result<(), TypedStoreError> {
187        match &self.storage {
188            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
189                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
190            }),
191            Storage::InMemory(_) => {
192                // InMemory databases don't need flushing
193                Ok(())
194            }
195            #[cfg(tidehunter)]
196            Storage::TideHunter(_) => {
197                // TideHunter doesn't support an explicit flush.
198                Ok(())
199            }
200        }
201    }
202
203    fn get<K: AsRef<[u8]>>(
204        &self,
205        cf: &ColumnFamily,
206        key: K,
207        readopts: &ReadOptions,
208    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
209        match (&self.storage, cf) {
210            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
211                .underlying
212                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
213                .map_err(typed_store_err_from_rocks_err)?
214                .map(GetResult::Rocks)),
215            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
216                Ok(db.get(cf_name, key).map(GetResult::InMemory))
217            }
218            #[cfg(tidehunter)]
219            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
220                .get(*ks, &transform_th_key(key.as_ref(), prefix))
221                .map_err(typed_store_error_from_th_error)?
222                .map(GetResult::TideHunter)),
223
224            _ => Err(TypedStoreError::RocksDBError(
225                "typed store invariant violation".to_string(),
226            )),
227        }
228    }
229
230    fn multi_get<I, K>(
231        &self,
232        cf: &ColumnFamily,
233        keys: I,
234        readopts: &ReadOptions,
235    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
236    where
237        I: IntoIterator<Item = K>,
238        K: AsRef<[u8]>,
239    {
240        match (&self.storage, cf) {
241            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
242                let keys_vec: Vec<K> = keys.into_iter().collect();
243                let res = db.underlying.batched_multi_get_cf_opt(
244                    &cf.rocks_cf(db),
245                    keys_vec.iter(),
246                    /* sorted_input */ false,
247                    readopts,
248                );
249                res.into_iter()
250                    .map(|r| {
251                        r.map_err(typed_store_err_from_rocks_err)
252                            .map(|item| item.map(GetResult::Rocks))
253                    })
254                    .collect()
255            }
256            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
257                .multi_get(cf_name, keys)
258                .into_iter()
259                .map(|r| Ok(r.map(GetResult::InMemory)))
260                .collect(),
261            #[cfg(tidehunter)]
262            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => keys
263                .into_iter()
264                .map(|k| {
265                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
266                        .map_err(typed_store_error_from_th_error)
267                        .map(|item| item.map(|bytes| GetResult::TideHunter(bytes.into_owned())))
268                })
269                .collect(),
270            _ => unreachable!("typed store invariant violation"),
271        }
272    }
273
274    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
275        match &self.storage {
276            Storage::Rocks(db) => db.underlying.drop_cf(name),
277            Storage::InMemory(db) => {
278                db.drop_cf(name);
279                Ok(())
280            }
281            #[cfg(tidehunter)]
282            Storage::TideHunter(_) => {
283                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
284            }
285        }
286    }
287
288    pub fn delete_file_in_range<K: AsRef<[u8]>>(
289        &self,
290        cf: &impl AsColumnFamilyRef,
291        from: K,
292        to: K,
293    ) -> Result<(), rocksdb::Error> {
294        match &self.storage {
295            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
296            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
297        }
298    }
299
300    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
301        fail_point!("delete-cf-before");
302        let ret = match (&self.storage, cf) {
303            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
304                .underlying
305                .delete_cf(&cf.rocks_cf(db), key)
306                .map_err(typed_store_err_from_rocks_err),
307            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
308                db.delete(cf_name, key.as_ref());
309                Ok(())
310            }
311            #[cfg(tidehunter)]
312            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
313                .remove(*ks, transform_th_key(key.as_ref(), prefix))
314                .map_err(typed_store_error_from_th_error),
315            _ => Err(TypedStoreError::RocksDBError(
316                "typed store invariant violation".to_string(),
317            )),
318        };
319        fail_point!("delete-cf-after");
320        #[allow(clippy::let_and_return)]
321        ret
322    }
323
324    pub fn path_for_pruning(&self) -> &Path {
325        match &self.storage {
326            Storage::Rocks(rocks) => rocks.underlying.path(),
327            _ => unimplemented!("method is only supported for rocksdb backend"),
328        }
329    }
330
331    fn put_cf(
332        &self,
333        cf: &ColumnFamily,
334        key: Vec<u8>,
335        value: Vec<u8>,
336    ) -> Result<(), TypedStoreError> {
337        fail_point!("put-cf-before");
338        let ret = match (&self.storage, cf) {
339            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
340                .underlying
341                .put_cf(&cf.rocks_cf(db), key, value)
342                .map_err(typed_store_err_from_rocks_err),
343            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
344                db.put(cf_name, key, value);
345                Ok(())
346            }
347            #[cfg(tidehunter)]
348            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
349                .insert(*ks, transform_th_key(&key, prefix), value)
350                .map_err(typed_store_error_from_th_error),
351            _ => Err(TypedStoreError::RocksDBError(
352                "typed store invariant violation".to_string(),
353            )),
354        };
355        fail_point!("put-cf-after");
356        #[allow(clippy::let_and_return)]
357        ret
358    }
359
360    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
361        &self,
362        cf_name: &str,
363        key: K,
364        readopts: &ReadOptions,
365    ) -> bool {
366        match &self.storage {
367            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
368            // but no false negatives. We use it to short-circuit the absent case
369            Storage::Rocks(rocks) => {
370                rocks
371                    .underlying
372                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
373            }
374            _ => true,
375        }
376    }
377
378    pub(crate) fn write_opt_internal(
379        &self,
380        batch: StorageWriteBatch,
381        write_options: &rocksdb::WriteOptions,
382    ) -> Result<(), TypedStoreError> {
383        fail_point!("batch-write-before");
384        let ret = match (&self.storage, batch) {
385            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
386                .underlying
387                .write_opt(batch, write_options)
388                .map_err(typed_store_err_from_rocks_err),
389            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
390                // InMemory doesn't support write options
391                db.write(batch);
392                Ok(())
393            }
394            #[cfg(tidehunter)]
395            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
396                // TideHunter doesn't support write options
397                batch.commit().map_err(typed_store_error_from_th_error)
398            }
399            _ => Err(TypedStoreError::RocksDBError(
400                "using invalid batch type for the database".to_string(),
401            )),
402        };
403        fail_point!("batch-write-after");
404        #[allow(clippy::let_and_return)]
405        ret
406    }
407
408    #[cfg(tidehunter)]
409    pub fn start_relocation(&self) -> anyhow::Result<()> {
410        if let Storage::TideHunter(db) = &self.storage {
411            db.start_relocation()?;
412        }
413        Ok(())
414    }
415
416    #[cfg(tidehunter)]
417    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
418        if let Storage::TideHunter(db) = &self.storage {
419            db.force_rebuild_control_region()
420                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
421        }
422        Ok(())
423    }
424
425    /// Wait for tidehunter background threads to finish.
426    ///
427    /// Consumes the `Arc<Database>`. Caller must ensure no other clones of this
428    /// `Arc<Database>` (e.g. via `DBMap::db`) are alive — otherwise the inner
429    /// `Arc<TideHunterDb>` strong count will not reach zero and the wait will
430    /// poll until it panics.
431    #[cfg(tidehunter)]
432    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
433        let strong = Arc::strong_count(&self);
434        if strong != 1 {
435            println!(
436                "WARNING: wait_for_tidehunter_background_threads called with Arc<Database> strong_count={} (expected 1); other clones will keep the inner tidehunter Db alive and the wait may panic on timeout",
437                strong,
438            );
439        }
440        let Storage::TideHunter(th_arc) = &self.storage else {
441            return;
442        };
443        let th_arc = th_arc.clone();
444        drop(self);
445        th_arc.wait_for_background_threads_to_finish();
446    }
447
448    #[cfg(tidehunter)]
449    pub fn drop_cells_in_range(
450        &self,
451        ks: KeySpace,
452        from_inclusive: &[u8],
453        to_inclusive: &[u8],
454    ) -> anyhow::Result<()> {
455        if let Storage::TideHunter(db) = &self.storage {
456            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
457                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
458        } else {
459            panic!("drop_cells_in_range called on non-TideHunter storage");
460        }
461        Ok(())
462    }
463
464    pub fn compact_range_cf<K: AsRef<[u8]>>(
465        &self,
466        cf_name: &str,
467        start: Option<K>,
468        end: Option<K>,
469    ) {
470        if let Storage::Rocks(rocksdb) = &self.storage {
471            rocksdb
472                .underlying
473                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
474        }
475    }
476
477    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
478        // TODO: implement for other storage types
479        if let Storage::Rocks(rocks) = &self.storage {
480            let checkpoint =
481                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
482            checkpoint
483                .create_checkpoint(path)
484                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
485        }
486        Ok(())
487    }
488
489    pub fn get_sampling_interval(&self) -> SamplingInterval {
490        self.metric_conf.read_sample_interval.new_from_self()
491    }
492
493    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
494        self.metric_conf.read_sample_interval.new_from_self()
495    }
496
497    pub fn write_sampling_interval(&self) -> SamplingInterval {
498        self.metric_conf.write_sample_interval.new_from_self()
499    }
500
501    pub fn iter_sampling_interval(&self) -> SamplingInterval {
502        self.metric_conf.iter_sample_interval.new_from_self()
503    }
504
505    fn db_name(&self) -> String {
506        let name = &self.metric_conf.db_name;
507        if name.is_empty() {
508            "default".to_string()
509        } else {
510            name.clone()
511        }
512    }
513
514    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
515        match &self.storage {
516            Storage::Rocks(rocks) => rocks.underlying.live_files(),
517            _ => Ok(vec![]),
518        }
519    }
520}
521
522fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
523    rocks_db
524        .underlying
525        .cf_handle(cf_name)
526        .expect("Map-keying column family should have been checked at DB creation")
527}
528
529fn rocks_cf_from_db<'a>(
530    db: &'a Database,
531    cf_name: &str,
532) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
533    match &db.storage {
534        Storage::Rocks(rocksdb) => Ok(rocksdb
535            .underlying
536            .cf_handle(cf_name)
537            .expect("Map-keying column family should have been checked at DB creation")),
538        _ => Err(TypedStoreError::RocksDBError(
539            "using invalid batch type for the database".to_string(),
540        )),
541    }
542}
543
544#[derive(Debug, Default)]
545pub struct MetricConf {
546    pub db_name: String,
547    pub read_sample_interval: SamplingInterval,
548    pub write_sample_interval: SamplingInterval,
549    pub iter_sample_interval: SamplingInterval,
550    /// When true and the database is opened with the tidehunter backend, each
551    /// committed `WriteBatch` is written as a single lz4-compressed WAL entry.
552    pub enable_th_batch_compression: bool,
553}
554
555impl MetricConf {
556    pub fn new(db_name: &str) -> Self {
557        if db_name.is_empty() {
558            error!("A meaningful db name should be used for metrics reporting.")
559        }
560        Self {
561            db_name: db_name.to_string(),
562            read_sample_interval: SamplingInterval::default(),
563            write_sample_interval: SamplingInterval::default(),
564            iter_sample_interval: SamplingInterval::default(),
565            enable_th_batch_compression: false,
566        }
567    }
568
569    pub fn with_sampling(mut self, read_interval: SamplingInterval) -> Self {
570        self.read_sample_interval = read_interval;
571        self
572    }
573
574    pub fn with_th_batch_compression(mut self) -> Self {
575        self.enable_th_batch_compression = true;
576        self
577    }
578}
579const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
580const METRICS_ERROR: i64 = -1;
581
582/// An interface to a rocksDB database, keyed by a columnfamily
583#[derive(Clone, Debug)]
584pub struct DBMap<K, V> {
585    pub db: Arc<Database>,
586    _phantom: PhantomData<fn(K) -> V>,
587    column_family: ColumnFamily,
588    // the column family under which the map is stored
589    cf: String,
590    pub opts: ReadWriteOptions,
591    db_metrics: Arc<DBMetrics>,
592    get_sample_interval: SamplingInterval,
593    multiget_sample_interval: SamplingInterval,
594    write_sample_interval: SamplingInterval,
595    iter_sample_interval: SamplingInterval,
596    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
597}
598
599unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
600
601impl<K, V> DBMap<K, V> {
602    pub(crate) fn new(
603        db: Arc<Database>,
604        opts: &ReadWriteOptions,
605        opt_cf: &str,
606        column_family: ColumnFamily,
607        is_deprecated: bool,
608    ) -> Self {
609        let db_cloned = Arc::downgrade(&db.clone());
610        let db_metrics = DBMetrics::get();
611        let db_metrics_cloned = db_metrics.clone();
612        let cf = opt_cf.to_string();
613
614        let (sender, mut recv) = tokio::sync::oneshot::channel();
615        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
616            tokio::task::spawn(async move {
617                let mut interval =
618                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
619                loop {
620                    tokio::select! {
621                        _ = interval.tick() => {
622                            if let Some(db) = db_cloned.upgrade() {
623                                let cf = cf.clone();
624                                let db_metrics = db_metrics.clone();
625                                if let Err(e) = tokio::task::spawn_blocking(move || {
626                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
627                                }).await {
628                                    error!("Failed to log metrics with error: {}", e);
629                                }
630                            }
631                        }
632                        _ = &mut recv => break,
633                    }
634                }
635                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
636            });
637        }
638        DBMap {
639            db: db.clone(),
640            opts: opts.clone(),
641            _phantom: PhantomData,
642            column_family,
643            cf: opt_cf.to_string(),
644            db_metrics: db_metrics_cloned,
645            _metrics_task_cancel_handle: Arc::new(sender),
646            get_sample_interval: db.get_sampling_interval(),
647            multiget_sample_interval: db.multiget_sampling_interval(),
648            write_sample_interval: db.write_sampling_interval(),
649            iter_sample_interval: db.iter_sampling_interval(),
650        }
651    }
652
653    /// Reopens an open database as a typed map operating under a specific column family.
654    /// if no column family is passed, the default column family is used.
655    #[instrument(level = "debug", skip(db), err)]
656    pub fn reopen(
657        db: &Arc<Database>,
658        opt_cf: Option<&str>,
659        rw_options: &ReadWriteOptions,
660        is_deprecated: bool,
661    ) -> Result<Self, TypedStoreError> {
662        let cf_key = opt_cf
663            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
664            .to_owned();
665        Ok(DBMap::new(
666            db.clone(),
667            rw_options,
668            &cf_key,
669            ColumnFamily::Rocks(cf_key.to_string()),
670            is_deprecated,
671        ))
672    }
673
674    #[cfg(tidehunter)]
675    pub fn reopen_th(
676        db: Arc<Database>,
677        cf_name: &str,
678        ks: KeySpace,
679        prefix: Option<Vec<u8>>,
680    ) -> Self {
681        DBMap::new(
682            db,
683            &ReadWriteOptions::default(),
684            cf_name,
685            ColumnFamily::TideHunter((ks, prefix.clone())),
686            false,
687        )
688    }
689
690    pub fn cf_name(&self) -> &str {
691        &self.cf
692    }
693
694    pub fn batch(&self) -> DBBatch {
695        let batch = match &self.db.storage {
696            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
697            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
698            #[cfg(tidehunter)]
699            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
700        };
701        DBBatch::new(
702            &self.db,
703            batch,
704            &self.db_metrics,
705            &self.write_sample_interval,
706        )
707    }
708
709    pub fn flush(&self) -> Result<(), TypedStoreError> {
710        self.db.flush()
711    }
712
713    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
714        let from_buf = be_fix_int_ser(start);
715        let to_buf = be_fix_int_ser(end);
716        self.db
717            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
718        Ok(())
719    }
720
721    pub fn compact_range_raw(
722        &self,
723        cf_name: &str,
724        start: Vec<u8>,
725        end: Vec<u8>,
726    ) -> Result<(), TypedStoreError> {
727        self.db.compact_range_cf(cf_name, Some(start), Some(end));
728        Ok(())
729    }
730
731    #[cfg(tidehunter)]
732    pub fn drop_cells_in_range<J: Serialize>(
733        &self,
734        from_inclusive: &J,
735        to_inclusive: &J,
736    ) -> Result<(), TypedStoreError>
737    where
738        K: Serialize,
739    {
740        let from_buf = be_fix_int_ser(from_inclusive);
741        let to_buf = be_fix_int_ser(to_inclusive);
742        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
743            self.db
744                .drop_cells_in_range(*ks, &from_buf, &to_buf)
745                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
746        }
747        Ok(())
748    }
749
750    #[cfg(tidehunter)]
751    pub fn drop_cells_in_range_raw(
752        &self,
753        from_inclusive: &[u8],
754        to_inclusive: &[u8],
755    ) -> Result<(), TypedStoreError> {
756        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
757            self.db
758                .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
759                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
760        }
761        Ok(())
762    }
763
764    /// Returns a vector of raw values corresponding to the keys provided.
765    fn multi_get_pinned<J>(
766        &self,
767        keys: impl IntoIterator<Item = J>,
768    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
769    where
770        J: Borrow<K>,
771        K: Serialize,
772    {
773        let _timer = self
774            .db_metrics
775            .op_metrics
776            .rocksdb_multiget_latency_seconds
777            .with_label_values(&[&self.cf])
778            .start_timer();
779        let perf_ctx = if self.multiget_sample_interval.sample() {
780            Some(RocksDBPerfContext)
781        } else {
782            None
783        };
784        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
785        let results: Result<Vec<_>, TypedStoreError> = self
786            .db
787            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
788            .into_iter()
789            .collect();
790        let entries = results?;
791        let entry_size = entries
792            .iter()
793            .flatten()
794            .map(|entry| entry.len())
795            .sum::<usize>();
796        self.db_metrics
797            .op_metrics
798            .rocksdb_multiget_bytes
799            .with_label_values(&[&self.cf])
800            .observe(entry_size as f64);
801        if perf_ctx.is_some() {
802            self.db_metrics
803                .read_perf_ctx_metrics
804                .report_metrics(&self.cf);
805        }
806        Ok(entries)
807    }
808
809    fn get_rocksdb_int_property(
810        rocksdb: &RocksDB,
811        cf: &impl AsColumnFamilyRef,
812        property_name: &std::ffi::CStr,
813    ) -> Result<i64, TypedStoreError> {
814        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
815            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
816            Ok(None) => Ok(0),
817            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
818        }
819    }
820
821    fn report_rocksdb_metrics(
822        database: &Arc<Database>,
823        cf_name: &str,
824        db_metrics: &Arc<DBMetrics>,
825    ) {
826        let Storage::Rocks(rocksdb) = &database.storage else {
827            return;
828        };
829
830        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
831            tracing::warn!(
832                "unable to report metrics for cf {cf_name:?} in db {:?}",
833                database.db_name()
834            );
835            return;
836        };
837
838        db_metrics
839            .cf_metrics
840            .rocksdb_total_sst_files_size
841            .with_label_values(&[cf_name])
842            .set(
843                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
844                    .unwrap_or(METRICS_ERROR),
845            );
846        db_metrics
847            .cf_metrics
848            .rocksdb_total_blob_files_size
849            .with_label_values(&[cf_name])
850            .set(
851                Self::get_rocksdb_int_property(
852                    rocksdb,
853                    &cf,
854                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
855                )
856                .unwrap_or(METRICS_ERROR),
857            );
858        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
859        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
860        let total_num_files: i64 = (0..=6)
861            .map(|level| {
862                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
863                    .unwrap_or(METRICS_ERROR)
864            })
865            .sum();
866        db_metrics
867            .cf_metrics
868            .rocksdb_total_num_files
869            .with_label_values(&[cf_name])
870            .set(total_num_files);
871        db_metrics
872            .cf_metrics
873            .rocksdb_num_level0_files
874            .with_label_values(&[cf_name])
875            .set(
876                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
877                    .unwrap_or(METRICS_ERROR),
878            );
879        db_metrics
880            .cf_metrics
881            .rocksdb_current_size_active_mem_tables
882            .with_label_values(&[cf_name])
883            .set(
884                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
885                    .unwrap_or(METRICS_ERROR),
886            );
887        db_metrics
888            .cf_metrics
889            .rocksdb_size_all_mem_tables
890            .with_label_values(&[cf_name])
891            .set(
892                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
893                    .unwrap_or(METRICS_ERROR),
894            );
895        db_metrics
896            .cf_metrics
897            .rocksdb_num_snapshots
898            .with_label_values(&[cf_name])
899            .set(
900                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
901                    .unwrap_or(METRICS_ERROR),
902            );
903        db_metrics
904            .cf_metrics
905            .rocksdb_oldest_snapshot_time
906            .with_label_values(&[cf_name])
907            .set(
908                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
909                    .unwrap_or(METRICS_ERROR),
910            );
911        db_metrics
912            .cf_metrics
913            .rocksdb_actual_delayed_write_rate
914            .with_label_values(&[cf_name])
915            .set(
916                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
917                    .unwrap_or(METRICS_ERROR),
918            );
919        db_metrics
920            .cf_metrics
921            .rocksdb_is_write_stopped
922            .with_label_values(&[cf_name])
923            .set(
924                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
925                    .unwrap_or(METRICS_ERROR),
926            );
927        db_metrics
928            .cf_metrics
929            .rocksdb_block_cache_capacity
930            .with_label_values(&[cf_name])
931            .set(
932                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
933                    .unwrap_or(METRICS_ERROR),
934            );
935        db_metrics
936            .cf_metrics
937            .rocksdb_block_cache_usage
938            .with_label_values(&[cf_name])
939            .set(
940                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
941                    .unwrap_or(METRICS_ERROR),
942            );
943        db_metrics
944            .cf_metrics
945            .rocksdb_block_cache_pinned_usage
946            .with_label_values(&[cf_name])
947            .set(
948                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
949                    .unwrap_or(METRICS_ERROR),
950            );
951        db_metrics
952            .cf_metrics
953            .rocksdb_estimate_table_readers_mem
954            .with_label_values(&[cf_name])
955            .set(
956                Self::get_rocksdb_int_property(
957                    rocksdb,
958                    &cf,
959                    properties::ESTIMATE_TABLE_READERS_MEM,
960                )
961                .unwrap_or(METRICS_ERROR),
962            );
963        db_metrics
964            .cf_metrics
965            .rocksdb_estimated_num_keys
966            .with_label_values(&[cf_name])
967            .set(
968                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
969                    .unwrap_or(METRICS_ERROR),
970            );
971        db_metrics
972            .cf_metrics
973            .rocksdb_num_immutable_mem_tables
974            .with_label_values(&[cf_name])
975            .set(
976                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
977                    .unwrap_or(METRICS_ERROR),
978            );
979        db_metrics
980            .cf_metrics
981            .rocksdb_mem_table_flush_pending
982            .with_label_values(&[cf_name])
983            .set(
984                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
985                    .unwrap_or(METRICS_ERROR),
986            );
987        db_metrics
988            .cf_metrics
989            .rocksdb_compaction_pending
990            .with_label_values(&[cf_name])
991            .set(
992                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
993                    .unwrap_or(METRICS_ERROR),
994            );
995        db_metrics
996            .cf_metrics
997            .rocksdb_estimate_pending_compaction_bytes
998            .with_label_values(&[cf_name])
999            .set(
1000                Self::get_rocksdb_int_property(
1001                    rocksdb,
1002                    &cf,
1003                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
1004                )
1005                .unwrap_or(METRICS_ERROR),
1006            );
1007        db_metrics
1008            .cf_metrics
1009            .rocksdb_num_running_compactions
1010            .with_label_values(&[cf_name])
1011            .set(
1012                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1013                    .unwrap_or(METRICS_ERROR),
1014            );
1015        db_metrics
1016            .cf_metrics
1017            .rocksdb_num_running_flushes
1018            .with_label_values(&[cf_name])
1019            .set(
1020                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1021                    .unwrap_or(METRICS_ERROR),
1022            );
1023        db_metrics
1024            .cf_metrics
1025            .rocksdb_estimate_oldest_key_time
1026            .with_label_values(&[cf_name])
1027            .set(
1028                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1029                    .unwrap_or(METRICS_ERROR),
1030            );
1031        db_metrics
1032            .cf_metrics
1033            .rocksdb_background_errors
1034            .with_label_values(&[cf_name])
1035            .set(
1036                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1037                    .unwrap_or(METRICS_ERROR),
1038            );
1039        db_metrics
1040            .cf_metrics
1041            .rocksdb_base_level
1042            .with_label_values(&[cf_name])
1043            .set(
1044                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1045                    .unwrap_or(METRICS_ERROR),
1046            );
1047    }
1048
1049    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1050        self.db.checkpoint(path)
1051    }
1052
1053    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1054    where
1055        K: Serialize + DeserializeOwned,
1056        V: Serialize + DeserializeOwned,
1057    {
1058        let mut num_keys = 0;
1059        let mut key_bytes_total = 0;
1060        let mut value_bytes_total = 0;
1061        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1062        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1063        for item in self.safe_iter() {
1064            let (key, value) = item?;
1065            num_keys += 1;
1066            let key_len = be_fix_int_ser(key.borrow()).len();
1067            let value_len = bcs::to_bytes(value.borrow())?.len();
1068            key_bytes_total += key_len;
1069            value_bytes_total += value_len;
1070            key_hist.record(key_len as u64)?;
1071            value_hist.record(value_len as u64)?;
1072        }
1073        Ok(TableSummary {
1074            num_keys,
1075            key_bytes_total,
1076            value_bytes_total,
1077            key_hist,
1078            value_hist,
1079        })
1080    }
1081
1082    fn start_iter_timer(&self) -> HistogramTimer {
1083        self.db_metrics
1084            .op_metrics
1085            .rocksdb_iter_latency_seconds
1086            .with_label_values(&[&self.cf])
1087            .start_timer()
1088    }
1089
1090    // Creates metrics and context for tracking an iterator usage and performance.
1091    fn create_iter_context(
1092        &self,
1093    ) -> (
1094        Option<HistogramTimer>,
1095        Option<Histogram>,
1096        Option<Histogram>,
1097        Option<RocksDBPerfContext>,
1098    ) {
1099        let timer = self.start_iter_timer();
1100        let bytes_scanned = self
1101            .db_metrics
1102            .op_metrics
1103            .rocksdb_iter_bytes
1104            .with_label_values(&[&self.cf]);
1105        let keys_scanned = self
1106            .db_metrics
1107            .op_metrics
1108            .rocksdb_iter_keys
1109            .with_label_values(&[&self.cf]);
1110        let perf_ctx = if self.iter_sample_interval.sample() {
1111            Some(RocksDBPerfContext)
1112        } else {
1113            None
1114        };
1115        (
1116            Some(timer),
1117            Some(bytes_scanned),
1118            Some(keys_scanned),
1119            perf_ctx,
1120        )
1121    }
1122
1123    /// Creates a safe reversed iterator with optional bounds.
1124    /// Both upper bound and lower bound are included.
1125    #[allow(clippy::complexity)]
1126    pub fn reversed_safe_iter_with_bounds(
1127        &self,
1128        lower_bound: Option<K>,
1129        upper_bound: Option<K>,
1130    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1131    where
1132        K: Serialize + DeserializeOwned,
1133        V: Serialize + DeserializeOwned,
1134    {
1135        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1136            lower_bound
1137                .as_ref()
1138                .map(Bound::Included)
1139                .unwrap_or(Bound::Unbounded),
1140            upper_bound
1141                .as_ref()
1142                .map(Bound::Included)
1143                .unwrap_or(Bound::Unbounded),
1144        ));
1145        match &self.db.storage {
1146            Storage::Rocks(db) => {
1147                let readopts = rocks_util::apply_range_bounds(
1148                    self.opts.readopts(),
1149                    it_lower_bound,
1150                    it_upper_bound,
1151                );
1152                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1153                let db_iter = db
1154                    .underlying
1155                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1156                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1157                let iter = SafeIter::new(
1158                    self.cf.clone(),
1159                    db_iter,
1160                    _timer,
1161                    _perf_ctx,
1162                    bytes_scanned,
1163                    keys_scanned,
1164                    Some(self.db_metrics.clone()),
1165                );
1166                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1167            }
1168            Storage::InMemory(db) => {
1169                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1170            }
1171            #[cfg(tidehunter)]
1172            Storage::TideHunter(db) => match &self.column_family {
1173                ColumnFamily::TideHunter((ks, prefix)) => {
1174                    let mut iter = db.iterator(*ks);
1175                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound, prefix);
1176                    iter.reverse();
1177                    Ok(Box::new(transform_th_iterator(
1178                        iter,
1179                        prefix,
1180                        self.start_iter_timer(),
1181                    )))
1182                }
1183                _ => unreachable!("storage backend invariant violation"),
1184            },
1185        }
1186    }
1187}
1188
/// A batch of pending write operations, held in the batch type of one storage
/// backend.
///
/// There is one variant per backend; the `TideHunter` variant only exists when
/// the crate is compiled with `cfg(tidehunter)`.
pub enum StorageWriteBatch {
    /// Batch for the RocksDB backend.
    Rocks(rocksdb::WriteBatch),
    /// Batch for the in-memory backend.
    InMemory(InMemoryBatch),
    /// Batch for the TideHunter backend.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1195
/// Flat-buffer entry header. All byte data (cf_name, key, value) is stored
/// contiguously in `StagedBatch::data`; this header records offsets and lengths
/// so that slices can be produced without any per-entry allocation.
///
/// An entry is laid out as `cf_name | key | value`. The value has no stored
/// length: it runs up to the next entry's `offset`, or to the end of the
/// buffer for the last entry.
struct EntryHeader {
    /// Byte offset into `StagedBatch::data` where this entry's data begins.
    offset: usize,
    /// Length in bytes of the column family name stored at `offset`.
    cf_name_len: usize,
    /// Length in bytes of the serialized key that follows the column family name.
    key_len: usize,
    /// `true` for a put (a value follows the key), `false` for a delete.
    is_put: bool,
}
1206
/// A write batch that stores serialized operations in a flat byte buffer without
/// requiring a database reference. Can be replayed into a real `DBBatch` via
/// `DBBatch::concat`.
/// TODO: this can be deleted when we upgrade rust-rocksdb, which supports iterating
/// over write batches.
#[derive(Default)]
pub struct StagedBatch {
    /// Concatenated bytes of every staged entry, in insertion order.
    data: Vec<u8>,
    /// One header per staged entry, describing where the entry lives in `data`.
    entries: Vec<EntryHeader>,
}
1217
1218impl StagedBatch {
1219    pub fn new() -> Self {
1220        Self {
1221            data: Vec::with_capacity(1024),
1222            entries: Vec::with_capacity(16),
1223        }
1224    }
1225
1226    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1227        &mut self,
1228        db: &DBMap<K, V>,
1229        new_vals: impl IntoIterator<Item = (J, U)>,
1230    ) -> Result<&mut Self, TypedStoreError> {
1231        let cf_name = db.cf_name();
1232        new_vals
1233            .into_iter()
1234            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1235                let offset = self.data.len();
1236                self.data.extend_from_slice(cf_name.as_bytes());
1237                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1238                bcs::serialize_into(&mut self.data, v.borrow())
1239                    .map_err(typed_store_err_from_bcs_err)?;
1240                self.entries.push(EntryHeader {
1241                    offset,
1242                    cf_name_len: cf_name.len(),
1243                    key_len,
1244                    is_put: true,
1245                });
1246                Ok(())
1247            })?;
1248        Ok(self)
1249    }
1250
1251    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1252        &mut self,
1253        db: &DBMap<K, V>,
1254        purged_vals: impl IntoIterator<Item = J>,
1255    ) -> Result<(), TypedStoreError> {
1256        let cf_name = db.cf_name();
1257        purged_vals
1258            .into_iter()
1259            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1260                let offset = self.data.len();
1261                self.data.extend_from_slice(cf_name.as_bytes());
1262                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1263                self.entries.push(EntryHeader {
1264                    offset,
1265                    cf_name_len: cf_name.len(),
1266                    key_len,
1267                    is_put: false,
1268                });
1269                Ok(())
1270            })?;
1271        Ok(())
1272    }
1273
1274    pub fn size_in_bytes(&self) -> usize {
1275        self.data.len()
1276    }
1277}
1278
/// Provides a mutable struct to form a collection of database write operations, and execute them.
///
/// Batching write and delete operations is faster than performing them one by one and ensures
/// their atomicity, i.e. they are all written or none is.
/// This is also true of operations across column families in the same database.
///
/// Serialization / deserialization and naming of column families are performed by passing a
/// `DBMap<K, V>` with each operation.
///
/// ```
/// use typed_store::rocks::*;
/// use tempfile::tempdir;
/// use typed_store::Map;
/// use typed_store::metrics::DBMetrics;
/// use prometheus::Registry;
/// use core::fmt::Error;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
///
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
///     .expect("Failed to open storage");
/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
///     .expect("Failed to open storage");
/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
/// let mut batch = db_cf_1.batch();
/// batch
///     .insert_batch(&db_cf_1, keys_vals_1.clone())
///     .expect("Failed to batch insert")
///     .insert_batch(&db_cf_2, keys_vals_2.clone())
///     .expect("Failed to batch insert");
///
/// let _ = batch.write().expect("Failed to execute batch");
/// for (k, v) in keys_vals_1 {
///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
///     assert_eq!(Some(v), val);
/// }
///
/// for (k, v) in keys_vals_2 {
///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
///     assert_eq!(Some(v), val);
/// }
/// Ok(())
/// }
/// ```
///
pub struct DBBatch {
    /// Database the batch is written to. Operations that take a `DBMap` compare
    /// the map's database against this one and fail with `CrossDBBatch` on mismatch.
    database: Arc<Database>,
    /// Backend-specific buffer of the operations collected so far.
    batch: StorageWriteBatch,
    /// Metrics updated when operations are added and when the batch is written.
    db_metrics: Arc<DBMetrics>,
    /// Decides whether a given write is sampled for perf-context reporting.
    write_sample_interval: SamplingInterval,
}
1336
1337impl DBBatch {
1338    /// Create a new batch associated with a DB reference.
1339    ///
1340    /// Use `open_cf` to get the DB reference or an existing open database.
1341    pub fn new(
1342        dbref: &Arc<Database>,
1343        batch: StorageWriteBatch,
1344        db_metrics: &Arc<DBMetrics>,
1345        write_sample_interval: &SamplingInterval,
1346    ) -> Self {
1347        DBBatch {
1348            database: dbref.clone(),
1349            batch,
1350            db_metrics: db_metrics.clone(),
1351            write_sample_interval: write_sample_interval.clone(),
1352        }
1353    }
1354
1355    /// Consume the batch and write its operations to the database
1356    #[instrument(level = "trace", skip_all, err)]
1357    pub fn write(self) -> Result<(), TypedStoreError> {
1358        let mut write_options = rocksdb::WriteOptions::default();
1359
1360        if write_sync_enabled() {
1361            write_options.set_sync(true);
1362        }
1363
1364        self.write_opt(write_options)
1365    }
1366
1367    /// Consume the batch and write its operations to the database with custom write options
1368    #[instrument(level = "trace", skip_all, err)]
1369    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1370        let db_name = self.database.db_name();
1371        let timer = self
1372            .db_metrics
1373            .op_metrics
1374            .rocksdb_batch_commit_latency_seconds
1375            .with_label_values(&[&db_name])
1376            .start_timer();
1377        let batch_size = self.size_in_bytes();
1378
1379        let perf_ctx = if self.write_sample_interval.sample() {
1380            Some(RocksDBPerfContext)
1381        } else {
1382            None
1383        };
1384
1385        self.database
1386            .write_opt_internal(self.batch, &write_options)?;
1387
1388        self.db_metrics
1389            .op_metrics
1390            .rocksdb_batch_commit_bytes
1391            .with_label_values(&[&db_name])
1392            .observe(batch_size as f64);
1393
1394        if perf_ctx.is_some() {
1395            self.db_metrics
1396                .write_perf_ctx_metrics
1397                .report_metrics(&db_name);
1398        }
1399        let elapsed = timer.stop_and_record();
1400        if elapsed > 1.0 {
1401            warn!(?elapsed, ?db_name, "very slow batch write");
1402            self.db_metrics
1403                .op_metrics
1404                .rocksdb_very_slow_batch_writes_count
1405                .with_label_values(&[&db_name])
1406                .inc();
1407            self.db_metrics
1408                .op_metrics
1409                .rocksdb_very_slow_batch_writes_duration_ms
1410                .with_label_values(&[&db_name])
1411                .inc_by((elapsed * 1000.0) as u64);
1412        }
1413        Ok(())
1414    }
1415
1416    pub fn size_in_bytes(&self) -> usize {
1417        match self.batch {
1418            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1419            StorageWriteBatch::InMemory(_) => 0,
1420            // TODO: implement size_in_bytes method
1421            #[cfg(tidehunter)]
1422            StorageWriteBatch::TideHunter(_) => 0,
1423        }
1424    }
1425
1426    /// Replay all operations from `StagedBatch`es into this batch.
1427    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
1428        for raw_batch in raw_batches {
1429            let data = &raw_batch.data;
1430            for (i, hdr) in raw_batch.entries.iter().enumerate() {
1431                let end = raw_batch
1432                    .entries
1433                    .get(i + 1)
1434                    .map_or(data.len(), |next| next.offset);
1435                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
1436                let key_start = hdr.offset + hdr.cf_name_len;
1437                let key = &data[key_start..key_start + hdr.key_len];
1438                // Safety: cf_name was written from &str bytes in insert_batch / delete_batch.
1439                let cf_name = std::str::from_utf8(cf_bytes)
1440                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;
1441
1442                if hdr.is_put {
1443                    let value = &data[key_start + hdr.key_len..end];
1444                    match &mut self.batch {
1445                        StorageWriteBatch::Rocks(b) => {
1446                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
1447                        }
1448                        StorageWriteBatch::InMemory(b) => {
1449                            b.put_cf(cf_name, key, value);
1450                        }
1451                        #[cfg(tidehunter)]
1452                        _ => {
1453                            return Err(TypedStoreError::RocksDBError(
1454                                "concat not supported for TideHunter".to_string(),
1455                            ));
1456                        }
1457                    }
1458                } else {
1459                    match &mut self.batch {
1460                        StorageWriteBatch::Rocks(b) => {
1461                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
1462                        }
1463                        StorageWriteBatch::InMemory(b) => {
1464                            b.delete_cf(cf_name, key);
1465                        }
1466                        #[cfg(tidehunter)]
1467                        _ => {
1468                            return Err(TypedStoreError::RocksDBError(
1469                                "concat not supported for TideHunter".to_string(),
1470                            ));
1471                        }
1472                    }
1473                }
1474            }
1475        }
1476        Ok(self)
1477    }
1478
1479    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1480        &mut self,
1481        db: &DBMap<K, V>,
1482        purged_vals: impl IntoIterator<Item = J>,
1483    ) -> Result<(), TypedStoreError> {
1484        if !Arc::ptr_eq(&db.db, &self.database) {
1485            return Err(TypedStoreError::CrossDBBatch);
1486        }
1487
1488        purged_vals
1489            .into_iter()
1490            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1491                let k_buf = be_fix_int_ser(k.borrow());
1492                match (&mut self.batch, &db.column_family) {
1493                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1494                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1495                    }
1496                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1497                        b.delete_cf(name, k_buf)
1498                    }
1499                    #[cfg(tidehunter)]
1500                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1501                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1502                    }
1503                    _ => Err(TypedStoreError::RocksDBError(
1504                        "typed store invariant violation".to_string(),
1505                    ))?,
1506                }
1507                Ok(())
1508            })?;
1509        Ok(())
1510    }
1511
1512    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
1513    /// by writing a range delete tombstone in the db map
1514    /// If the DBMap is configured with ignore_range_deletions set to false,
1515    /// the effect of this write will be visible immediately i.e. you won't
1516    /// see old values when you do a lookup or scan. But if it is configured
1517    /// with ignore_range_deletions set to true, the old value are visible until
1518    /// compaction actually deletes them which will happen sometime after. By
1519    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1520    /// overridden in the config), so please use this function with caution
1521    pub fn schedule_delete_range<K: Serialize, V>(
1522        &mut self,
1523        db: &DBMap<K, V>,
1524        from: &K,
1525        to: &K,
1526    ) -> Result<(), TypedStoreError> {
1527        if !Arc::ptr_eq(&db.db, &self.database) {
1528            return Err(TypedStoreError::CrossDBBatch);
1529        }
1530
1531        let from_buf = be_fix_int_ser(from);
1532        let to_buf = be_fix_int_ser(to);
1533
1534        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1535            b.delete_range_cf(
1536                &rocks_cf_from_db(&self.database, db.cf_name())?,
1537                from_buf,
1538                to_buf,
1539            );
1540        }
1541        Ok(())
1542    }
1543
1544    /// inserts a range of (key, value) pairs given as an iterator
1545    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1546        &mut self,
1547        db: &DBMap<K, V>,
1548        new_vals: impl IntoIterator<Item = (J, U)>,
1549    ) -> Result<&mut Self, TypedStoreError> {
1550        if !Arc::ptr_eq(&db.db, &self.database) {
1551            return Err(TypedStoreError::CrossDBBatch);
1552        }
1553        let mut total = 0usize;
1554        new_vals
1555            .into_iter()
1556            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1557                let k_buf = be_fix_int_ser(k.borrow());
1558                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1559                total += k_buf.len() + v_buf.len();
1560                if db.opts.log_value_hash {
1561                    let key_hash = default_hash(&k_buf);
1562                    let value_hash = default_hash(&v_buf);
1563                    debug!(
1564                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1565                        db.cf_name(),
1566                        key_hash,
1567                        value_hash
1568                    );
1569                }
1570                match (&mut self.batch, &db.column_family) {
1571                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1572                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1573                    }
1574                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1575                        b.put_cf(name, k_buf, v_buf)
1576                    }
1577                    #[cfg(tidehunter)]
1578                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1579                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1580                    }
1581                    _ => Err(TypedStoreError::RocksDBError(
1582                        "typed store invariant violation".to_string(),
1583                    ))?,
1584                }
1585                Ok(())
1586            })?;
1587        self.db_metrics
1588            .op_metrics
1589            .rocksdb_batch_put_bytes
1590            .with_label_values(&[&db.cf])
1591            .observe(total as f64);
1592        Ok(self)
1593    }
1594
1595    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1596        &mut self,
1597        db: &DBMap<K, V>,
1598        new_vals: impl IntoIterator<Item = (J, U)>,
1599    ) -> Result<&mut Self, TypedStoreError> {
1600        if !Arc::ptr_eq(&db.db, &self.database) {
1601            return Err(TypedStoreError::CrossDBBatch);
1602        }
1603        new_vals
1604            .into_iter()
1605            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1606                let k_buf = be_fix_int_ser(k.borrow());
1607                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1608                match &mut self.batch {
1609                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1610                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1611                        k_buf,
1612                        v_buf,
1613                    ),
1614                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1615                }
1616                Ok(())
1617            })?;
1618        Ok(self)
1619    }
1620}
1621
1622impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1623where
1624    K: Serialize + DeserializeOwned,
1625    V: Serialize + DeserializeOwned,
1626{
1627    type Error = TypedStoreError;
1628
1629    #[instrument(level = "trace", skip_all, err)]
1630    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1631        let key_buf = be_fix_int_ser(key);
1632        let readopts = self.opts.readopts();
1633        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1634            && self
1635                .db
1636                .get(&self.column_family, &key_buf, &readopts)?
1637                .is_some())
1638    }
1639
1640    #[instrument(level = "trace", skip_all, err)]
1641    fn multi_contains_keys<J>(
1642        &self,
1643        keys: impl IntoIterator<Item = J>,
1644    ) -> Result<Vec<bool>, Self::Error>
1645    where
1646        J: Borrow<K>,
1647    {
1648        let values = self.multi_get_pinned(keys)?;
1649        Ok(values.into_iter().map(|v| v.is_some()).collect())
1650    }
1651
1652    #[instrument(level = "trace", skip_all, err)]
1653    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1654        let _timer = self
1655            .db_metrics
1656            .op_metrics
1657            .rocksdb_get_latency_seconds
1658            .with_label_values(&[&self.cf])
1659            .start_timer();
1660        let perf_ctx = if self.get_sample_interval.sample() {
1661            Some(RocksDBPerfContext)
1662        } else {
1663            None
1664        };
1665        let key_buf = be_fix_int_ser(key);
1666        let res = self
1667            .db
1668            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1669        self.db_metrics
1670            .op_metrics
1671            .rocksdb_get_bytes
1672            .with_label_values(&[&self.cf])
1673            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1674        if perf_ctx.is_some() {
1675            self.db_metrics
1676                .read_perf_ctx_metrics
1677                .report_metrics(&self.cf);
1678        }
1679        match res {
1680            Some(data) => {
1681                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1682                if value.is_err() {
1683                    let key_hash = default_hash(&key_buf);
1684                    let value_hash = default_hash(&data);
1685                    debug_fatal!(
1686                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1687                        self.cf_name(),
1688                        key_hash,
1689                        value_hash,
1690                        value.as_ref().err().unwrap()
1691                    );
1692                }
1693                Ok(Some(value?))
1694            }
1695            None => Ok(None),
1696        }
1697    }
1698
1699    #[instrument(level = "trace", skip_all, err)]
1700    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1701        let timer = self
1702            .db_metrics
1703            .op_metrics
1704            .rocksdb_put_latency_seconds
1705            .with_label_values(&[&self.cf])
1706            .start_timer();
1707        let perf_ctx = if self.write_sample_interval.sample() {
1708            Some(RocksDBPerfContext)
1709        } else {
1710            None
1711        };
1712        let key_buf = be_fix_int_ser(key);
1713        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1714        self.db_metrics
1715            .op_metrics
1716            .rocksdb_put_bytes
1717            .with_label_values(&[&self.cf])
1718            .observe((key_buf.len() + value_buf.len()) as f64);
1719        if perf_ctx.is_some() {
1720            self.db_metrics
1721                .write_perf_ctx_metrics
1722                .report_metrics(&self.cf);
1723        }
1724        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1725
1726        let elapsed = timer.stop_and_record();
1727        if elapsed > 1.0 {
1728            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1729            self.db_metrics
1730                .op_metrics
1731                .rocksdb_very_slow_puts_count
1732                .with_label_values(&[&self.cf])
1733                .inc();
1734            self.db_metrics
1735                .op_metrics
1736                .rocksdb_very_slow_puts_duration_ms
1737                .with_label_values(&[&self.cf])
1738                .inc_by((elapsed * 1000.0) as u64);
1739        }
1740
1741        Ok(())
1742    }
1743
1744    #[instrument(level = "trace", skip_all, err)]
1745    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1746        let _timer = self
1747            .db_metrics
1748            .op_metrics
1749            .rocksdb_delete_latency_seconds
1750            .with_label_values(&[&self.cf])
1751            .start_timer();
1752        let perf_ctx = if self.write_sample_interval.sample() {
1753            Some(RocksDBPerfContext)
1754        } else {
1755            None
1756        };
1757        let key_buf = be_fix_int_ser(key);
1758        self.db.delete_cf(&self.column_family, key_buf)?;
1759        self.db_metrics
1760            .op_metrics
1761            .rocksdb_deletes
1762            .with_label_values(&[&self.cf])
1763            .inc();
1764        if perf_ctx.is_some() {
1765            self.db_metrics
1766                .write_perf_ctx_metrics
1767                .report_metrics(&self.cf);
1768        }
1769        Ok(())
1770    }
1771
1772    /// Writes a range delete tombstone to delete all entries in the db map
1773    /// If the DBMap is configured with ignore_range_deletions set to false,
1774    /// the effect of this write will be visible immediately i.e. you won't
1775    /// see old values when you do a lookup or scan. But if it is configured
1776    /// with ignore_range_deletions set to true, the old value are visible until
1777    /// compaction actually deletes them which will happen sometime after. By
1778    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1779    /// overridden in the config), so please use this function with caution
1780    #[instrument(level = "trace", skip_all, err)]
1781    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1782        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1783        let last_key = self
1784            .reversed_safe_iter_with_bounds(None, None)?
1785            .next()
1786            .transpose()?
1787            .map(|(k, _v)| k);
1788        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1789            let mut batch = self.batch();
1790            batch.schedule_delete_range(self, &first_key, &last_key)?;
1791            batch.write()?;
1792        }
1793        Ok(())
1794    }
1795
1796    fn is_empty(&self) -> bool {
1797        self.safe_iter().next().is_none()
1798    }
1799
1800    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1801        match &self.db.storage {
1802            Storage::Rocks(db) => {
1803                let db_iter = db
1804                    .underlying
1805                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1806                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1807                Box::new(SafeIter::new(
1808                    self.cf.clone(),
1809                    db_iter,
1810                    _timer,
1811                    _perf_ctx,
1812                    bytes_scanned,
1813                    keys_scanned,
1814                    Some(self.db_metrics.clone()),
1815                ))
1816            }
1817            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1818            #[cfg(tidehunter)]
1819            Storage::TideHunter(db) => match &self.column_family {
1820                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1821                    db.iterator(*ks),
1822                    prefix,
1823                    self.start_iter_timer(),
1824                )),
1825                _ => unreachable!("storage backend invariant violation"),
1826            },
1827        }
1828    }
1829
1830    fn safe_iter_with_bounds(
1831        &'a self,
1832        lower_bound: Option<K>,
1833        upper_bound: Option<K>,
1834    ) -> DbIterator<'a, (K, V)> {
1835        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1836        match &self.db.storage {
1837            Storage::Rocks(db) => {
1838                let readopts =
1839                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1840                let db_iter = db
1841                    .underlying
1842                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1843                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1844                Box::new(SafeIter::new(
1845                    self.cf.clone(),
1846                    db_iter,
1847                    _timer,
1848                    _perf_ctx,
1849                    bytes_scanned,
1850                    keys_scanned,
1851                    Some(self.db_metrics.clone()),
1852                ))
1853            }
1854            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1855            #[cfg(tidehunter)]
1856            Storage::TideHunter(db) => match &self.column_family {
1857                ColumnFamily::TideHunter((ks, prefix)) => {
1858                    let mut iter = db.iterator(*ks);
1859                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1860                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1861                }
1862                _ => unreachable!("storage backend invariant violation"),
1863            },
1864        }
1865    }
1866
1867    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1868        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1869        match &self.db.storage {
1870            Storage::Rocks(db) => {
1871                let readopts =
1872                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1873                let db_iter = db
1874                    .underlying
1875                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1876                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1877                Box::new(SafeIter::new(
1878                    self.cf.clone(),
1879                    db_iter,
1880                    _timer,
1881                    _perf_ctx,
1882                    bytes_scanned,
1883                    keys_scanned,
1884                    Some(self.db_metrics.clone()),
1885                ))
1886            }
1887            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1888            #[cfg(tidehunter)]
1889            Storage::TideHunter(db) => match &self.column_family {
1890                ColumnFamily::TideHunter((ks, prefix)) => {
1891                    let mut iter = db.iterator(*ks);
1892                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1893                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1894                }
1895                _ => unreachable!("storage backend invariant violation"),
1896            },
1897        }
1898    }
1899
1900    /// Returns a vector of values corresponding to the keys provided.
1901    #[instrument(level = "trace", skip_all, err)]
1902    fn multi_get<J>(
1903        &self,
1904        keys: impl IntoIterator<Item = J>,
1905    ) -> Result<Vec<Option<V>>, TypedStoreError>
1906    where
1907        J: Borrow<K>,
1908    {
1909        let results = self.multi_get_pinned(keys)?;
1910        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1911            .into_iter()
1912            .map(|value_byte| match value_byte {
1913                Some(data) => Ok(Some(
1914                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1915                )),
1916                None => Ok(None),
1917            })
1918            .collect();
1919
1920        values_parsed
1921    }
1922
1923    /// Convenience method for batch insertion
1924    #[instrument(level = "trace", skip_all, err)]
1925    fn multi_insert<J, U>(
1926        &self,
1927        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1928    ) -> Result<(), Self::Error>
1929    where
1930        J: Borrow<K>,
1931        U: Borrow<V>,
1932    {
1933        let mut batch = self.batch();
1934        batch.insert_batch(self, key_val_pairs)?;
1935        batch.write()
1936    }
1937
1938    /// Convenience method for batch removal
1939    #[instrument(level = "trace", skip_all, err)]
1940    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1941    where
1942        J: Borrow<K>,
1943    {
1944        let mut batch = self.batch();
1945        batch.delete_batch(self, keys)?;
1946        batch.write()
1947    }
1948
1949    /// Try to catch up with primary when running as secondary
1950    #[instrument(level = "trace", skip_all, err)]
1951    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1952        if let Storage::Rocks(rocks) = &self.db.storage {
1953            rocks
1954                .underlying
1955                .try_catch_up_with_primary()
1956                .map_err(typed_store_err_from_rocks_err)?;
1957        }
1958        Ok(())
1959    }
1960}
1961
1962/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
1963#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1964pub fn open_cf_opts<P: AsRef<Path>>(
1965    path: P,
1966    db_options: Option<rocksdb::Options>,
1967    metric_conf: MetricConf,
1968    opt_cfs: &[(&str, rocksdb::Options)],
1969) -> Result<Arc<Database>, TypedStoreError> {
1970    let path = path.as_ref();
1971    ensure_database_type(path, StorageType::Rocks)
1972        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1973    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1974    // because rocksdb uses the simulated clock when creating its background threads, but then
1975    // those threads see the real wall clock (because they are not the test thread), which causes
1976    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1977    // resolves the issue.
1978    //
1979    // This is a no-op in non-simulator builds.
1980
1981    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1982    nondeterministic!({
1983        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1984        options.create_if_missing(true);
1985        options.create_missing_column_families(true);
1986        let rocksdb = {
1987            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1988                &options,
1989                path,
1990                cfs.into_iter()
1991                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1992            )
1993            .map_err(typed_store_err_from_rocks_err)?
1994        };
1995        Ok(Arc::new(Database::new(
1996            Storage::Rocks(RocksDB {
1997                underlying: rocksdb,
1998            }),
1999            metric_conf,
2000            None,
2001        )))
2002    })
2003}
2004
2005/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
2006pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2007    primary_path: P,
2008    secondary_path: Option<P>,
2009    db_options: Option<rocksdb::Options>,
2010    metric_conf: MetricConf,
2011    opt_cfs: &[(&str, rocksdb::Options)],
2012) -> Result<Arc<Database>, TypedStoreError> {
2013    let primary_path = primary_path.as_ref();
2014    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2015    // See comment above for explanation of why nondeterministic is necessary here.
2016    nondeterministic!({
2017        // Customize database options
2018        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2019
2020        fdlimit::raise_fd_limit();
2021        // This is a requirement by RocksDB when opening as secondary
2022        options.set_max_open_files(-1);
2023
2024        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2025        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2026            .ok()
2027            .unwrap_or_default();
2028
2029        let default_db_options = default_db_options();
2030        // Add CFs not explicitly listed
2031        for cf_key in cfs.iter() {
2032            if !opt_cfs.contains_key(&cf_key[..]) {
2033                opt_cfs.insert(cf_key, default_db_options.options.clone());
2034            }
2035        }
2036
2037        let primary_path = primary_path.to_path_buf();
2038        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2039            let mut s = primary_path.clone();
2040            s.pop();
2041            s.push("SECONDARY");
2042            s.as_path().to_path_buf()
2043        });
2044
2045        ensure_database_type(&primary_path, StorageType::Rocks)
2046            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2047        ensure_database_type(&secondary_path, StorageType::Rocks)
2048            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2049
2050        let rocksdb = {
2051            options.create_if_missing(true);
2052            options.create_missing_column_families(true);
2053            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2054                &options,
2055                &primary_path,
2056                &secondary_path,
2057                opt_cfs
2058                    .iter()
2059                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2060            )
2061            .map_err(typed_store_err_from_rocks_err)?;
2062            db.try_catch_up_with_primary()
2063                .map_err(typed_store_err_from_rocks_err)?;
2064            db
2065        };
2066        Ok(Arc::new(Database::new(
2067            Storage::Rocks(RocksDB {
2068                underlying: rocksdb,
2069            }),
2070            metric_conf,
2071            None,
2072        )))
2073    })
2074}
2075
2076// Drops a database if there is no other handle to it, with retries and timeout.
2077// Detects the storage variant (RocksDB vs. tidehunter) from the directory
2078// contents and dispatches to the matching cleanup. Both variants coexist in
2079// tidehunter builds — some stores (e.g. rpc-index) are pure RocksDB even when
2080// the tidehunter feature is enabled elsewhere in the binary.
2081pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2082    #[cfg(tidehunter)]
2083    if is_tidehunter_db(&path) {
2084        return safe_drop_tidehunter_db(path, timeout).await;
2085    }
2086    safe_drop_rocksdb(path, timeout).await
2087}
2088
2089async fn safe_drop_rocksdb(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2090    let mut backoff = backoff::ExponentialBackoff {
2091        max_elapsed_time: Some(timeout),
2092        ..Default::default()
2093    };
2094    loop {
2095        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2096            Ok(()) => return Ok(()),
2097            Err(err) => match backoff.next_backoff() {
2098                Some(duration) => tokio::time::sleep(duration).await,
2099                None => return Err(std::io::Error::other(err)),
2100            },
2101        }
2102    }
2103}
2104
/// Returns `true` when the directory at `path` contains a `shape.yaml` file.
#[cfg(tidehunter)]
fn is_tidehunter_db(path: &Path) -> bool {
    // `shape.yaml` is written by TideHunterDb::open and is the canonical marker
    // for a tidehunter DB directory; RocksDB never creates it.
    let marker = path.join("shape.yaml");
    marker.exists()
}
2111
/// Drops the tidehunter database at `path`, retrying while it reports
/// `AlreadyExists`.
///
/// Only `ErrorKind::AlreadyExists` failures are retried, with exponential
/// backoff whose maximum elapsed time is `timeout`. Any other error is
/// returned immediately. When the backoff is exhausted, a warning is logged
/// and the last error is returned.
#[cfg(tidehunter)]
async fn safe_drop_tidehunter_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut retry_policy = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        let Some(delay) = retry_policy.next_backoff() else {
            warn!(
                "Database at {:?} is still locked after timeout ({:?})",
                path, timeout
            );
            return Err(err);
        };
        tokio::time::sleep(delay).await;
    }
}
2137
2138fn populate_missing_cfs(
2139    input_cfs: &[(&str, rocksdb::Options)],
2140    path: &Path,
2141) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2142    let mut cfs = vec![];
2143    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2144    let existing_cfs =
2145        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2146            .ok()
2147            .unwrap_or_default();
2148
2149    for cf_name in existing_cfs {
2150        if !input_cf_index.contains(&cf_name[..]) {
2151            cfs.push((cf_name, rocksdb::Options::default()));
2152        }
2153    }
2154    cfs.extend(
2155        input_cfs
2156            .iter()
2157            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2158    );
2159    Ok(cfs)
2160}
2161
2162fn default_hash(value: &[u8]) -> Digest<32> {
2163    let mut hasher = fastcrypto::hash::Blake2b256::default();
2164    hasher.update(value);
2165    hasher.finalize()
2166}