typed_store/rocks/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21    iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26    traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36    properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42    borrow::Borrow,
43    marker::PhantomData,
44    ops::RangeBounds,
45    path::{Path, PathBuf},
46    sync::{Arc, OnceLock},
47    time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
/// RocksDB property name for the total size of a column family's blob files.
///
/// Built with the checked `CStr::from_bytes_with_nul` in a const context. A missing
/// terminator or an interior NUL byte therefore fails compilation instead of being
/// undefined behavior, as it would be with `from_bytes_with_nul_unchecked`.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("property name must be NUL-terminated without interior NUL bytes"),
    };
60
/// Cached process-wide write-sync setting.
///
/// Set at most once: explicitly by [`init_write_sync`], or lazily from the
/// `SUI_DB_SYNC_TO_DISK` environment variable the first time [`write_sync_enabled`] runs.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns the write-sync setting, initializing it from the environment if needed.
///
/// If nothing was configured through [`init_write_sync`], syncing is enabled only when
/// `SUI_DB_SYNC_TO_DISK` is exactly `"1"` or `"true"`. Any other value, or an unset or
/// non-UTF-8 variable, disables it. The first result is cached for the rest of the process.
fn write_sync_enabled() -> bool {
    let read_env = || {
        matches!(
            std::env::var("SUI_DB_SYNC_TO_DISK").as_deref(),
            Ok("1" | "true")
        )
    };
    *WRITE_SYNC_ENABLED.get_or_init(read_env)
}

/// Initialize the write sync setting from config.
/// Must be called before any database writes occur.
///
/// `None` leaves the setting to the `SUI_DB_SYNC_TO_DISK` fallback. If the setting was
/// already initialized (by an earlier call or by a prior [`write_sync_enabled`] read),
/// the new value is ignored.
pub fn init_write_sync(enabled: Option<bool>) {
    let Some(value) = enabled else {
        return;
    };
    // The first initialization wins; the "already set" error is discarded.
    let _ = WRITE_SYNC_ENABLED.set(value);
}
75
76#[cfg(test)]
77mod tests;
78
79#[derive(Debug)]
80pub struct RocksDB {
81    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
82}
83
84impl Drop for RocksDB {
85    fn drop(&mut self) {
86        self.underlying.cancel_all_background_work(/* wait */ true);
87    }
88}
89
/// Backend-specific identifier of the column family (or key space) a map operates on.
#[derive(Clone)]
pub enum ColumnFamily {
    /// RocksDB column family, by name.
    Rocks(String),
    /// In-memory column family, by name.
    InMemory(String),
    /// TideHunter key space plus an optional prefix that is passed to `transform_th_key`.
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rocks(cf_name) => write!(f, "RocksDB cf: {cf_name}"),
            Self::InMemory(cf_name) => write!(f, "InMemory cf: {cf_name}"),
            #[cfg(tidehunter)]
            Self::TideHunter(_) => f.write_str("TideHunter column family"),
        }
    }
}
108
109impl ColumnFamily {
110    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111        match &self {
112            ColumnFamily::Rocks(name) => rocks_db
113                .underlying
114                .cf_handle(name)
115                .expect("Map-keying column family should have been checked at DB creation"),
116            _ => unreachable!("invariant is checked by the caller"),
117        }
118    }
119}
120
121pub enum Storage {
122    Rocks(RocksDB),
123    InMemory(InMemoryDB),
124    #[cfg(tidehunter)]
125    TideHunter(Arc<TideHunterDb>),
126}
127
128impl std::fmt::Debug for Storage {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133            #[cfg(tidehunter)]
134            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135        }
136    }
137}
138
139#[derive(Debug)]
140pub struct Database {
141    storage: Storage,
142    metric_conf: MetricConf,
143    registry_id: Option<RegistryID>,
144}
145
146impl Drop for Database {
147    fn drop(&mut self) {
148        let metrics = DBMetrics::get();
149        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
150        if let Some(registry_id) = self.registry_id {
151            metrics.registry_serivce.remove(registry_id);
152        }
153    }
154}
155
156enum GetResult<'a> {
157    Rocks(DBPinnableSlice<'a>),
158    InMemory(Vec<u8>),
159    #[cfg(tidehunter)]
160    TideHunter(tidehunter::minibytes::Bytes),
161}
162
163impl Deref for GetResult<'_> {
164    type Target = [u8];
165    fn deref(&self) -> &[u8] {
166        match self {
167            GetResult::Rocks(d) => d.deref(),
168            GetResult::InMemory(d) => d.deref(),
169            #[cfg(tidehunter)]
170            GetResult::TideHunter(d) => d.deref(),
171        }
172    }
173}
174
175impl Database {
176    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178        Self {
179            storage,
180            metric_conf,
181            registry_id,
182        }
183    }
184
185    /// Flush all memtables to SST files on disk.
186    pub fn flush(&self) -> Result<(), TypedStoreError> {
187        match &self.storage {
188            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
189                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
190            }),
191            Storage::InMemory(_) => {
192                // InMemory databases don't need flushing
193                Ok(())
194            }
195            #[cfg(tidehunter)]
196            Storage::TideHunter(_) => {
197                // TideHunter doesn't support an explicit flush.
198                Ok(())
199            }
200        }
201    }
202
203    fn get<K: AsRef<[u8]>>(
204        &self,
205        cf: &ColumnFamily,
206        key: K,
207        readopts: &ReadOptions,
208    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
209        match (&self.storage, cf) {
210            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
211                .underlying
212                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
213                .map_err(typed_store_err_from_rocks_err)?
214                .map(GetResult::Rocks)),
215            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
216                Ok(db.get(cf_name, key).map(GetResult::InMemory))
217            }
218            #[cfg(tidehunter)]
219            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
220                .get(*ks, &transform_th_key(key.as_ref(), prefix))
221                .map_err(typed_store_error_from_th_error)?
222                .map(GetResult::TideHunter)),
223
224            _ => Err(TypedStoreError::RocksDBError(
225                "typed store invariant violation".to_string(),
226            )),
227        }
228    }
229
230    /// Returns whether `key` exists, without materializing the value. On tidehunter
231    /// this uses the native `exists` (an index/bloom presence check), which avoids
232    /// the value-record read that `get` performs and is therefore much cheaper.
233    fn contains(
234        &self,
235        cf: &ColumnFamily,
236        key: &[u8],
237        readopts: &ReadOptions,
238    ) -> Result<bool, TypedStoreError> {
239        match (&self.storage, cf) {
240            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
241                let rocks_cf = cf.rocks_cf(db);
242                // `key_may_exist_cf_opt` can return false positives but never false
243                // negatives, so it short-circuits the common absent case before the
244                // real point lookup.
245                Ok(db.underlying.key_may_exist_cf_opt(&rocks_cf, key, readopts)
246                    && db
247                        .underlying
248                        .get_pinned_cf_opt(&rocks_cf, key, readopts)
249                        .map_err(typed_store_err_from_rocks_err)?
250                        .is_some())
251            }
252            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
253                Ok(db.get(cf_name, key).is_some())
254            }
255            #[cfg(tidehunter)]
256            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
257                .exists(*ks, &transform_th_key(key, prefix))
258                .map_err(typed_store_error_from_th_error),
259            _ => Err(TypedStoreError::RocksDBError(
260                "typed store invariant violation".to_string(),
261            )),
262        }
263    }
264
265    /// Multi-key variant of [`Self::contains`]. On tidehunter each key uses the
266    /// native `exists` presence check, avoiding the value-record reads that
267    /// `multi_get` performs. Other backends answer via `multi_get`.
268    fn multi_contains<I, K>(
269        &self,
270        cf: &ColumnFamily,
271        keys: I,
272        readopts: &ReadOptions,
273    ) -> Result<Vec<bool>, TypedStoreError>
274    where
275        I: IntoIterator<Item = K>,
276        K: AsRef<[u8]>,
277    {
278        match (&self.storage, cf) {
279            #[cfg(tidehunter)]
280            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => keys
281                .into_iter()
282                .map(|k| {
283                    db.exists(*ks, &transform_th_key(k.as_ref(), prefix))
284                        .map_err(typed_store_error_from_th_error)
285                })
286                .collect(),
287            _ => self
288                .multi_get(cf, keys, readopts)
289                .into_iter()
290                .map(|r| r.map(|v| v.is_some()))
291                .collect(),
292        }
293    }
294
295    fn multi_get<I, K>(
296        &self,
297        cf: &ColumnFamily,
298        keys: I,
299        readopts: &ReadOptions,
300    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
301    where
302        I: IntoIterator<Item = K>,
303        K: AsRef<[u8]>,
304    {
305        match (&self.storage, cf) {
306            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
307                let keys_vec: Vec<K> = keys.into_iter().collect();
308                let res = db.underlying.batched_multi_get_cf_opt(
309                    &cf.rocks_cf(db),
310                    keys_vec.iter(),
311                    /* sorted_input */ false,
312                    readopts,
313                );
314                res.into_iter()
315                    .map(|r| {
316                        r.map_err(typed_store_err_from_rocks_err)
317                            .map(|item| item.map(GetResult::Rocks))
318                    })
319                    .collect()
320            }
321            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
322                .multi_get(cf_name, keys)
323                .into_iter()
324                .map(|r| Ok(r.map(GetResult::InMemory)))
325                .collect(),
326            #[cfg(tidehunter)]
327            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => keys
328                .into_iter()
329                .map(|k| {
330                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
331                        .map_err(typed_store_error_from_th_error)
332                        .map(|item| item.map(|bytes| GetResult::TideHunter(bytes.into_owned())))
333                })
334                .collect(),
335            _ => unreachable!("typed store invariant violation"),
336        }
337    }
338
339    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
340        match &self.storage {
341            Storage::Rocks(db) => db.underlying.drop_cf(name),
342            Storage::InMemory(db) => {
343                db.drop_cf(name);
344                Ok(())
345            }
346            #[cfg(tidehunter)]
347            Storage::TideHunter(_) => {
348                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
349            }
350        }
351    }
352
353    pub fn delete_file_in_range<K: AsRef<[u8]>>(
354        &self,
355        cf: &impl AsColumnFamilyRef,
356        from: K,
357        to: K,
358    ) -> Result<(), rocksdb::Error> {
359        match &self.storage {
360            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
361            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
362        }
363    }
364
365    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
366        fail_point!("delete-cf-before");
367        let ret = match (&self.storage, cf) {
368            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
369                .underlying
370                .delete_cf(&cf.rocks_cf(db), key)
371                .map_err(typed_store_err_from_rocks_err),
372            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
373                db.delete(cf_name, key.as_ref());
374                Ok(())
375            }
376            #[cfg(tidehunter)]
377            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
378                .remove(*ks, transform_th_key(key.as_ref(), prefix))
379                .map_err(typed_store_error_from_th_error),
380            _ => Err(TypedStoreError::RocksDBError(
381                "typed store invariant violation".to_string(),
382            )),
383        };
384        fail_point!("delete-cf-after");
385        #[allow(clippy::let_and_return)]
386        ret
387    }
388
389    pub fn path_for_pruning(&self) -> &Path {
390        match &self.storage {
391            Storage::Rocks(rocks) => rocks.underlying.path(),
392            _ => unimplemented!("method is only supported for rocksdb backend"),
393        }
394    }
395
396    fn put_cf(
397        &self,
398        cf: &ColumnFamily,
399        key: Vec<u8>,
400        value: Vec<u8>,
401    ) -> Result<(), TypedStoreError> {
402        fail_point!("put-cf-before");
403        let ret = match (&self.storage, cf) {
404            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
405                .underlying
406                .put_cf(&cf.rocks_cf(db), key, value)
407                .map_err(typed_store_err_from_rocks_err),
408            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
409                db.put(cf_name, key, value);
410                Ok(())
411            }
412            #[cfg(tidehunter)]
413            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
414                .insert(*ks, transform_th_key(&key, prefix), value)
415                .map_err(typed_store_error_from_th_error),
416            _ => Err(TypedStoreError::RocksDBError(
417                "typed store invariant violation".to_string(),
418            )),
419        };
420        fail_point!("put-cf-after");
421        #[allow(clippy::let_and_return)]
422        ret
423    }
424
425    pub(crate) fn write_opt_internal(
426        &self,
427        batch: StorageWriteBatch,
428        write_options: &rocksdb::WriteOptions,
429    ) -> Result<(), TypedStoreError> {
430        fail_point!("batch-write-before");
431        let ret = match (&self.storage, batch) {
432            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
433                .underlying
434                .write_opt(batch, write_options)
435                .map_err(typed_store_err_from_rocks_err),
436            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
437                // InMemory doesn't support write options
438                db.write(batch);
439                Ok(())
440            }
441            #[cfg(tidehunter)]
442            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
443                // TideHunter doesn't support write options
444                batch.commit().map_err(typed_store_error_from_th_error)
445            }
446            _ => Err(TypedStoreError::RocksDBError(
447                "using invalid batch type for the database".to_string(),
448            )),
449        };
450        fail_point!("batch-write-after");
451        #[allow(clippy::let_and_return)]
452        ret
453    }
454
455    #[cfg(tidehunter)]
456    pub fn start_relocation(&self) -> anyhow::Result<()> {
457        if let Storage::TideHunter(db) = &self.storage {
458            db.start_relocation()?;
459        }
460        Ok(())
461    }
462
463    #[cfg(tidehunter)]
464    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
465        if let Storage::TideHunter(db) = &self.storage {
466            db.force_rebuild_control_region()
467                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
468        }
469        Ok(())
470    }
471
472    /// Wait for tidehunter background threads to finish.
473    ///
474    /// Consumes the `Arc<Database>`. Caller must ensure no other clones of this
475    /// `Arc<Database>` (e.g. via `DBMap::db`) are alive — otherwise the inner
476    /// `Arc<TideHunterDb>` strong count will not reach zero and the wait will
477    /// poll until it panics.
478    #[cfg(tidehunter)]
479    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
480        let strong = Arc::strong_count(&self);
481        if strong != 1 {
482            println!(
483                "WARNING: wait_for_tidehunter_background_threads called with Arc<Database> strong_count={} (expected 1); other clones will keep the inner tidehunter Db alive and the wait may panic on timeout",
484                strong,
485            );
486        }
487        let Storage::TideHunter(th_arc) = &self.storage else {
488            return;
489        };
490        let th_arc = th_arc.clone();
491        drop(self);
492        th_arc.wait_for_background_threads_to_finish();
493    }
494
495    #[cfg(tidehunter)]
496    pub fn drop_cells_in_range(
497        &self,
498        ks: KeySpace,
499        from_inclusive: &[u8],
500        to_inclusive: &[u8],
501    ) -> anyhow::Result<()> {
502        if let Storage::TideHunter(db) = &self.storage {
503            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
504                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
505        } else {
506            panic!("drop_cells_in_range called on non-TideHunter storage");
507        }
508        Ok(())
509    }
510
511    pub fn compact_range_cf<K: AsRef<[u8]>>(
512        &self,
513        cf_name: &str,
514        start: Option<K>,
515        end: Option<K>,
516    ) {
517        if let Storage::Rocks(rocksdb) = &self.storage {
518            rocksdb
519                .underlying
520                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
521        }
522    }
523
524    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
525        // TODO: implement for other storage types
526        if let Storage::Rocks(rocks) = &self.storage {
527            let checkpoint =
528                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
529            checkpoint
530                .create_checkpoint(path)
531                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
532        }
533        Ok(())
534    }
535
536    pub fn get_sampling_interval(&self) -> SamplingInterval {
537        self.metric_conf.read_sample_interval.new_from_self()
538    }
539
540    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
541        self.metric_conf.read_sample_interval.new_from_self()
542    }
543
544    pub fn write_sampling_interval(&self) -> SamplingInterval {
545        self.metric_conf.write_sample_interval.new_from_self()
546    }
547
548    pub fn iter_sampling_interval(&self) -> SamplingInterval {
549        self.metric_conf.iter_sample_interval.new_from_self()
550    }
551
552    fn db_name(&self) -> String {
553        let name = &self.metric_conf.db_name;
554        if name.is_empty() {
555            "default".to_string()
556        } else {
557            name.clone()
558        }
559    }
560
561    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
562        match &self.storage {
563            Storage::Rocks(rocks) => rocks.underlying.live_files(),
564            _ => Ok(vec![]),
565        }
566    }
567}
568
569fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
570    rocks_db
571        .underlying
572        .cf_handle(cf_name)
573        .expect("Map-keying column family should have been checked at DB creation")
574}
575
576fn rocks_cf_from_db<'a>(
577    db: &'a Database,
578    cf_name: &str,
579) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
580    match &db.storage {
581        Storage::Rocks(rocksdb) => Ok(rocksdb
582            .underlying
583            .cf_handle(cf_name)
584            .expect("Map-keying column family should have been checked at DB creation")),
585        _ => Err(TypedStoreError::RocksDBError(
586            "using invalid batch type for the database".to_string(),
587        )),
588    }
589}
590
591#[derive(Debug, Default)]
592pub struct MetricConf {
593    pub db_name: String,
594    pub read_sample_interval: SamplingInterval,
595    pub write_sample_interval: SamplingInterval,
596    pub iter_sample_interval: SamplingInterval,
597    /// When true and the database is opened with the tidehunter backend, each
598    /// committed `WriteBatch` is written as a single lz4-compressed WAL entry.
599    pub enable_th_batch_compression: bool,
600}
601
602impl MetricConf {
603    pub fn new(db_name: &str) -> Self {
604        if db_name.is_empty() {
605            error!("A meaningful db name should be used for metrics reporting.")
606        }
607        Self {
608            db_name: db_name.to_string(),
609            read_sample_interval: SamplingInterval::default(),
610            write_sample_interval: SamplingInterval::default(),
611            iter_sample_interval: SamplingInterval::default(),
612            enable_th_batch_compression: false,
613        }
614    }
615
616    pub fn with_sampling(mut self, read_interval: SamplingInterval) -> Self {
617        self.read_sample_interval = read_interval;
618        self
619    }
620
621    pub fn with_th_batch_compression(mut self) -> Self {
622        self.enable_th_batch_compression = true;
623        self
624    }
625}
/// Period, in seconds, at which each `DBMap`'s background task reports RocksDB cf metrics.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Gauge value reported when a RocksDB property could not be read.
const METRICS_ERROR: i64 = -1;
628
/// An interface to a rocksDB database, keyed by a columnfamily
///
/// The map may sit on any [`Storage`] backend. `column_family` selects the
/// backend-specific column family or key space. Clones share the `Database`, the metrics
/// and the metrics-task cancel handle through `Arc`s.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    // `fn(K) -> V` ties the map to its key/value types without storing any K or V.
    _phantom: PhantomData<fn(K) -> V>,
    column_family: ColumnFamily,
    // name of the column family under which the map is stored (also the metrics label)
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping the last clone of this sender ends the per-cf metrics task spawned in `new`.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

// SAFETY: `K` and `V` only appear inside `PhantomData<fn(K) -> V>`, so no values of those
// types are stored in the map. NOTE(review): the remaining fields are presumably `Send`
// already; re-confirm this manual impl is still needed and sound whenever a field changes.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
647
648impl<K, V> DBMap<K, V> {
649    pub(crate) fn new(
650        db: Arc<Database>,
651        opts: &ReadWriteOptions,
652        opt_cf: &str,
653        column_family: ColumnFamily,
654        is_deprecated: bool,
655    ) -> Self {
656        let db_cloned = Arc::downgrade(&db.clone());
657        let db_metrics = DBMetrics::get();
658        let db_metrics_cloned = db_metrics.clone();
659        let cf = opt_cf.to_string();
660
661        let (sender, mut recv) = tokio::sync::oneshot::channel();
662        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
663            tokio::task::spawn(async move {
664                let mut interval =
665                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
666                loop {
667                    tokio::select! {
668                        _ = interval.tick() => {
669                            if let Some(db) = db_cloned.upgrade() {
670                                let cf = cf.clone();
671                                let db_metrics = db_metrics.clone();
672                                if let Err(e) = tokio::task::spawn_blocking(move || {
673                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
674                                }).await {
675                                    error!("Failed to log metrics with error: {}", e);
676                                }
677                            }
678                        }
679                        _ = &mut recv => break,
680                    }
681                }
682                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
683            });
684        }
685        DBMap {
686            db: db.clone(),
687            opts: opts.clone(),
688            _phantom: PhantomData,
689            column_family,
690            cf: opt_cf.to_string(),
691            db_metrics: db_metrics_cloned,
692            _metrics_task_cancel_handle: Arc::new(sender),
693            get_sample_interval: db.get_sampling_interval(),
694            multiget_sample_interval: db.multiget_sampling_interval(),
695            write_sample_interval: db.write_sampling_interval(),
696            iter_sample_interval: db.iter_sampling_interval(),
697        }
698    }
699
700    /// Reopens an open database as a typed map operating under a specific column family.
701    /// if no column family is passed, the default column family is used.
702    #[instrument(level = "debug", skip(db), err)]
703    pub fn reopen(
704        db: &Arc<Database>,
705        opt_cf: Option<&str>,
706        rw_options: &ReadWriteOptions,
707        is_deprecated: bool,
708    ) -> Result<Self, TypedStoreError> {
709        let cf_key = opt_cf
710            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
711            .to_owned();
712        Ok(DBMap::new(
713            db.clone(),
714            rw_options,
715            &cf_key,
716            ColumnFamily::Rocks(cf_key.to_string()),
717            is_deprecated,
718        ))
719    }
720
721    #[cfg(tidehunter)]
722    pub fn reopen_th(
723        db: Arc<Database>,
724        cf_name: &str,
725        ks: KeySpace,
726        prefix: Option<Vec<u8>>,
727    ) -> Self {
728        DBMap::new(
729            db,
730            &ReadWriteOptions::default(),
731            cf_name,
732            ColumnFamily::TideHunter((ks, prefix.clone())),
733            false,
734        )
735    }
736
737    pub fn cf_name(&self) -> &str {
738        &self.cf
739    }
740
741    pub fn batch(&self) -> DBBatch {
742        let batch = match &self.db.storage {
743            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
744            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
745            #[cfg(tidehunter)]
746            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
747        };
748        DBBatch::new(
749            &self.db,
750            batch,
751            &self.db_metrics,
752            &self.write_sample_interval,
753        )
754    }
755
756    pub fn flush(&self) -> Result<(), TypedStoreError> {
757        self.db.flush()
758    }
759
760    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
761        let from_buf = be_fix_int_ser(start);
762        let to_buf = be_fix_int_ser(end);
763        self.db
764            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
765        Ok(())
766    }
767
768    pub fn compact_range_raw(
769        &self,
770        cf_name: &str,
771        start: Vec<u8>,
772        end: Vec<u8>,
773    ) -> Result<(), TypedStoreError> {
774        self.db.compact_range_cf(cf_name, Some(start), Some(end));
775        Ok(())
776    }
777
778    #[cfg(tidehunter)]
779    pub fn drop_cells_in_range<J: Serialize>(
780        &self,
781        from_inclusive: &J,
782        to_inclusive: &J,
783    ) -> Result<(), TypedStoreError>
784    where
785        K: Serialize,
786    {
787        let from_buf = be_fix_int_ser(from_inclusive);
788        let to_buf = be_fix_int_ser(to_inclusive);
789        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
790            self.db
791                .drop_cells_in_range(*ks, &from_buf, &to_buf)
792                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
793        }
794        Ok(())
795    }
796
797    #[cfg(tidehunter)]
798    pub fn drop_cells_in_range_raw(
799        &self,
800        from_inclusive: &[u8],
801        to_inclusive: &[u8],
802    ) -> Result<(), TypedStoreError> {
803        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
804            self.db
805                .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
806                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
807        }
808        Ok(())
809    }
810
811    /// Returns a vector of raw values corresponding to the keys provided.
812    fn multi_get_pinned<J>(
813        &self,
814        keys: impl IntoIterator<Item = J>,
815    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
816    where
817        J: Borrow<K>,
818        K: Serialize,
819    {
820        let _timer = self
821            .db_metrics
822            .op_metrics
823            .rocksdb_multiget_latency_seconds
824            .with_label_values(&[&self.cf])
825            .start_timer();
826        let perf_ctx = if self.multiget_sample_interval.sample() {
827            Some(RocksDBPerfContext)
828        } else {
829            None
830        };
831        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
832        let results: Result<Vec<_>, TypedStoreError> = self
833            .db
834            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
835            .into_iter()
836            .collect();
837        let entries = results?;
838        let entry_size = entries
839            .iter()
840            .flatten()
841            .map(|entry| entry.len())
842            .sum::<usize>();
843        self.db_metrics
844            .op_metrics
845            .rocksdb_multiget_bytes
846            .with_label_values(&[&self.cf])
847            .observe(entry_size as f64);
848        if perf_ctx.is_some() {
849            self.db_metrics
850                .read_perf_ctx_metrics
851                .report_metrics(&self.cf);
852        }
853        Ok(entries)
854    }
855
856    fn get_rocksdb_int_property(
857        rocksdb: &RocksDB,
858        cf: &impl AsColumnFamilyRef,
859        property_name: &std::ffi::CStr,
860    ) -> Result<i64, TypedStoreError> {
861        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
862            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
863            Ok(None) => Ok(0),
864            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
865        }
866    }
867
868    fn report_rocksdb_metrics(
869        database: &Arc<Database>,
870        cf_name: &str,
871        db_metrics: &Arc<DBMetrics>,
872    ) {
873        let Storage::Rocks(rocksdb) = &database.storage else {
874            return;
875        };
876
877        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
878            tracing::warn!(
879                "unable to report metrics for cf {cf_name:?} in db {:?}",
880                database.db_name()
881            );
882            return;
883        };
884
885        db_metrics
886            .cf_metrics
887            .rocksdb_total_sst_files_size
888            .with_label_values(&[cf_name])
889            .set(
890                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
891                    .unwrap_or(METRICS_ERROR),
892            );
893        db_metrics
894            .cf_metrics
895            .rocksdb_total_blob_files_size
896            .with_label_values(&[cf_name])
897            .set(
898                Self::get_rocksdb_int_property(
899                    rocksdb,
900                    &cf,
901                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
902                )
903                .unwrap_or(METRICS_ERROR),
904            );
905        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
906        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
907        let total_num_files: i64 = (0..=6)
908            .map(|level| {
909                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
910                    .unwrap_or(METRICS_ERROR)
911            })
912            .sum();
913        db_metrics
914            .cf_metrics
915            .rocksdb_total_num_files
916            .with_label_values(&[cf_name])
917            .set(total_num_files);
918        db_metrics
919            .cf_metrics
920            .rocksdb_num_level0_files
921            .with_label_values(&[cf_name])
922            .set(
923                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
924                    .unwrap_or(METRICS_ERROR),
925            );
926        db_metrics
927            .cf_metrics
928            .rocksdb_current_size_active_mem_tables
929            .with_label_values(&[cf_name])
930            .set(
931                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
932                    .unwrap_or(METRICS_ERROR),
933            );
934        db_metrics
935            .cf_metrics
936            .rocksdb_size_all_mem_tables
937            .with_label_values(&[cf_name])
938            .set(
939                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
940                    .unwrap_or(METRICS_ERROR),
941            );
942        db_metrics
943            .cf_metrics
944            .rocksdb_num_snapshots
945            .with_label_values(&[cf_name])
946            .set(
947                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
948                    .unwrap_or(METRICS_ERROR),
949            );
950        db_metrics
951            .cf_metrics
952            .rocksdb_oldest_snapshot_time
953            .with_label_values(&[cf_name])
954            .set(
955                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
956                    .unwrap_or(METRICS_ERROR),
957            );
958        db_metrics
959            .cf_metrics
960            .rocksdb_actual_delayed_write_rate
961            .with_label_values(&[cf_name])
962            .set(
963                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
964                    .unwrap_or(METRICS_ERROR),
965            );
966        db_metrics
967            .cf_metrics
968            .rocksdb_is_write_stopped
969            .with_label_values(&[cf_name])
970            .set(
971                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
972                    .unwrap_or(METRICS_ERROR),
973            );
974        db_metrics
975            .cf_metrics
976            .rocksdb_block_cache_capacity
977            .with_label_values(&[cf_name])
978            .set(
979                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
980                    .unwrap_or(METRICS_ERROR),
981            );
982        db_metrics
983            .cf_metrics
984            .rocksdb_block_cache_usage
985            .with_label_values(&[cf_name])
986            .set(
987                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
988                    .unwrap_or(METRICS_ERROR),
989            );
990        db_metrics
991            .cf_metrics
992            .rocksdb_block_cache_pinned_usage
993            .with_label_values(&[cf_name])
994            .set(
995                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
996                    .unwrap_or(METRICS_ERROR),
997            );
998        db_metrics
999            .cf_metrics
1000            .rocksdb_estimate_table_readers_mem
1001            .with_label_values(&[cf_name])
1002            .set(
1003                Self::get_rocksdb_int_property(
1004                    rocksdb,
1005                    &cf,
1006                    properties::ESTIMATE_TABLE_READERS_MEM,
1007                )
1008                .unwrap_or(METRICS_ERROR),
1009            );
1010        db_metrics
1011            .cf_metrics
1012            .rocksdb_estimated_num_keys
1013            .with_label_values(&[cf_name])
1014            .set(
1015                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
1016                    .unwrap_or(METRICS_ERROR),
1017            );
1018        db_metrics
1019            .cf_metrics
1020            .rocksdb_num_immutable_mem_tables
1021            .with_label_values(&[cf_name])
1022            .set(
1023                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
1024                    .unwrap_or(METRICS_ERROR),
1025            );
1026        db_metrics
1027            .cf_metrics
1028            .rocksdb_mem_table_flush_pending
1029            .with_label_values(&[cf_name])
1030            .set(
1031                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
1032                    .unwrap_or(METRICS_ERROR),
1033            );
1034        db_metrics
1035            .cf_metrics
1036            .rocksdb_compaction_pending
1037            .with_label_values(&[cf_name])
1038            .set(
1039                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
1040                    .unwrap_or(METRICS_ERROR),
1041            );
1042        db_metrics
1043            .cf_metrics
1044            .rocksdb_estimate_pending_compaction_bytes
1045            .with_label_values(&[cf_name])
1046            .set(
1047                Self::get_rocksdb_int_property(
1048                    rocksdb,
1049                    &cf,
1050                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
1051                )
1052                .unwrap_or(METRICS_ERROR),
1053            );
1054        db_metrics
1055            .cf_metrics
1056            .rocksdb_num_running_compactions
1057            .with_label_values(&[cf_name])
1058            .set(
1059                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1060                    .unwrap_or(METRICS_ERROR),
1061            );
1062        db_metrics
1063            .cf_metrics
1064            .rocksdb_num_running_flushes
1065            .with_label_values(&[cf_name])
1066            .set(
1067                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1068                    .unwrap_or(METRICS_ERROR),
1069            );
1070        db_metrics
1071            .cf_metrics
1072            .rocksdb_estimate_oldest_key_time
1073            .with_label_values(&[cf_name])
1074            .set(
1075                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1076                    .unwrap_or(METRICS_ERROR),
1077            );
1078        db_metrics
1079            .cf_metrics
1080            .rocksdb_background_errors
1081            .with_label_values(&[cf_name])
1082            .set(
1083                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1084                    .unwrap_or(METRICS_ERROR),
1085            );
1086        db_metrics
1087            .cf_metrics
1088            .rocksdb_base_level
1089            .with_label_values(&[cf_name])
1090            .set(
1091                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1092                    .unwrap_or(METRICS_ERROR),
1093            );
1094    }
1095
1096    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1097        self.db.checkpoint(path)
1098    }
1099
1100    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1101    where
1102        K: Serialize + DeserializeOwned,
1103        V: Serialize + DeserializeOwned,
1104    {
1105        let mut num_keys = 0;
1106        let mut key_bytes_total = 0;
1107        let mut value_bytes_total = 0;
1108        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1109        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1110        for item in self.safe_iter() {
1111            let (key, value) = item?;
1112            num_keys += 1;
1113            let key_len = be_fix_int_ser(key.borrow()).len();
1114            let value_len = bcs::to_bytes(value.borrow())?.len();
1115            key_bytes_total += key_len;
1116            value_bytes_total += value_len;
1117            key_hist.record(key_len as u64)?;
1118            value_hist.record(value_len as u64)?;
1119        }
1120        Ok(TableSummary {
1121            num_keys,
1122            key_bytes_total,
1123            value_bytes_total,
1124            key_hist,
1125            value_hist,
1126        })
1127    }
1128
1129    fn start_iter_timer(&self) -> HistogramTimer {
1130        self.db_metrics
1131            .op_metrics
1132            .rocksdb_iter_latency_seconds
1133            .with_label_values(&[&self.cf])
1134            .start_timer()
1135    }
1136
1137    // Creates metrics and context for tracking an iterator usage and performance.
1138    fn create_iter_context(
1139        &self,
1140    ) -> (
1141        Option<HistogramTimer>,
1142        Option<Histogram>,
1143        Option<Histogram>,
1144        Option<RocksDBPerfContext>,
1145    ) {
1146        let timer = self.start_iter_timer();
1147        let bytes_scanned = self
1148            .db_metrics
1149            .op_metrics
1150            .rocksdb_iter_bytes
1151            .with_label_values(&[&self.cf]);
1152        let keys_scanned = self
1153            .db_metrics
1154            .op_metrics
1155            .rocksdb_iter_keys
1156            .with_label_values(&[&self.cf]);
1157        let perf_ctx = if self.iter_sample_interval.sample() {
1158            Some(RocksDBPerfContext)
1159        } else {
1160            None
1161        };
1162        (
1163            Some(timer),
1164            Some(bytes_scanned),
1165            Some(keys_scanned),
1166            perf_ctx,
1167        )
1168    }
1169
1170    /// Creates a safe reversed iterator with optional bounds.
1171    /// Both upper bound and lower bound are included.
1172    #[allow(clippy::complexity)]
1173    pub fn reversed_safe_iter_with_bounds(
1174        &self,
1175        lower_bound: Option<K>,
1176        upper_bound: Option<K>,
1177    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1178    where
1179        K: Serialize + DeserializeOwned,
1180        V: Serialize + DeserializeOwned,
1181    {
1182        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1183            lower_bound
1184                .as_ref()
1185                .map(Bound::Included)
1186                .unwrap_or(Bound::Unbounded),
1187            upper_bound
1188                .as_ref()
1189                .map(Bound::Included)
1190                .unwrap_or(Bound::Unbounded),
1191        ));
1192        match &self.db.storage {
1193            Storage::Rocks(db) => {
1194                let readopts = rocks_util::apply_range_bounds(
1195                    self.opts.readopts(),
1196                    it_lower_bound,
1197                    it_upper_bound,
1198                );
1199                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1200                let db_iter = db
1201                    .underlying
1202                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1203                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1204                let iter = SafeIter::new(
1205                    self.cf.clone(),
1206                    db_iter,
1207                    _timer,
1208                    _perf_ctx,
1209                    bytes_scanned,
1210                    keys_scanned,
1211                    Some(self.db_metrics.clone()),
1212                );
1213                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1214            }
1215            Storage::InMemory(db) => {
1216                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1217            }
1218            #[cfg(tidehunter)]
1219            Storage::TideHunter(db) => match &self.column_family {
1220                ColumnFamily::TideHunter((ks, prefix)) => {
1221                    let mut iter = db.iterator(*ks);
1222                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound, prefix);
1223                    iter.reverse();
1224                    Ok(Box::new(transform_th_iterator(
1225                        iter,
1226                        prefix,
1227                        self.start_iter_timer(),
1228                    )))
1229                }
1230                _ => unreachable!("storage backend invariant violation"),
1231            },
1232        }
1233    }
1234
1235    /// Iterates the whole table as a consistent point-in-time snapshot.
1236    /// Equivalent to `snapshot_iterator_with_bounds(None, None, false)`.
1237    pub fn snapshot_iterator(&self) -> DbIterator<'_, (K, V)>
1238    where
1239        K: Serialize + DeserializeOwned,
1240        V: Serialize + DeserializeOwned,
1241    {
1242        self.snapshot_iterator_with_bounds(None, None, false)
1243    }
1244
1245    /// Iterates the table as a consistent point-in-time snapshot, optionally
1246    /// bounded and/or reversed.
1247    ///
1248    /// RocksDB and in-memory iterators are already snapshot-consistent from the
1249    /// moment they are created, so this delegates to the regular iterators. The
1250    /// tidehunter backend's live iterator is not stable under concurrent writes,
1251    /// so this opens a short-lived checkpoint and iterates that frontier instead;
1252    /// the returned iterator owns the checkpoint, pinning the snapshot for its
1253    /// lifetime.
1254    ///
1255    /// Bound semantics match the regular iterators: forward is `[lower, upper)`
1256    /// (upper exclusive), reverse is `[lower, upper]` (both inclusive).
1257    pub fn snapshot_iterator_with_bounds(
1258        &self,
1259        lower_bound: Option<K>,
1260        upper_bound: Option<K>,
1261        reverse: bool,
1262    ) -> DbIterator<'_, (K, V)>
1263    where
1264        K: Serialize + DeserializeOwned,
1265        V: Serialize + DeserializeOwned,
1266    {
1267        #[cfg(tidehunter)]
1268        if let Storage::TideHunter(db) = &self.db.storage {
1269            let ColumnFamily::TideHunter((ks, prefix)) = &self.column_family else {
1270                unreachable!("storage backend invariant violation");
1271            };
1272            // Mirror the bound computation of the regular iterators so the snapshot
1273            // observes the same key range: forward via `iterator_bounds` (upper
1274            // exclusive), reverse via an inclusive range.
1275            let (lower, upper) = if reverse {
1276                iterator_bounds_with_range::<K>((
1277                    lower_bound
1278                        .as_ref()
1279                        .map(Bound::Included)
1280                        .unwrap_or(Bound::Unbounded),
1281                    upper_bound
1282                        .as_ref()
1283                        .map(Bound::Included)
1284                        .unwrap_or(Bound::Unbounded),
1285                ))
1286            } else {
1287                iterator_bounds(lower_bound, upper_bound)
1288            };
1289            let mut iter = db.checkpoint().iterator(*ks);
1290            apply_range_bounds(&mut iter, lower, upper, prefix);
1291            if reverse {
1292                iter.reverse();
1293            }
1294            return Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()));
1295        }
1296        if reverse {
1297            // Infallible across all backends; only the `Result` shape differs.
1298            self.reversed_safe_iter_with_bounds(lower_bound, upper_bound)
1299                .expect("reversed iterator construction is infallible")
1300        } else {
1301            self.safe_iter_with_bounds(lower_bound, upper_bound)
1302        }
1303    }
1304}
1305
/// A write batch for one of the supported storage backends.
///
/// Wrapped by `DBBatch`; each variant carries that backend's native batch type.
pub enum StorageWriteBatch {
    /// A RocksDB `WriteBatch`.
    Rocks(rocksdb::WriteBatch),
    /// A batch for the in-memory backend.
    InMemory(InMemoryBatch),
    /// A TideHunter batch; only present when compiled with `cfg(tidehunter)`.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1312
/// Flat-buffer entry header. All byte data (cf_name, key, value) is stored
/// contiguously in `StagedBatch::data`; this header records offsets and lengths
/// so that slices can be produced without any per-entry allocation.
///
/// An entry's bytes are laid out as `cf_name | key | value`; delete entries have no value
/// bytes. The value length is not stored: a put's value runs up to the next entry's
/// `offset`, or to the end of `data` for the last entry.
struct EntryHeader {
    /// Byte offset into `StagedBatch::data` where this entry's data begins.
    offset: usize,
    /// Length in bytes of the UTF-8 column family name at the start of the entry.
    cf_name_len: usize,
    /// Length in bytes of the serialized key that follows the column family name.
    key_len: usize,
    /// `true` for a put (key and value), `false` for a delete (key only).
    is_put: bool,
}
1323
/// A write batch that stores serialized operations in a flat byte buffer without
/// holding a database reference. It can be replayed into a real `DBBatch` via
/// `DBBatch::concat`.
///
/// TODO: this can be deleted when we upgrade rust-rocksdb, which supports iterating
/// over write batches.
#[derive(Default)]
pub struct StagedBatch {
    /// Concatenated bytes of every staged entry (`cf_name | key`, plus `| value` for puts).
    data: Vec<u8>,
    /// One header per staged operation, in staging order, locating its bytes in `data`.
    entries: Vec<EntryHeader>,
}
1334
1335impl StagedBatch {
1336    pub fn new() -> Self {
1337        Self {
1338            data: Vec::with_capacity(1024),
1339            entries: Vec::with_capacity(16),
1340        }
1341    }
1342
1343    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1344        &mut self,
1345        db: &DBMap<K, V>,
1346        new_vals: impl IntoIterator<Item = (J, U)>,
1347    ) -> Result<&mut Self, TypedStoreError> {
1348        let cf_name = db.cf_name();
1349        new_vals
1350            .into_iter()
1351            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1352                let offset = self.data.len();
1353                self.data.extend_from_slice(cf_name.as_bytes());
1354                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1355                bcs::serialize_into(&mut self.data, v.borrow())
1356                    .map_err(typed_store_err_from_bcs_err)?;
1357                self.entries.push(EntryHeader {
1358                    offset,
1359                    cf_name_len: cf_name.len(),
1360                    key_len,
1361                    is_put: true,
1362                });
1363                Ok(())
1364            })?;
1365        Ok(self)
1366    }
1367
1368    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1369        &mut self,
1370        db: &DBMap<K, V>,
1371        purged_vals: impl IntoIterator<Item = J>,
1372    ) -> Result<(), TypedStoreError> {
1373        let cf_name = db.cf_name();
1374        purged_vals
1375            .into_iter()
1376            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1377                let offset = self.data.len();
1378                self.data.extend_from_slice(cf_name.as_bytes());
1379                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1380                self.entries.push(EntryHeader {
1381                    offset,
1382                    cf_name_len: cf_name.len(),
1383                    key_len,
1384                    is_put: false,
1385                });
1386                Ok(())
1387            })?;
1388        Ok(())
1389    }
1390
1391    pub fn size_in_bytes(&self) -> usize {
1392        self.data.len()
1393    }
1394}
1395
/// A mutable collection of database write operations that are executed together.
///
/// Batching writes and deletes is faster than performing them one by one, and makes them
/// atomic, i.e. either all of them are written or none is.
/// This also holds for operations across column families in the same database.
///
/// Serialization and the choice of column family are determined by the `DBMap<K, V>`
/// passed with each operation.
///
/// ```
/// use typed_store::rocks::*;
/// use tempfile::tempdir;
/// use typed_store::Map;
/// use typed_store::metrics::DBMetrics;
/// use prometheus::Registry;
/// use core::fmt::Error;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
///
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
///     .expect("Failed to open storage");
/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
///     .expect("Failed to open storage");
/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
/// let mut batch = db_cf_1.batch();
/// batch
///     .insert_batch(&db_cf_1, keys_vals_1.clone())
///     .expect("Failed to batch insert")
///     .insert_batch(&db_cf_2, keys_vals_2.clone())
///     .expect("Failed to batch insert");
///
/// let _ = batch.write().expect("Failed to execute batch");
/// for (k, v) in keys_vals_1 {
///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
///     assert_eq!(Some(v), val);
/// }
///
/// for (k, v) in keys_vals_2 {
///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
///     assert_eq!(Some(v), val);
/// }
/// Ok(())
/// }
/// ```
///
pub struct DBBatch {
    // Database this batch is written to. Operations on a `DBMap` backed by a different
    // database are rejected with `TypedStoreError::CrossDBBatch`.
    database: Arc<Database>,
    // Backend-specific batch accumulating the staged operations.
    batch: StorageWriteBatch,
    // Metrics recorded when the batch is committed.
    db_metrics: Arc<DBMetrics>,
    // Decides which commits also report write perf-context metrics.
    write_sample_interval: SamplingInterval,
}
1453
1454impl DBBatch {
1455    /// Create a new batch associated with a DB reference.
1456    ///
1457    /// Use `open_cf` to get the DB reference or an existing open database.
1458    pub fn new(
1459        dbref: &Arc<Database>,
1460        batch: StorageWriteBatch,
1461        db_metrics: &Arc<DBMetrics>,
1462        write_sample_interval: &SamplingInterval,
1463    ) -> Self {
1464        DBBatch {
1465            database: dbref.clone(),
1466            batch,
1467            db_metrics: db_metrics.clone(),
1468            write_sample_interval: write_sample_interval.clone(),
1469        }
1470    }
1471
1472    /// Consume the batch and write its operations to the database
1473    #[instrument(level = "trace", skip_all, err)]
1474    pub fn write(self) -> Result<(), TypedStoreError> {
1475        let mut write_options = rocksdb::WriteOptions::default();
1476
1477        if write_sync_enabled() {
1478            write_options.set_sync(true);
1479        }
1480
1481        self.write_opt(write_options)
1482    }
1483
1484    /// Consume the batch and write its operations to the database with custom write options
1485    #[instrument(level = "trace", skip_all, err)]
1486    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1487        let db_name = self.database.db_name();
1488        let timer = self
1489            .db_metrics
1490            .op_metrics
1491            .rocksdb_batch_commit_latency_seconds
1492            .with_label_values(&[&db_name])
1493            .start_timer();
1494        let batch_size = self.size_in_bytes();
1495
1496        let perf_ctx = if self.write_sample_interval.sample() {
1497            Some(RocksDBPerfContext)
1498        } else {
1499            None
1500        };
1501
1502        self.database
1503            .write_opt_internal(self.batch, &write_options)?;
1504
1505        self.db_metrics
1506            .op_metrics
1507            .rocksdb_batch_commit_bytes
1508            .with_label_values(&[&db_name])
1509            .observe(batch_size as f64);
1510
1511        if perf_ctx.is_some() {
1512            self.db_metrics
1513                .write_perf_ctx_metrics
1514                .report_metrics(&db_name);
1515        }
1516        let elapsed = timer.stop_and_record();
1517        if elapsed > 1.0 {
1518            warn!(?elapsed, ?db_name, "very slow batch write");
1519            self.db_metrics
1520                .op_metrics
1521                .rocksdb_very_slow_batch_writes_count
1522                .with_label_values(&[&db_name])
1523                .inc();
1524            self.db_metrics
1525                .op_metrics
1526                .rocksdb_very_slow_batch_writes_duration_ms
1527                .with_label_values(&[&db_name])
1528                .inc_by((elapsed * 1000.0) as u64);
1529        }
1530        Ok(())
1531    }
1532
1533    pub fn size_in_bytes(&self) -> usize {
1534        match self.batch {
1535            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1536            StorageWriteBatch::InMemory(_) => 0,
1537            // TODO: implement size_in_bytes method
1538            #[cfg(tidehunter)]
1539            StorageWriteBatch::TideHunter(_) => 0,
1540        }
1541    }
1542
1543    /// Replay all operations from `StagedBatch`es into this batch.
1544    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
1545        for raw_batch in raw_batches {
1546            let data = &raw_batch.data;
1547            for (i, hdr) in raw_batch.entries.iter().enumerate() {
1548                let end = raw_batch
1549                    .entries
1550                    .get(i + 1)
1551                    .map_or(data.len(), |next| next.offset);
1552                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
1553                let key_start = hdr.offset + hdr.cf_name_len;
1554                let key = &data[key_start..key_start + hdr.key_len];
1555                // Safety: cf_name was written from &str bytes in insert_batch / delete_batch.
1556                let cf_name = std::str::from_utf8(cf_bytes)
1557                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;
1558
1559                if hdr.is_put {
1560                    let value = &data[key_start + hdr.key_len..end];
1561                    match &mut self.batch {
1562                        StorageWriteBatch::Rocks(b) => {
1563                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
1564                        }
1565                        StorageWriteBatch::InMemory(b) => {
1566                            b.put_cf(cf_name, key, value);
1567                        }
1568                        #[cfg(tidehunter)]
1569                        _ => {
1570                            return Err(TypedStoreError::RocksDBError(
1571                                "concat not supported for TideHunter".to_string(),
1572                            ));
1573                        }
1574                    }
1575                } else {
1576                    match &mut self.batch {
1577                        StorageWriteBatch::Rocks(b) => {
1578                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
1579                        }
1580                        StorageWriteBatch::InMemory(b) => {
1581                            b.delete_cf(cf_name, key);
1582                        }
1583                        #[cfg(tidehunter)]
1584                        _ => {
1585                            return Err(TypedStoreError::RocksDBError(
1586                                "concat not supported for TideHunter".to_string(),
1587                            ));
1588                        }
1589                    }
1590                }
1591            }
1592        }
1593        Ok(self)
1594    }
1595
1596    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1597        &mut self,
1598        db: &DBMap<K, V>,
1599        purged_vals: impl IntoIterator<Item = J>,
1600    ) -> Result<(), TypedStoreError> {
1601        if !Arc::ptr_eq(&db.db, &self.database) {
1602            return Err(TypedStoreError::CrossDBBatch);
1603        }
1604
1605        purged_vals
1606            .into_iter()
1607            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1608                let k_buf = be_fix_int_ser(k.borrow());
1609                match (&mut self.batch, &db.column_family) {
1610                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1611                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1612                    }
1613                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1614                        b.delete_cf(name, k_buf)
1615                    }
1616                    #[cfg(tidehunter)]
1617                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1618                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1619                    }
1620                    _ => Err(TypedStoreError::RocksDBError(
1621                        "typed store invariant violation".to_string(),
1622                    ))?,
1623                }
1624                Ok(())
1625            })?;
1626        Ok(())
1627    }
1628
1629    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
1630    /// by writing a range delete tombstone in the db map
1631    /// If the DBMap is configured with ignore_range_deletions set to false,
1632    /// the effect of this write will be visible immediately i.e. you won't
1633    /// see old values when you do a lookup or scan. But if it is configured
1634    /// with ignore_range_deletions set to true, the old value are visible until
1635    /// compaction actually deletes them which will happen sometime after. By
1636    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1637    /// overridden in the config), so please use this function with caution
1638    pub fn schedule_delete_range<K: Serialize, V>(
1639        &mut self,
1640        db: &DBMap<K, V>,
1641        from: &K,
1642        to: &K,
1643    ) -> Result<(), TypedStoreError> {
1644        if !Arc::ptr_eq(&db.db, &self.database) {
1645            return Err(TypedStoreError::CrossDBBatch);
1646        }
1647
1648        let from_buf = be_fix_int_ser(from);
1649        let to_buf = be_fix_int_ser(to);
1650
1651        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1652            b.delete_range_cf(
1653                &rocks_cf_from_db(&self.database, db.cf_name())?,
1654                from_buf,
1655                to_buf,
1656            );
1657        }
1658        Ok(())
1659    }
1660
1661    /// inserts a range of (key, value) pairs given as an iterator
1662    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1663        &mut self,
1664        db: &DBMap<K, V>,
1665        new_vals: impl IntoIterator<Item = (J, U)>,
1666    ) -> Result<&mut Self, TypedStoreError> {
1667        if !Arc::ptr_eq(&db.db, &self.database) {
1668            return Err(TypedStoreError::CrossDBBatch);
1669        }
1670        let mut total = 0usize;
1671        new_vals
1672            .into_iter()
1673            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1674                let k_buf = be_fix_int_ser(k.borrow());
1675                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1676                total += k_buf.len() + v_buf.len();
1677                if db.opts.log_value_hash {
1678                    let key_hash = default_hash(&k_buf);
1679                    let value_hash = default_hash(&v_buf);
1680                    debug!(
1681                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1682                        db.cf_name(),
1683                        key_hash,
1684                        value_hash
1685                    );
1686                }
1687                match (&mut self.batch, &db.column_family) {
1688                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1689                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1690                    }
1691                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1692                        b.put_cf(name, k_buf, v_buf)
1693                    }
1694                    #[cfg(tidehunter)]
1695                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1696                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1697                    }
1698                    _ => Err(TypedStoreError::RocksDBError(
1699                        "typed store invariant violation".to_string(),
1700                    ))?,
1701                }
1702                Ok(())
1703            })?;
1704        self.db_metrics
1705            .op_metrics
1706            .rocksdb_batch_put_bytes
1707            .with_label_values(&[&db.cf])
1708            .observe(total as f64);
1709        Ok(self)
1710    }
1711
1712    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1713        &mut self,
1714        db: &DBMap<K, V>,
1715        new_vals: impl IntoIterator<Item = (J, U)>,
1716    ) -> Result<&mut Self, TypedStoreError> {
1717        if !Arc::ptr_eq(&db.db, &self.database) {
1718            return Err(TypedStoreError::CrossDBBatch);
1719        }
1720        new_vals
1721            .into_iter()
1722            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1723                let k_buf = be_fix_int_ser(k.borrow());
1724                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1725                match &mut self.batch {
1726                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1727                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1728                        k_buf,
1729                        v_buf,
1730                    ),
1731                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1732                }
1733                Ok(())
1734            })?;
1735        Ok(self)
1736    }
1737}
1738
1739impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1740where
1741    K: Serialize + DeserializeOwned,
1742    V: Serialize + DeserializeOwned,
1743{
1744    type Error = TypedStoreError;
1745
1746    #[instrument(level = "trace", skip_all, err)]
1747    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1748        let key_buf = be_fix_int_ser(key);
1749        self.db
1750            .contains(&self.column_family, &key_buf, &self.opts.readopts())
1751    }
1752
1753    #[instrument(level = "trace", skip_all, err)]
1754    fn multi_contains_keys<J>(
1755        &self,
1756        keys: impl IntoIterator<Item = J>,
1757    ) -> Result<Vec<bool>, Self::Error>
1758    where
1759        J: Borrow<K>,
1760    {
1761        let _timer = self
1762            .db_metrics
1763            .op_metrics
1764            .rocksdb_multiget_latency_seconds
1765            .with_label_values(&[&self.cf])
1766            .start_timer();
1767        let perf_ctx = if self.multiget_sample_interval.sample() {
1768            Some(RocksDBPerfContext)
1769        } else {
1770            None
1771        };
1772        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
1773        let result = self
1774            .db
1775            .multi_contains(&self.column_family, keys_bytes, &self.opts.readopts());
1776        if perf_ctx.is_some() {
1777            self.db_metrics
1778                .read_perf_ctx_metrics
1779                .report_metrics(&self.cf);
1780        }
1781        result
1782    }
1783
1784    #[instrument(level = "trace", skip_all, err)]
1785    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1786        let _timer = self
1787            .db_metrics
1788            .op_metrics
1789            .rocksdb_get_latency_seconds
1790            .with_label_values(&[&self.cf])
1791            .start_timer();
1792        let perf_ctx = if self.get_sample_interval.sample() {
1793            Some(RocksDBPerfContext)
1794        } else {
1795            None
1796        };
1797        let key_buf = be_fix_int_ser(key);
1798        let res = self
1799            .db
1800            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1801        self.db_metrics
1802            .op_metrics
1803            .rocksdb_get_bytes
1804            .with_label_values(&[&self.cf])
1805            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1806        if perf_ctx.is_some() {
1807            self.db_metrics
1808                .read_perf_ctx_metrics
1809                .report_metrics(&self.cf);
1810        }
1811        match res {
1812            Some(data) => {
1813                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1814                if value.is_err() {
1815                    let key_hash = default_hash(&key_buf);
1816                    let value_hash = default_hash(&data);
1817                    debug_fatal!(
1818                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1819                        self.cf_name(),
1820                        key_hash,
1821                        value_hash,
1822                        value.as_ref().err().unwrap()
1823                    );
1824                }
1825                Ok(Some(value?))
1826            }
1827            None => Ok(None),
1828        }
1829    }
1830
1831    #[instrument(level = "trace", skip_all, err)]
1832    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1833        let timer = self
1834            .db_metrics
1835            .op_metrics
1836            .rocksdb_put_latency_seconds
1837            .with_label_values(&[&self.cf])
1838            .start_timer();
1839        let perf_ctx = if self.write_sample_interval.sample() {
1840            Some(RocksDBPerfContext)
1841        } else {
1842            None
1843        };
1844        let key_buf = be_fix_int_ser(key);
1845        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1846        self.db_metrics
1847            .op_metrics
1848            .rocksdb_put_bytes
1849            .with_label_values(&[&self.cf])
1850            .observe((key_buf.len() + value_buf.len()) as f64);
1851        if perf_ctx.is_some() {
1852            self.db_metrics
1853                .write_perf_ctx_metrics
1854                .report_metrics(&self.cf);
1855        }
1856        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1857
1858        let elapsed = timer.stop_and_record();
1859        if elapsed > 1.0 {
1860            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1861            self.db_metrics
1862                .op_metrics
1863                .rocksdb_very_slow_puts_count
1864                .with_label_values(&[&self.cf])
1865                .inc();
1866            self.db_metrics
1867                .op_metrics
1868                .rocksdb_very_slow_puts_duration_ms
1869                .with_label_values(&[&self.cf])
1870                .inc_by((elapsed * 1000.0) as u64);
1871        }
1872
1873        Ok(())
1874    }
1875
1876    #[instrument(level = "trace", skip_all, err)]
1877    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1878        let _timer = self
1879            .db_metrics
1880            .op_metrics
1881            .rocksdb_delete_latency_seconds
1882            .with_label_values(&[&self.cf])
1883            .start_timer();
1884        let perf_ctx = if self.write_sample_interval.sample() {
1885            Some(RocksDBPerfContext)
1886        } else {
1887            None
1888        };
1889        let key_buf = be_fix_int_ser(key);
1890        self.db.delete_cf(&self.column_family, key_buf)?;
1891        self.db_metrics
1892            .op_metrics
1893            .rocksdb_deletes
1894            .with_label_values(&[&self.cf])
1895            .inc();
1896        if perf_ctx.is_some() {
1897            self.db_metrics
1898                .write_perf_ctx_metrics
1899                .report_metrics(&self.cf);
1900        }
1901        Ok(())
1902    }
1903
1904    /// Writes a range delete tombstone to delete all entries in the db map
1905    /// If the DBMap is configured with ignore_range_deletions set to false,
1906    /// the effect of this write will be visible immediately i.e. you won't
1907    /// see old values when you do a lookup or scan. But if it is configured
1908    /// with ignore_range_deletions set to true, the old value are visible until
1909    /// compaction actually deletes them which will happen sometime after. By
1910    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1911    /// overridden in the config), so please use this function with caution
1912    #[instrument(level = "trace", skip_all, err)]
1913    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1914        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1915        let last_key = self
1916            .reversed_safe_iter_with_bounds(None, None)?
1917            .next()
1918            .transpose()?
1919            .map(|(k, _v)| k);
1920        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1921            let mut batch = self.batch();
1922            batch.schedule_delete_range(self, &first_key, &last_key)?;
1923            batch.write()?;
1924        }
1925        Ok(())
1926    }
1927
1928    fn is_empty(&self) -> bool {
1929        self.safe_iter().next().is_none()
1930    }
1931
1932    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1933        match &self.db.storage {
1934            Storage::Rocks(db) => {
1935                let db_iter = db
1936                    .underlying
1937                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1938                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1939                Box::new(SafeIter::new(
1940                    self.cf.clone(),
1941                    db_iter,
1942                    _timer,
1943                    _perf_ctx,
1944                    bytes_scanned,
1945                    keys_scanned,
1946                    Some(self.db_metrics.clone()),
1947                ))
1948            }
1949            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1950            #[cfg(tidehunter)]
1951            Storage::TideHunter(db) => match &self.column_family {
1952                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1953                    db.iterator(*ks),
1954                    prefix,
1955                    self.start_iter_timer(),
1956                )),
1957                _ => unreachable!("storage backend invariant violation"),
1958            },
1959        }
1960    }
1961
1962    fn safe_iter_with_bounds(
1963        &'a self,
1964        lower_bound: Option<K>,
1965        upper_bound: Option<K>,
1966    ) -> DbIterator<'a, (K, V)> {
1967        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1968        match &self.db.storage {
1969            Storage::Rocks(db) => {
1970                let readopts =
1971                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1972                let db_iter = db
1973                    .underlying
1974                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1975                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1976                Box::new(SafeIter::new(
1977                    self.cf.clone(),
1978                    db_iter,
1979                    _timer,
1980                    _perf_ctx,
1981                    bytes_scanned,
1982                    keys_scanned,
1983                    Some(self.db_metrics.clone()),
1984                ))
1985            }
1986            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1987            #[cfg(tidehunter)]
1988            Storage::TideHunter(db) => match &self.column_family {
1989                ColumnFamily::TideHunter((ks, prefix)) => {
1990                    let mut iter = db.iterator(*ks);
1991                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1992                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1993                }
1994                _ => unreachable!("storage backend invariant violation"),
1995            },
1996        }
1997    }
1998
1999    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
2000        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
2001        match &self.db.storage {
2002            Storage::Rocks(db) => {
2003                let readopts =
2004                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
2005                let db_iter = db
2006                    .underlying
2007                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
2008                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2009                Box::new(SafeIter::new(
2010                    self.cf.clone(),
2011                    db_iter,
2012                    _timer,
2013                    _perf_ctx,
2014                    bytes_scanned,
2015                    keys_scanned,
2016                    Some(self.db_metrics.clone()),
2017                ))
2018            }
2019            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
2020            #[cfg(tidehunter)]
2021            Storage::TideHunter(db) => match &self.column_family {
2022                ColumnFamily::TideHunter((ks, prefix)) => {
2023                    let mut iter = db.iterator(*ks);
2024                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
2025                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
2026                }
2027                _ => unreachable!("storage backend invariant violation"),
2028            },
2029        }
2030    }
2031
2032    /// Returns a vector of values corresponding to the keys provided.
2033    #[instrument(level = "trace", skip_all, err)]
2034    fn multi_get<J>(
2035        &self,
2036        keys: impl IntoIterator<Item = J>,
2037    ) -> Result<Vec<Option<V>>, TypedStoreError>
2038    where
2039        J: Borrow<K>,
2040    {
2041        let results = self.multi_get_pinned(keys)?;
2042        let values_parsed: Result<Vec<_>, TypedStoreError> = results
2043            .into_iter()
2044            .map(|value_byte| match value_byte {
2045                Some(data) => Ok(Some(
2046                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2047                )),
2048                None => Ok(None),
2049            })
2050            .collect();
2051
2052        values_parsed
2053    }
2054
2055    /// Convenience method for batch insertion
2056    #[instrument(level = "trace", skip_all, err)]
2057    fn multi_insert<J, U>(
2058        &self,
2059        key_val_pairs: impl IntoIterator<Item = (J, U)>,
2060    ) -> Result<(), Self::Error>
2061    where
2062        J: Borrow<K>,
2063        U: Borrow<V>,
2064    {
2065        let mut batch = self.batch();
2066        batch.insert_batch(self, key_val_pairs)?;
2067        batch.write()
2068    }
2069
2070    /// Convenience method for batch removal
2071    #[instrument(level = "trace", skip_all, err)]
2072    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2073    where
2074        J: Borrow<K>,
2075    {
2076        let mut batch = self.batch();
2077        batch.delete_batch(self, keys)?;
2078        batch.write()
2079    }
2080
2081    /// Try to catch up with primary when running as secondary
2082    #[instrument(level = "trace", skip_all, err)]
2083    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2084        if let Storage::Rocks(rocks) = &self.db.storage {
2085            rocks
2086                .underlying
2087                .try_catch_up_with_primary()
2088                .map_err(typed_store_err_from_rocks_err)?;
2089        }
2090        Ok(())
2091    }
2092}
2093
2094/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
2095#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2096pub fn open_cf_opts<P: AsRef<Path>>(
2097    path: P,
2098    db_options: Option<rocksdb::Options>,
2099    metric_conf: MetricConf,
2100    opt_cfs: &[(&str, rocksdb::Options)],
2101) -> Result<Arc<Database>, TypedStoreError> {
2102    let path = path.as_ref();
2103    ensure_database_type(path, StorageType::Rocks)
2104        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2105    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
2106    // because rocksdb uses the simulated clock when creating its background threads, but then
2107    // those threads see the real wall clock (because they are not the test thread), which causes
2108    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
2109    // resolves the issue.
2110    //
2111    // This is a no-op in non-simulator builds.
2112
2113    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2114    nondeterministic!({
2115        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2116        options.create_if_missing(true);
2117        options.create_missing_column_families(true);
2118        let rocksdb = {
2119            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2120                &options,
2121                path,
2122                cfs.into_iter()
2123                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2124            )
2125            .map_err(typed_store_err_from_rocks_err)?
2126        };
2127        Ok(Arc::new(Database::new(
2128            Storage::Rocks(RocksDB {
2129                underlying: rocksdb,
2130            }),
2131            metric_conf,
2132            None,
2133        )))
2134    })
2135}
2136
2137/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
2138pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2139    primary_path: P,
2140    secondary_path: Option<P>,
2141    db_options: Option<rocksdb::Options>,
2142    metric_conf: MetricConf,
2143    opt_cfs: &[(&str, rocksdb::Options)],
2144) -> Result<Arc<Database>, TypedStoreError> {
2145    let primary_path = primary_path.as_ref();
2146    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2147    // See comment above for explanation of why nondeterministic is necessary here.
2148    nondeterministic!({
2149        // Customize database options
2150        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2151
2152        fdlimit::raise_fd_limit();
2153        // This is a requirement by RocksDB when opening as secondary
2154        options.set_max_open_files(-1);
2155
2156        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2157        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2158            .ok()
2159            .unwrap_or_default();
2160
2161        let default_db_options = default_db_options();
2162        // Add CFs not explicitly listed
2163        for cf_key in cfs.iter() {
2164            if !opt_cfs.contains_key(&cf_key[..]) {
2165                opt_cfs.insert(cf_key, default_db_options.options.clone());
2166            }
2167        }
2168
2169        let primary_path = primary_path.to_path_buf();
2170        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2171            let mut s = primary_path.clone();
2172            s.pop();
2173            s.push("SECONDARY");
2174            s.as_path().to_path_buf()
2175        });
2176
2177        ensure_database_type(&primary_path, StorageType::Rocks)
2178            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2179        ensure_database_type(&secondary_path, StorageType::Rocks)
2180            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2181
2182        let rocksdb = {
2183            options.create_if_missing(true);
2184            options.create_missing_column_families(true);
2185            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2186                &options,
2187                &primary_path,
2188                &secondary_path,
2189                opt_cfs
2190                    .iter()
2191                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2192            )
2193            .map_err(typed_store_err_from_rocks_err)?;
2194            db.try_catch_up_with_primary()
2195                .map_err(typed_store_err_from_rocks_err)?;
2196            db
2197        };
2198        Ok(Arc::new(Database::new(
2199            Storage::Rocks(RocksDB {
2200                underlying: rocksdb,
2201            }),
2202            metric_conf,
2203            None,
2204        )))
2205    })
2206}
2207
2208// Drops a database if there is no other handle to it, with retries and timeout.
2209// Detects the storage variant (RocksDB vs. tidehunter) from the directory
2210// contents and dispatches to the matching cleanup. Both variants coexist in
2211// tidehunter builds — some stores (e.g. rpc-index) are pure RocksDB even when
2212// the tidehunter feature is enabled elsewhere in the binary.
2213pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2214    #[cfg(tidehunter)]
2215    if is_tidehunter_db(&path) {
2216        return safe_drop_tidehunter_db(path, timeout).await;
2217    }
2218    safe_drop_rocksdb(path, timeout).await
2219}
2220
2221async fn safe_drop_rocksdb(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2222    let mut backoff = backoff::ExponentialBackoff {
2223        max_elapsed_time: Some(timeout),
2224        ..Default::default()
2225    };
2226    loop {
2227        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2228            Ok(()) => return Ok(()),
2229            Err(err) => match backoff.next_backoff() {
2230                Some(duration) => tokio::time::sleep(duration).await,
2231                None => return Err(std::io::Error::other(err)),
2232            },
2233        }
2234    }
2235}
2236
/// Returns true when `path` looks like a TideHunter database directory.
///
/// `TideHunterDb::open` writes `shape.yaml` and RocksDB never creates such a file, so
/// its presence is treated as the canonical marker.
#[cfg(tidehunter)]
fn is_tidehunter_db(path: &Path) -> bool {
    const SHAPE_MARKER: &str = "shape.yaml";
    let marker = path.join(SHAPE_MARKER);
    marker.exists()
}
2243
/// Drops the TideHunter database at `path`, retrying with exponential backoff for up to
/// `timeout` while `TideHunterDb::drop_db` reports `ErrorKind::AlreadyExists`.
///
/// Any other error is returned immediately. If the database is still reported as
/// `AlreadyExists` when the backoff budget runs out, a warning is logged and that error
/// is returned.
#[cfg(tidehunter)]
async fn safe_drop_tidehunter_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut retry_schedule = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        // `AlreadyExists` is treated as "still locked" and retried; any other failure
        // is final.
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        match retry_schedule.next_backoff() {
            Some(delay) => tokio::time::sleep(delay).await,
            None => {
                warn!(
                    "Database at {:?} is still locked after timeout ({:?})",
                    path, timeout
                );
                return Err(err);
            }
        }
    }
}
2269
2270fn populate_missing_cfs(
2271    input_cfs: &[(&str, rocksdb::Options)],
2272    path: &Path,
2273) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2274    let mut cfs = vec![];
2275    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2276    let existing_cfs =
2277        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2278            .ok()
2279            .unwrap_or_default();
2280
2281    for cf_name in existing_cfs {
2282        if !input_cf_index.contains(&cf_name[..]) {
2283            cfs.push((cf_name, rocksdb::Options::default()));
2284        }
2285    }
2286    cfs.extend(
2287        input_cfs
2288            .iter()
2289            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2290    );
2291    Ok(cfs)
2292}
2293
2294fn default_hash(value: &[u8]) -> Digest<32> {
2295    let mut hasher = fastcrypto::hash::Blake2b256::default();
2296    hasher.update(value);
2297    hasher.finalize()
2298}