typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21    iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26    traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36    properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42    borrow::Borrow,
43    marker::PhantomData,
44    ops::RangeBounds,
45    path::{Path, PathBuf},
46    sync::{Arc, OnceLock},
47    time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
// SAFETY: the literal ends with an explicit `\0` and contains no interior NUL
// bytes, so `from_bytes_with_nul_unchecked` is sound here.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
60
/// Process-wide flag controlling whether database writes request a sync to disk.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns whether write-sync is enabled. If the flag was never set via
/// [`init_write_sync`], it is initialized from the `SUI_DB_SYNC_TO_DISK`
/// environment variable ("1" or "true" enables it) on first use.
fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED.get_or_init(|| {
        matches!(
            std::env::var("SUI_DB_SYNC_TO_DISK").as_deref(),
            Ok("1") | Ok("true")
        )
    })
}

/// Initialize the write sync setting from config.
/// Must be called before any database writes occur.
pub fn init_write_sync(enabled: Option<bool>) {
    if let Some(value) = enabled {
        // A second initialization attempt is silently ignored.
        let _ = WRITE_SYNC_ENABLED.set(value);
    }
}
75
76#[cfg(test)]
77mod tests;
78
/// Thin wrapper around the raw rust-rocksdb handle so we can hook `Drop`.
#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        // Block until background compactions/flushes are cancelled so no
        // background thread touches the DB while it is being torn down.
        self.underlying.cancel_all_background_work(/* wait */ true);
    }
}
89
/// Identifies a logical column family within one of the supported backends.
/// For RocksDB and the in-memory store this is the cf name; for TideHunter it
/// is a key space plus an optional key prefix.
#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rocks(name) => write!(f, "RocksDB cf: {}", name),
            Self::InMemory(name) => write!(f, "InMemory cf: {}", name),
            #[cfg(tidehunter)]
            Self::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}
108
impl ColumnFamily {
    /// Resolves this column family to its RocksDB handle.
    ///
    /// # Panics
    /// Panics via `unreachable!` if `self` is not the `Rocks` variant —
    /// callers must only invoke this after matching on a RocksDB backend —
    /// and via `expect` if the cf was not registered at DB creation.
    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
        match &self {
            ColumnFamily::Rocks(name) => rocks_db
                .underlying
                .cf_handle(name)
                .expect("Map-keying column family should have been checked at DB creation"),
            _ => unreachable!("invariant is checked by the caller"),
        }
    }
}
120
121pub enum Storage {
122    Rocks(RocksDB),
123    InMemory(InMemoryDB),
124    #[cfg(tidehunter)]
125    TideHunter(Arc<TideHunterDb>),
126}
127
128impl std::fmt::Debug for Storage {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133            #[cfg(tidehunter)]
134            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135        }
136    }
137}
138
/// A handle to an open typed-store database: the backend storage plus the
/// metric configuration used for instrumentation.
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
    // When present, identifies a metrics registry to deregister on drop.
    registry_id: Option<RegistryID>,
}
145
impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        // Keep the active-DB gauge in sync with Database lifetimes
        // (incremented in `Database::new`).
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        // Deregister this database's metrics registry, if one was attached.
        // NOTE(review): `registry_serivce` is a typo ("service") in the field
        // declared on DBMetrics; renaming it is out of scope for this file.
        if let Some(registry_id) = self.registry_id {
            metrics.registry_serivce.remove(registry_id);
        }
    }
}
155
156enum GetResult<'a> {
157    Rocks(DBPinnableSlice<'a>),
158    InMemory(Vec<u8>),
159    #[cfg(tidehunter)]
160    TideHunter(tidehunter::minibytes::Bytes),
161}
162
163impl Deref for GetResult<'_> {
164    type Target = [u8];
165    fn deref(&self) -> &[u8] {
166        match self {
167            GetResult::Rocks(d) => d.deref(),
168            GetResult::InMemory(d) => d.deref(),
169            #[cfg(tidehunter)]
170            GetResult::TideHunter(d) => d.deref(),
171        }
172    }
173}
174
175impl Database {
176    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178        Self {
179            storage,
180            metric_conf,
181            registry_id,
182        }
183    }
184
185    /// Flush all memtables to SST files on disk.
186    pub fn flush(&self) -> Result<(), TypedStoreError> {
187        match &self.storage {
188            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
189                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
190            }),
191            Storage::InMemory(_) => {
192                // InMemory databases don't need flushing
193                Ok(())
194            }
195            #[cfg(tidehunter)]
196            Storage::TideHunter(_) => {
197                // TideHunter doesn't support an explicit flush.
198                Ok(())
199            }
200        }
201    }
202
203    fn get<K: AsRef<[u8]>>(
204        &self,
205        cf: &ColumnFamily,
206        key: K,
207        readopts: &ReadOptions,
208    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
209        match (&self.storage, cf) {
210            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
211                .underlying
212                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
213                .map_err(typed_store_err_from_rocks_err)?
214                .map(GetResult::Rocks)),
215            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
216                Ok(db.get(cf_name, key).map(GetResult::InMemory))
217            }
218            #[cfg(tidehunter)]
219            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
220                .get(*ks, &transform_th_key(key.as_ref(), prefix))
221                .map_err(typed_store_error_from_th_error)?
222                .map(GetResult::TideHunter)),
223
224            _ => Err(TypedStoreError::RocksDBError(
225                "typed store invariant violation".to_string(),
226            )),
227        }
228    }
229
230    fn multi_get<I, K>(
231        &self,
232        cf: &ColumnFamily,
233        keys: I,
234        readopts: &ReadOptions,
235    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
236    where
237        I: IntoIterator<Item = K>,
238        K: AsRef<[u8]>,
239    {
240        match (&self.storage, cf) {
241            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
242                let keys_vec: Vec<K> = keys.into_iter().collect();
243                let res = db.underlying.batched_multi_get_cf_opt(
244                    &cf.rocks_cf(db),
245                    keys_vec.iter(),
246                    /* sorted_input */ false,
247                    readopts,
248                );
249                res.into_iter()
250                    .map(|r| {
251                        r.map_err(typed_store_err_from_rocks_err)
252                            .map(|item| item.map(GetResult::Rocks))
253                    })
254                    .collect()
255            }
256            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
257                .multi_get(cf_name, keys)
258                .into_iter()
259                .map(|r| Ok(r.map(GetResult::InMemory)))
260                .collect(),
261            #[cfg(tidehunter)]
262            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
263                let res = keys.into_iter().map(|k| {
264                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
265                        .map_err(typed_store_error_from_th_error)
266                });
267                res.into_iter()
268                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
269                    .collect()
270            }
271            _ => unreachable!("typed store invariant violation"),
272        }
273    }
274
275    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
276        match &self.storage {
277            Storage::Rocks(db) => db.underlying.drop_cf(name),
278            Storage::InMemory(db) => {
279                db.drop_cf(name);
280                Ok(())
281            }
282            #[cfg(tidehunter)]
283            Storage::TideHunter(_) => {
284                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
285            }
286        }
287    }
288
289    pub fn delete_file_in_range<K: AsRef<[u8]>>(
290        &self,
291        cf: &impl AsColumnFamilyRef,
292        from: K,
293        to: K,
294    ) -> Result<(), rocksdb::Error> {
295        match &self.storage {
296            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
297            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
298        }
299    }
300
301    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
302        fail_point!("delete-cf-before");
303        let ret = match (&self.storage, cf) {
304            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
305                .underlying
306                .delete_cf(&cf.rocks_cf(db), key)
307                .map_err(typed_store_err_from_rocks_err),
308            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
309                db.delete(cf_name, key.as_ref());
310                Ok(())
311            }
312            #[cfg(tidehunter)]
313            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
314                .remove(*ks, transform_th_key(key.as_ref(), prefix))
315                .map_err(typed_store_error_from_th_error),
316            _ => Err(TypedStoreError::RocksDBError(
317                "typed store invariant violation".to_string(),
318            )),
319        };
320        fail_point!("delete-cf-after");
321        #[allow(clippy::let_and_return)]
322        ret
323    }
324
325    pub fn path_for_pruning(&self) -> &Path {
326        match &self.storage {
327            Storage::Rocks(rocks) => rocks.underlying.path(),
328            _ => unimplemented!("method is only supported for rocksdb backend"),
329        }
330    }
331
332    fn put_cf(
333        &self,
334        cf: &ColumnFamily,
335        key: Vec<u8>,
336        value: Vec<u8>,
337    ) -> Result<(), TypedStoreError> {
338        fail_point!("put-cf-before");
339        let ret = match (&self.storage, cf) {
340            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
341                .underlying
342                .put_cf(&cf.rocks_cf(db), key, value)
343                .map_err(typed_store_err_from_rocks_err),
344            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
345                db.put(cf_name, key, value);
346                Ok(())
347            }
348            #[cfg(tidehunter)]
349            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
350                .insert(*ks, transform_th_key(&key, prefix), value)
351                .map_err(typed_store_error_from_th_error),
352            _ => Err(TypedStoreError::RocksDBError(
353                "typed store invariant violation".to_string(),
354            )),
355        };
356        fail_point!("put-cf-after");
357        #[allow(clippy::let_and_return)]
358        ret
359    }
360
361    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
362        &self,
363        cf_name: &str,
364        key: K,
365        readopts: &ReadOptions,
366    ) -> bool {
367        match &self.storage {
368            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
369            // but no false negatives. We use it to short-circuit the absent case
370            Storage::Rocks(rocks) => {
371                rocks
372                    .underlying
373                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
374            }
375            _ => true,
376        }
377    }
378
379    pub(crate) fn write_opt_internal(
380        &self,
381        batch: StorageWriteBatch,
382        write_options: &rocksdb::WriteOptions,
383    ) -> Result<(), TypedStoreError> {
384        fail_point!("batch-write-before");
385        let ret = match (&self.storage, batch) {
386            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
387                .underlying
388                .write_opt(batch, write_options)
389                .map_err(typed_store_err_from_rocks_err),
390            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
391                // InMemory doesn't support write options
392                db.write(batch);
393                Ok(())
394            }
395            #[cfg(tidehunter)]
396            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
397                // TideHunter doesn't support write options
398                batch.commit().map_err(typed_store_error_from_th_error)
399            }
400            _ => Err(TypedStoreError::RocksDBError(
401                "using invalid batch type for the database".to_string(),
402            )),
403        };
404        fail_point!("batch-write-after");
405        #[allow(clippy::let_and_return)]
406        ret
407    }
408
409    #[cfg(tidehunter)]
410    pub fn start_relocation(&self) -> anyhow::Result<()> {
411        if let Storage::TideHunter(db) = &self.storage {
412            db.start_relocation()?;
413        }
414        Ok(())
415    }
416
417    #[cfg(tidehunter)]
418    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
419        if let Storage::TideHunter(db) = &self.storage {
420            db.force_rebuild_control_region()
421                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
422        }
423        Ok(())
424    }
425
426    /// Wait for tidehunter background threads to finish.
427    ///
428    /// Consumes the `Arc<Database>`. Caller must ensure no other clones of this
429    /// `Arc<Database>` (e.g. via `DBMap::db`) are alive — otherwise the inner
430    /// `Arc<TideHunterDb>` strong count will not reach zero and the wait will
431    /// poll until it panics.
432    #[cfg(tidehunter)]
433    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
434        let strong = Arc::strong_count(&self);
435        if strong != 1 {
436            println!(
437                "WARNING: wait_for_tidehunter_background_threads called with Arc<Database> strong_count={} (expected 1); other clones will keep the inner tidehunter Db alive and the wait may panic on timeout",
438                strong,
439            );
440        }
441        let Storage::TideHunter(th_arc) = &self.storage else {
442            return;
443        };
444        let th_arc = th_arc.clone();
445        drop(self);
446        th_arc.wait_for_background_threads_to_finish();
447    }
448
449    #[cfg(tidehunter)]
450    pub fn drop_cells_in_range(
451        &self,
452        ks: KeySpace,
453        from_inclusive: &[u8],
454        to_inclusive: &[u8],
455    ) -> anyhow::Result<()> {
456        if let Storage::TideHunter(db) = &self.storage {
457            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
458                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
459        } else {
460            panic!("drop_cells_in_range called on non-TideHunter storage");
461        }
462        Ok(())
463    }
464
465    pub fn compact_range_cf<K: AsRef<[u8]>>(
466        &self,
467        cf_name: &str,
468        start: Option<K>,
469        end: Option<K>,
470    ) {
471        if let Storage::Rocks(rocksdb) = &self.storage {
472            rocksdb
473                .underlying
474                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
475        }
476    }
477
478    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
479        // TODO: implement for other storage types
480        if let Storage::Rocks(rocks) = &self.storage {
481            let checkpoint =
482                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
483            checkpoint
484                .create_checkpoint(path)
485                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
486        }
487        Ok(())
488    }
489
490    pub fn get_sampling_interval(&self) -> SamplingInterval {
491        self.metric_conf.read_sample_interval.new_from_self()
492    }
493
494    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
495        self.metric_conf.read_sample_interval.new_from_self()
496    }
497
498    pub fn write_sampling_interval(&self) -> SamplingInterval {
499        self.metric_conf.write_sample_interval.new_from_self()
500    }
501
502    pub fn iter_sampling_interval(&self) -> SamplingInterval {
503        self.metric_conf.iter_sample_interval.new_from_self()
504    }
505
506    fn db_name(&self) -> String {
507        let name = &self.metric_conf.db_name;
508        if name.is_empty() {
509            "default".to_string()
510        } else {
511            name.clone()
512        }
513    }
514
515    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
516        match &self.storage {
517            Storage::Rocks(rocks) => rocks.underlying.live_files(),
518            _ => Ok(vec![]),
519        }
520    }
521}
522
523fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
524    rocks_db
525        .underlying
526        .cf_handle(cf_name)
527        .expect("Map-keying column family should have been checked at DB creation")
528}
529
530fn rocks_cf_from_db<'a>(
531    db: &'a Database,
532    cf_name: &str,
533) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
534    match &db.storage {
535        Storage::Rocks(rocksdb) => Ok(rocksdb
536            .underlying
537            .cf_handle(cf_name)
538            .expect("Map-keying column family should have been checked at DB creation")),
539        _ => Err(TypedStoreError::RocksDBError(
540            "using invalid batch type for the database".to_string(),
541        )),
542    }
543}
544
/// Per-database metric settings: the reporting name plus the sampling
/// intervals used for read / write / iterator instrumentation.
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
552
553impl MetricConf {
554    pub fn new(db_name: &str) -> Self {
555        if db_name.is_empty() {
556            error!("A meaningful db name should be used for metrics reporting.")
557        }
558        Self {
559            db_name: db_name.to_string(),
560            read_sample_interval: SamplingInterval::default(),
561            write_sample_interval: SamplingInterval::default(),
562            iter_sample_interval: SamplingInterval::default(),
563        }
564    }
565
566    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
567        Self {
568            db_name: self.db_name,
569            read_sample_interval: read_interval,
570            write_sample_interval: SamplingInterval::default(),
571            iter_sample_interval: SamplingInterval::default(),
572        }
573    }
574}
/// Period between per-column-family metric reports, in seconds.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Gauge value recorded when reading a RocksDB property fails.
const METRICS_ERROR: i64 = -1;
577
/// An interface to a rocksDB database, keyed by a columnfamily
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    // fn(K) -> V keeps DBMap covariant-free in K/V without owning either type.
    _phantom: PhantomData<fn(K) -> V>,
    column_family: ColumnFamily,
    // the column family under which the map is stored
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping the last clone of this sender stops the metrics-report task.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
594
595unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
596
597impl<K, V> DBMap<K, V> {
598    pub(crate) fn new(
599        db: Arc<Database>,
600        opts: &ReadWriteOptions,
601        opt_cf: &str,
602        column_family: ColumnFamily,
603        is_deprecated: bool,
604    ) -> Self {
605        let db_cloned = Arc::downgrade(&db.clone());
606        let db_metrics = DBMetrics::get();
607        let db_metrics_cloned = db_metrics.clone();
608        let cf = opt_cf.to_string();
609
610        let (sender, mut recv) = tokio::sync::oneshot::channel();
611        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
612            tokio::task::spawn(async move {
613                let mut interval =
614                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
615                loop {
616                    tokio::select! {
617                        _ = interval.tick() => {
618                            if let Some(db) = db_cloned.upgrade() {
619                                let cf = cf.clone();
620                                let db_metrics = db_metrics.clone();
621                                if let Err(e) = tokio::task::spawn_blocking(move || {
622                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
623                                }).await {
624                                    error!("Failed to log metrics with error: {}", e);
625                                }
626                            }
627                        }
628                        _ = &mut recv => break,
629                    }
630                }
631                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
632            });
633        }
634        DBMap {
635            db: db.clone(),
636            opts: opts.clone(),
637            _phantom: PhantomData,
638            column_family,
639            cf: opt_cf.to_string(),
640            db_metrics: db_metrics_cloned,
641            _metrics_task_cancel_handle: Arc::new(sender),
642            get_sample_interval: db.get_sampling_interval(),
643            multiget_sample_interval: db.multiget_sampling_interval(),
644            write_sample_interval: db.write_sampling_interval(),
645            iter_sample_interval: db.iter_sampling_interval(),
646        }
647    }
648
649    /// Reopens an open database as a typed map operating under a specific column family.
650    /// if no column family is passed, the default column family is used.
651    #[instrument(level = "debug", skip(db), err)]
652    pub fn reopen(
653        db: &Arc<Database>,
654        opt_cf: Option<&str>,
655        rw_options: &ReadWriteOptions,
656        is_deprecated: bool,
657    ) -> Result<Self, TypedStoreError> {
658        let cf_key = opt_cf
659            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
660            .to_owned();
661        Ok(DBMap::new(
662            db.clone(),
663            rw_options,
664            &cf_key,
665            ColumnFamily::Rocks(cf_key.to_string()),
666            is_deprecated,
667        ))
668    }
669
670    #[cfg(tidehunter)]
671    pub fn reopen_th(
672        db: Arc<Database>,
673        cf_name: &str,
674        ks: KeySpace,
675        prefix: Option<Vec<u8>>,
676    ) -> Self {
677        DBMap::new(
678            db,
679            &ReadWriteOptions::default(),
680            cf_name,
681            ColumnFamily::TideHunter((ks, prefix.clone())),
682            false,
683        )
684    }
685
    /// Returns the name of the column family this map operates on.
    pub fn cf_name(&self) -> &str {
        &self.cf
    }
689
690    pub fn batch(&self) -> DBBatch {
691        let batch = match &self.db.storage {
692            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
693            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
694            #[cfg(tidehunter)]
695            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
696        };
697        DBBatch::new(
698            &self.db,
699            batch,
700            &self.db_metrics,
701            &self.write_sample_interval,
702        )
703    }
704
    /// Flushes the underlying database's memtables to disk
    /// (no-op for non-RocksDB backends).
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }
708
709    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
710        let from_buf = be_fix_int_ser(start);
711        let to_buf = be_fix_int_ser(end);
712        self.db
713            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
714        Ok(())
715    }
716
    /// Compacts the raw (already-serialized) key range `[start, end]` in the
    /// named column family. No-op for non-RocksDB backends.
    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }
726
    /// Drops all cells in the inclusive key range of this map's TideHunter
    /// key space; a silent no-op for non-TideHunter column families.
    /// Bounds are serialized with the big-endian fixed-int encoding.
    // NOTE(review): the `K: Serialize` bound looks unused here (only `J` is
    // serialized) — TODO confirm whether it can be dropped.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
745
    /// Raw-bytes variant of [`Self::drop_cells_in_range`]: drops all cells in
    /// the inclusive, already-serialized key range of this map's TideHunter
    /// key space; a silent no-op for non-TideHunter column families.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range_raw(
        &self,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> Result<(), TypedStoreError> {
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
759
760    /// Returns a vector of raw values corresponding to the keys provided.
761    fn multi_get_pinned<J>(
762        &self,
763        keys: impl IntoIterator<Item = J>,
764    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
765    where
766        J: Borrow<K>,
767        K: Serialize,
768    {
769        let _timer = self
770            .db_metrics
771            .op_metrics
772            .rocksdb_multiget_latency_seconds
773            .with_label_values(&[&self.cf])
774            .start_timer();
775        let perf_ctx = if self.multiget_sample_interval.sample() {
776            Some(RocksDBPerfContext)
777        } else {
778            None
779        };
780        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
781        let results: Result<Vec<_>, TypedStoreError> = self
782            .db
783            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
784            .into_iter()
785            .collect();
786        let entries = results?;
787        let entry_size = entries
788            .iter()
789            .flatten()
790            .map(|entry| entry.len())
791            .sum::<usize>();
792        self.db_metrics
793            .op_metrics
794            .rocksdb_multiget_bytes
795            .with_label_values(&[&self.cf])
796            .observe(entry_size as f64);
797        if perf_ctx.is_some() {
798            self.db_metrics
799                .read_perf_ctx_metrics
800                .report_metrics(&self.cf);
801        }
802        Ok(entries)
803    }
804
805    fn get_rocksdb_int_property(
806        rocksdb: &RocksDB,
807        cf: &impl AsColumnFamilyRef,
808        property_name: &std::ffi::CStr,
809    ) -> Result<i64, TypedStoreError> {
810        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
811            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
812            Ok(None) => Ok(0),
813            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
814        }
815    }
816
817    fn report_rocksdb_metrics(
818        database: &Arc<Database>,
819        cf_name: &str,
820        db_metrics: &Arc<DBMetrics>,
821    ) {
822        let Storage::Rocks(rocksdb) = &database.storage else {
823            return;
824        };
825
826        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
827            tracing::warn!(
828                "unable to report metrics for cf {cf_name:?} in db {:?}",
829                database.db_name()
830            );
831            return;
832        };
833
834        db_metrics
835            .cf_metrics
836            .rocksdb_total_sst_files_size
837            .with_label_values(&[cf_name])
838            .set(
839                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
840                    .unwrap_or(METRICS_ERROR),
841            );
842        db_metrics
843            .cf_metrics
844            .rocksdb_total_blob_files_size
845            .with_label_values(&[cf_name])
846            .set(
847                Self::get_rocksdb_int_property(
848                    rocksdb,
849                    &cf,
850                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
851                )
852                .unwrap_or(METRICS_ERROR),
853            );
854        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
855        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
856        let total_num_files: i64 = (0..=6)
857            .map(|level| {
858                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
859                    .unwrap_or(METRICS_ERROR)
860            })
861            .sum();
862        db_metrics
863            .cf_metrics
864            .rocksdb_total_num_files
865            .with_label_values(&[cf_name])
866            .set(total_num_files);
867        db_metrics
868            .cf_metrics
869            .rocksdb_num_level0_files
870            .with_label_values(&[cf_name])
871            .set(
872                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
873                    .unwrap_or(METRICS_ERROR),
874            );
875        db_metrics
876            .cf_metrics
877            .rocksdb_current_size_active_mem_tables
878            .with_label_values(&[cf_name])
879            .set(
880                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
881                    .unwrap_or(METRICS_ERROR),
882            );
883        db_metrics
884            .cf_metrics
885            .rocksdb_size_all_mem_tables
886            .with_label_values(&[cf_name])
887            .set(
888                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
889                    .unwrap_or(METRICS_ERROR),
890            );
891        db_metrics
892            .cf_metrics
893            .rocksdb_num_snapshots
894            .with_label_values(&[cf_name])
895            .set(
896                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
897                    .unwrap_or(METRICS_ERROR),
898            );
899        db_metrics
900            .cf_metrics
901            .rocksdb_oldest_snapshot_time
902            .with_label_values(&[cf_name])
903            .set(
904                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
905                    .unwrap_or(METRICS_ERROR),
906            );
907        db_metrics
908            .cf_metrics
909            .rocksdb_actual_delayed_write_rate
910            .with_label_values(&[cf_name])
911            .set(
912                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
913                    .unwrap_or(METRICS_ERROR),
914            );
915        db_metrics
916            .cf_metrics
917            .rocksdb_is_write_stopped
918            .with_label_values(&[cf_name])
919            .set(
920                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
921                    .unwrap_or(METRICS_ERROR),
922            );
923        db_metrics
924            .cf_metrics
925            .rocksdb_block_cache_capacity
926            .with_label_values(&[cf_name])
927            .set(
928                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
929                    .unwrap_or(METRICS_ERROR),
930            );
931        db_metrics
932            .cf_metrics
933            .rocksdb_block_cache_usage
934            .with_label_values(&[cf_name])
935            .set(
936                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
937                    .unwrap_or(METRICS_ERROR),
938            );
939        db_metrics
940            .cf_metrics
941            .rocksdb_block_cache_pinned_usage
942            .with_label_values(&[cf_name])
943            .set(
944                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
945                    .unwrap_or(METRICS_ERROR),
946            );
947        db_metrics
948            .cf_metrics
949            .rocksdb_estimate_table_readers_mem
950            .with_label_values(&[cf_name])
951            .set(
952                Self::get_rocksdb_int_property(
953                    rocksdb,
954                    &cf,
955                    properties::ESTIMATE_TABLE_READERS_MEM,
956                )
957                .unwrap_or(METRICS_ERROR),
958            );
959        db_metrics
960            .cf_metrics
961            .rocksdb_estimated_num_keys
962            .with_label_values(&[cf_name])
963            .set(
964                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
965                    .unwrap_or(METRICS_ERROR),
966            );
967        db_metrics
968            .cf_metrics
969            .rocksdb_num_immutable_mem_tables
970            .with_label_values(&[cf_name])
971            .set(
972                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
973                    .unwrap_or(METRICS_ERROR),
974            );
975        db_metrics
976            .cf_metrics
977            .rocksdb_mem_table_flush_pending
978            .with_label_values(&[cf_name])
979            .set(
980                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
981                    .unwrap_or(METRICS_ERROR),
982            );
983        db_metrics
984            .cf_metrics
985            .rocksdb_compaction_pending
986            .with_label_values(&[cf_name])
987            .set(
988                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
989                    .unwrap_or(METRICS_ERROR),
990            );
991        db_metrics
992            .cf_metrics
993            .rocksdb_estimate_pending_compaction_bytes
994            .with_label_values(&[cf_name])
995            .set(
996                Self::get_rocksdb_int_property(
997                    rocksdb,
998                    &cf,
999                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
1000                )
1001                .unwrap_or(METRICS_ERROR),
1002            );
1003        db_metrics
1004            .cf_metrics
1005            .rocksdb_num_running_compactions
1006            .with_label_values(&[cf_name])
1007            .set(
1008                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1009                    .unwrap_or(METRICS_ERROR),
1010            );
1011        db_metrics
1012            .cf_metrics
1013            .rocksdb_num_running_flushes
1014            .with_label_values(&[cf_name])
1015            .set(
1016                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1017                    .unwrap_or(METRICS_ERROR),
1018            );
1019        db_metrics
1020            .cf_metrics
1021            .rocksdb_estimate_oldest_key_time
1022            .with_label_values(&[cf_name])
1023            .set(
1024                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1025                    .unwrap_or(METRICS_ERROR),
1026            );
1027        db_metrics
1028            .cf_metrics
1029            .rocksdb_background_errors
1030            .with_label_values(&[cf_name])
1031            .set(
1032                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1033                    .unwrap_or(METRICS_ERROR),
1034            );
1035        db_metrics
1036            .cf_metrics
1037            .rocksdb_base_level
1038            .with_label_values(&[cf_name])
1039            .set(
1040                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1041                    .unwrap_or(METRICS_ERROR),
1042            );
1043    }
1044
    /// Creates a checkpoint of the underlying database at `path` by
    /// delegating to the database-level checkpoint implementation.
    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }
1048
1049    pub fn table_summary(&self) -> eyre::Result<TableSummary>
1050    where
1051        K: Serialize + DeserializeOwned,
1052        V: Serialize + DeserializeOwned,
1053    {
1054        let mut num_keys = 0;
1055        let mut key_bytes_total = 0;
1056        let mut value_bytes_total = 0;
1057        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1058        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1059        for item in self.safe_iter() {
1060            let (key, value) = item?;
1061            num_keys += 1;
1062            let key_len = be_fix_int_ser(key.borrow()).len();
1063            let value_len = bcs::to_bytes(value.borrow())?.len();
1064            key_bytes_total += key_len;
1065            value_bytes_total += value_len;
1066            key_hist.record(key_len as u64)?;
1067            value_hist.record(value_len as u64)?;
1068        }
1069        Ok(TableSummary {
1070            num_keys,
1071            key_bytes_total,
1072            value_bytes_total,
1073            key_hist,
1074            value_hist,
1075        })
1076    }
1077
1078    fn start_iter_timer(&self) -> HistogramTimer {
1079        self.db_metrics
1080            .op_metrics
1081            .rocksdb_iter_latency_seconds
1082            .with_label_values(&[&self.cf])
1083            .start_timer()
1084    }
1085
1086    // Creates metrics and context for tracking an iterator usage and performance.
1087    fn create_iter_context(
1088        &self,
1089    ) -> (
1090        Option<HistogramTimer>,
1091        Option<Histogram>,
1092        Option<Histogram>,
1093        Option<RocksDBPerfContext>,
1094    ) {
1095        let timer = self.start_iter_timer();
1096        let bytes_scanned = self
1097            .db_metrics
1098            .op_metrics
1099            .rocksdb_iter_bytes
1100            .with_label_values(&[&self.cf]);
1101        let keys_scanned = self
1102            .db_metrics
1103            .op_metrics
1104            .rocksdb_iter_keys
1105            .with_label_values(&[&self.cf]);
1106        let perf_ctx = if self.iter_sample_interval.sample() {
1107            Some(RocksDBPerfContext)
1108        } else {
1109            None
1110        };
1111        (
1112            Some(timer),
1113            Some(bytes_scanned),
1114            Some(keys_scanned),
1115            perf_ctx,
1116        )
1117    }
1118
    /// Creates a safe reversed iterator with optional bounds.
    /// Both upper bound and lower bound are included.
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        // Translate the inclusive, optional user bounds into the raw byte
        // bounds understood by the storage-level iterators.
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                // Serialized upper bound handed to SafeRevIter; presumably
                // used as the starting position of the reverse scan so the
                // bound is inclusive — confirm in safe_iter.rs.
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                // In-memory backend takes the bounds directly; the final
                // `true` selects reverse iteration order.
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound, prefix);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                // A TideHunter storage must be paired with a TideHunter
                // column family; any other pairing is a construction bug.
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
1183}
1184
/// Backend-specific write batch held by `DBBatch`. The variant must match
/// the storage backend of the database the batch is eventually written to.
pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1191
/// Flat-buffer entry header. All byte data (cf_name, key, value) is stored
/// contiguously in `StagedBatch::data`; this header records offsets and lengths
/// so that slices can be produced without any per-entry allocation.
struct EntryHeader {
    /// Byte offset into `StagedBatch::data` where this entry's data begins.
    offset: usize,
    /// Length of the column-family name that prefixes the entry's bytes.
    cf_name_len: usize,
    /// Length of the serialized key that follows the column-family name.
    key_len: usize,
    /// `true` for a put (a serialized value follows the key up to the next
    /// entry's offset); `false` for a delete (no value bytes).
    is_put: bool,
}
1202
/// A write batch that stores serialized operations in a flat byte buffer without
/// requiring a database reference. Can be replayed into a real `DBBatch` via
/// `DBBatch::concat`.
/// TODO: this can be deleted when we upgrade rust-rocksdb, which supports iterating
/// over write batches.
#[derive(Default)]
pub struct StagedBatch {
    /// Contiguous cf-name/key/value bytes for all staged entries.
    data: Vec<u8>,
    /// One header per staged operation, indexing into `data`.
    entries: Vec<EntryHeader>,
}
1213
1214impl StagedBatch {
1215    pub fn new() -> Self {
1216        Self {
1217            data: Vec::with_capacity(1024),
1218            entries: Vec::with_capacity(16),
1219        }
1220    }
1221
1222    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1223        &mut self,
1224        db: &DBMap<K, V>,
1225        new_vals: impl IntoIterator<Item = (J, U)>,
1226    ) -> Result<&mut Self, TypedStoreError> {
1227        let cf_name = db.cf_name();
1228        new_vals
1229            .into_iter()
1230            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1231                let offset = self.data.len();
1232                self.data.extend_from_slice(cf_name.as_bytes());
1233                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1234                bcs::serialize_into(&mut self.data, v.borrow())
1235                    .map_err(typed_store_err_from_bcs_err)?;
1236                self.entries.push(EntryHeader {
1237                    offset,
1238                    cf_name_len: cf_name.len(),
1239                    key_len,
1240                    is_put: true,
1241                });
1242                Ok(())
1243            })?;
1244        Ok(self)
1245    }
1246
1247    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1248        &mut self,
1249        db: &DBMap<K, V>,
1250        purged_vals: impl IntoIterator<Item = J>,
1251    ) -> Result<(), TypedStoreError> {
1252        let cf_name = db.cf_name();
1253        purged_vals
1254            .into_iter()
1255            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1256                let offset = self.data.len();
1257                self.data.extend_from_slice(cf_name.as_bytes());
1258                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1259                self.entries.push(EntryHeader {
1260                    offset,
1261                    cf_name_len: cf_name.len(),
1262                    key_len,
1263                    is_put: false,
1264                });
1265                Ok(())
1266            })?;
1267        Ok(())
1268    }
1269
1270    pub fn size_in_bytes(&self) -> usize {
1271        self.data.len()
1272    }
1273}
1274
/// Provides a mutable struct to form a collection of database write operations, and execute them.
///
/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
///  ie. they are all written or none is.
/// This is also true of operations across column families in the same database.
///
/// Serializations / Deserialization, and naming of column families is performed by passing a DBMap<K,V>
/// with each operation.
///
/// ```
/// use typed_store::rocks::*;
/// use tempfile::tempdir;
/// use typed_store::Map;
/// use typed_store::metrics::DBMetrics;
/// use prometheus::Registry;
/// use core::fmt::Error;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
///
/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
///     .expect("Failed to open storage");
/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
///
/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
///     .expect("Failed to open storage");
/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
///
/// let mut batch = db_cf_1.batch();
/// batch
///     .insert_batch(&db_cf_1, keys_vals_1.clone())
///     .expect("Failed to batch insert")
///     .insert_batch(&db_cf_2, keys_vals_2.clone())
///     .expect("Failed to batch insert");
///
/// let _ = batch.write().expect("Failed to execute batch");
/// for (k, v) in keys_vals_1 {
///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
///     assert_eq!(Some(v), val);
/// }
///
/// for (k, v) in keys_vals_2 {
///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
///     assert_eq!(Some(v), val);
/// }
/// Ok(())
/// }
/// ```
///
pub struct DBBatch {
    /// Database the batch belongs to; every staged operation must target a
    /// column family of this same database (checked via `Arc::ptr_eq`).
    database: Arc<Database>,
    /// Backend-specific accumulated write operations.
    batch: StorageWriteBatch,
    /// Metrics sink used when the batch is committed.
    db_metrics: Arc<DBMetrics>,
    /// Sampling schedule for collecting a RocksDB perf context on writes.
    write_sample_interval: SamplingInterval,
}
1332
impl DBBatch {
    /// Create a new batch associated with a DB reference.
    ///
    /// Use `open_cf` to get the DB reference or an existing open database.
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consume the batch and write its operations to the database
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let mut write_options = rocksdb::WriteOptions::default();

        // Honor the process-wide sync flag: force a durable (synced) write
        // when enabled.
        if write_sync_enabled() {
            write_options.set_sync(true);
        }

        self.write_opt(write_options)
    }

    /// Consume the batch and write its operations to the database with custom write options
    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        // Commit metrics are labeled per database (not per column family)
        // because a batch may span multiple column families.
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.size_in_bytes();

        // Perf-context collection is sampled to bound its overhead.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };

        self.database
            .write_opt_internal(self.batch, &write_options)?;

        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        // Writes slower than 1s are logged and counted separately so that
        // stalls remain visible even with a coarse latency histogram.
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    /// Approximate size of the accumulated batch in bytes. Only the RocksDB
    /// backend reports a real size; other backends return 0.
    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            // TODO: implement size_in_bytes method
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    /// Replay all operations from `StagedBatch`es into this batch.
    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                // An entry's byte span ends where the next entry begins, or
                // at the end of the buffer for the last entry.
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                // Safety: cf_name was written from &str bytes in insert_batch / delete_batch.
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    // For puts, the value occupies the rest of the entry span.
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }

    /// Queues a delete for every key in `purged_vals`, targeting `db`'s
    /// column family. Errors with `CrossDBBatch` if `db` belongs to a
    /// different database than this batch.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        // Batches are atomic only within a single database; reject maps
        // from a different one.
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                // The batch variant and the map's column-family variant must
                // refer to the same storage backend.
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
    /// by writing a range delete tombstone in the db map
    /// If the DBMap is configured with ignore_range_deletions set to false,
    /// the effect of this write will be visible immediately i.e. you won't
    /// see old values when you do a lookup or scan. But if it is configured
    /// with ignore_range_deletions set to true, the old value are visible until
    /// compaction actually deletes them which will happen sometime after. By
    /// default ignore_range_deletions is set to true on a DBMap (unless it is
    /// overridden in the config), so please use this function with caution
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        // Range deletes are only supported by the RocksDB backend; other
        // backends silently no-op here.
        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    /// inserts a range of (key, value) pairs given as an iterator
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        // Tracks total serialized bytes queued, for the put-bytes metric below.
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                // Optional write auditing: log hashes (not contents) of the
                // serialized key and value.
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    /// Queues a merge operation for each (key, value) pair. Only implemented
    /// for the RocksDB backend; panics (unimplemented) on other backends.
    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}
1617
1618impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1619where
1620    K: Serialize + DeserializeOwned,
1621    V: Serialize + DeserializeOwned,
1622{
1623    type Error = TypedStoreError;
1624
1625    #[instrument(level = "trace", skip_all, err)]
1626    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1627        let key_buf = be_fix_int_ser(key);
1628        let readopts = self.opts.readopts();
1629        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1630            && self
1631                .db
1632                .get(&self.column_family, &key_buf, &readopts)?
1633                .is_some())
1634    }
1635
1636    #[instrument(level = "trace", skip_all, err)]
1637    fn multi_contains_keys<J>(
1638        &self,
1639        keys: impl IntoIterator<Item = J>,
1640    ) -> Result<Vec<bool>, Self::Error>
1641    where
1642        J: Borrow<K>,
1643    {
1644        let values = self.multi_get_pinned(keys)?;
1645        Ok(values.into_iter().map(|v| v.is_some()).collect())
1646    }
1647
    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        // Latency timer records on drop, covering the whole method.
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Perf-context collection is sampled to bound its overhead.
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let res = self
            .db
            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
        // Record fetched value size (0 for a miss).
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => {
                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
                // A deserialization failure means corrupt or schema-mismatched
                // data; report key/value hashes (not contents) before
                // propagating the error via `value?` below.
                if value.is_err() {
                    let key_hash = default_hash(&key_buf);
                    let value_hash = default_hash(&data);
                    debug_fatal!(
                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
                        self.cf_name(),
                        key_hash,
                        value_hash,
                        value.as_ref().err().unwrap()
                    );
                }
                Ok(Some(value?))
            }
            None => Ok(None),
        }
    }
1694
1695    #[instrument(level = "trace", skip_all, err)]
1696    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1697        let timer = self
1698            .db_metrics
1699            .op_metrics
1700            .rocksdb_put_latency_seconds
1701            .with_label_values(&[&self.cf])
1702            .start_timer();
1703        let perf_ctx = if self.write_sample_interval.sample() {
1704            Some(RocksDBPerfContext)
1705        } else {
1706            None
1707        };
1708        let key_buf = be_fix_int_ser(key);
1709        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1710        self.db_metrics
1711            .op_metrics
1712            .rocksdb_put_bytes
1713            .with_label_values(&[&self.cf])
1714            .observe((key_buf.len() + value_buf.len()) as f64);
1715        if perf_ctx.is_some() {
1716            self.db_metrics
1717                .write_perf_ctx_metrics
1718                .report_metrics(&self.cf);
1719        }
1720        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1721
1722        let elapsed = timer.stop_and_record();
1723        if elapsed > 1.0 {
1724            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1725            self.db_metrics
1726                .op_metrics
1727                .rocksdb_very_slow_puts_count
1728                .with_label_values(&[&self.cf])
1729                .inc();
1730            self.db_metrics
1731                .op_metrics
1732                .rocksdb_very_slow_puts_duration_ms
1733                .with_label_values(&[&self.cf])
1734                .inc_by((elapsed * 1000.0) as u64);
1735        }
1736
1737        Ok(())
1738    }
1739
1740    #[instrument(level = "trace", skip_all, err)]
1741    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1742        let _timer = self
1743            .db_metrics
1744            .op_metrics
1745            .rocksdb_delete_latency_seconds
1746            .with_label_values(&[&self.cf])
1747            .start_timer();
1748        let perf_ctx = if self.write_sample_interval.sample() {
1749            Some(RocksDBPerfContext)
1750        } else {
1751            None
1752        };
1753        let key_buf = be_fix_int_ser(key);
1754        self.db.delete_cf(&self.column_family, key_buf)?;
1755        self.db_metrics
1756            .op_metrics
1757            .rocksdb_deletes
1758            .with_label_values(&[&self.cf])
1759            .inc();
1760        if perf_ctx.is_some() {
1761            self.db_metrics
1762                .write_perf_ctx_metrics
1763                .report_metrics(&self.cf);
1764        }
1765        Ok(())
1766    }
1767
1768    /// Writes a range delete tombstone to delete all entries in the db map
1769    /// If the DBMap is configured with ignore_range_deletions set to false,
1770    /// the effect of this write will be visible immediately i.e. you won't
1771    /// see old values when you do a lookup or scan. But if it is configured
1772    /// with ignore_range_deletions set to true, the old value are visible until
1773    /// compaction actually deletes them which will happen sometime after. By
1774    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1775    /// overridden in the config), so please use this function with caution
1776    #[instrument(level = "trace", skip_all, err)]
1777    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1778        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1779        let last_key = self
1780            .reversed_safe_iter_with_bounds(None, None)?
1781            .next()
1782            .transpose()?
1783            .map(|(k, _v)| k);
1784        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1785            let mut batch = self.batch();
1786            batch.schedule_delete_range(self, &first_key, &last_key)?;
1787            batch.write()?;
1788        }
1789        Ok(())
1790    }
1791
1792    fn is_empty(&self) -> bool {
1793        self.safe_iter().next().is_none()
1794    }
1795
1796    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1797        match &self.db.storage {
1798            Storage::Rocks(db) => {
1799                let db_iter = db
1800                    .underlying
1801                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1802                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1803                Box::new(SafeIter::new(
1804                    self.cf.clone(),
1805                    db_iter,
1806                    _timer,
1807                    _perf_ctx,
1808                    bytes_scanned,
1809                    keys_scanned,
1810                    Some(self.db_metrics.clone()),
1811                ))
1812            }
1813            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1814            #[cfg(tidehunter)]
1815            Storage::TideHunter(db) => match &self.column_family {
1816                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1817                    db.iterator(*ks),
1818                    prefix,
1819                    self.start_iter_timer(),
1820                )),
1821                _ => unreachable!("storage backend invariant violation"),
1822            },
1823        }
1824    }
1825
1826    fn safe_iter_with_bounds(
1827        &'a self,
1828        lower_bound: Option<K>,
1829        upper_bound: Option<K>,
1830    ) -> DbIterator<'a, (K, V)> {
1831        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1832        match &self.db.storage {
1833            Storage::Rocks(db) => {
1834                let readopts =
1835                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1836                let db_iter = db
1837                    .underlying
1838                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1839                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1840                Box::new(SafeIter::new(
1841                    self.cf.clone(),
1842                    db_iter,
1843                    _timer,
1844                    _perf_ctx,
1845                    bytes_scanned,
1846                    keys_scanned,
1847                    Some(self.db_metrics.clone()),
1848                ))
1849            }
1850            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1851            #[cfg(tidehunter)]
1852            Storage::TideHunter(db) => match &self.column_family {
1853                ColumnFamily::TideHunter((ks, prefix)) => {
1854                    let mut iter = db.iterator(*ks);
1855                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1856                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1857                }
1858                _ => unreachable!("storage backend invariant violation"),
1859            },
1860        }
1861    }
1862
1863    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1864        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1865        match &self.db.storage {
1866            Storage::Rocks(db) => {
1867                let readopts =
1868                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1869                let db_iter = db
1870                    .underlying
1871                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1872                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1873                Box::new(SafeIter::new(
1874                    self.cf.clone(),
1875                    db_iter,
1876                    _timer,
1877                    _perf_ctx,
1878                    bytes_scanned,
1879                    keys_scanned,
1880                    Some(self.db_metrics.clone()),
1881                ))
1882            }
1883            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1884            #[cfg(tidehunter)]
1885            Storage::TideHunter(db) => match &self.column_family {
1886                ColumnFamily::TideHunter((ks, prefix)) => {
1887                    let mut iter = db.iterator(*ks);
1888                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1889                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1890                }
1891                _ => unreachable!("storage backend invariant violation"),
1892            },
1893        }
1894    }
1895
1896    /// Returns a vector of values corresponding to the keys provided.
1897    #[instrument(level = "trace", skip_all, err)]
1898    fn multi_get<J>(
1899        &self,
1900        keys: impl IntoIterator<Item = J>,
1901    ) -> Result<Vec<Option<V>>, TypedStoreError>
1902    where
1903        J: Borrow<K>,
1904    {
1905        let results = self.multi_get_pinned(keys)?;
1906        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1907            .into_iter()
1908            .map(|value_byte| match value_byte {
1909                Some(data) => Ok(Some(
1910                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1911                )),
1912                None => Ok(None),
1913            })
1914            .collect();
1915
1916        values_parsed
1917    }
1918
1919    /// Convenience method for batch insertion
1920    #[instrument(level = "trace", skip_all, err)]
1921    fn multi_insert<J, U>(
1922        &self,
1923        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1924    ) -> Result<(), Self::Error>
1925    where
1926        J: Borrow<K>,
1927        U: Borrow<V>,
1928    {
1929        let mut batch = self.batch();
1930        batch.insert_batch(self, key_val_pairs)?;
1931        batch.write()
1932    }
1933
1934    /// Convenience method for batch removal
1935    #[instrument(level = "trace", skip_all, err)]
1936    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1937    where
1938        J: Borrow<K>,
1939    {
1940        let mut batch = self.batch();
1941        batch.delete_batch(self, keys)?;
1942        batch.write()
1943    }
1944
1945    /// Try to catch up with primary when running as secondary
1946    #[instrument(level = "trace", skip_all, err)]
1947    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1948        if let Storage::Rocks(rocks) = &self.db.storage {
1949            rocks
1950                .underlying
1951                .try_catch_up_with_primary()
1952                .map_err(typed_store_err_from_rocks_err)?;
1953        }
1954        Ok(())
1955    }
1956}
1957
1958/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
1959#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1960pub fn open_cf_opts<P: AsRef<Path>>(
1961    path: P,
1962    db_options: Option<rocksdb::Options>,
1963    metric_conf: MetricConf,
1964    opt_cfs: &[(&str, rocksdb::Options)],
1965) -> Result<Arc<Database>, TypedStoreError> {
1966    let path = path.as_ref();
1967    ensure_database_type(path, StorageType::Rocks)
1968        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1969    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1970    // because rocksdb uses the simulated clock when creating its background threads, but then
1971    // those threads see the real wall clock (because they are not the test thread), which causes
1972    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1973    // resolves the issue.
1974    //
1975    // This is a no-op in non-simulator builds.
1976
1977    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1978    nondeterministic!({
1979        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1980        options.create_if_missing(true);
1981        options.create_missing_column_families(true);
1982        let rocksdb = {
1983            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1984                &options,
1985                path,
1986                cfs.into_iter()
1987                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1988            )
1989            .map_err(typed_store_err_from_rocks_err)?
1990        };
1991        Ok(Arc::new(Database::new(
1992            Storage::Rocks(RocksDB {
1993                underlying: rocksdb,
1994            }),
1995            metric_conf,
1996            None,
1997        )))
1998    })
1999}
2000
2001/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
2002pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2003    primary_path: P,
2004    secondary_path: Option<P>,
2005    db_options: Option<rocksdb::Options>,
2006    metric_conf: MetricConf,
2007    opt_cfs: &[(&str, rocksdb::Options)],
2008) -> Result<Arc<Database>, TypedStoreError> {
2009    let primary_path = primary_path.as_ref();
2010    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2011    // See comment above for explanation of why nondeterministic is necessary here.
2012    nondeterministic!({
2013        // Customize database options
2014        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2015
2016        fdlimit::raise_fd_limit();
2017        // This is a requirement by RocksDB when opening as secondary
2018        options.set_max_open_files(-1);
2019
2020        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2021        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2022            .ok()
2023            .unwrap_or_default();
2024
2025        let default_db_options = default_db_options();
2026        // Add CFs not explicitly listed
2027        for cf_key in cfs.iter() {
2028            if !opt_cfs.contains_key(&cf_key[..]) {
2029                opt_cfs.insert(cf_key, default_db_options.options.clone());
2030            }
2031        }
2032
2033        let primary_path = primary_path.to_path_buf();
2034        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2035            let mut s = primary_path.clone();
2036            s.pop();
2037            s.push("SECONDARY");
2038            s.as_path().to_path_buf()
2039        });
2040
2041        ensure_database_type(&primary_path, StorageType::Rocks)
2042            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2043        ensure_database_type(&secondary_path, StorageType::Rocks)
2044            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2045
2046        let rocksdb = {
2047            options.create_if_missing(true);
2048            options.create_missing_column_families(true);
2049            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2050                &options,
2051                &primary_path,
2052                &secondary_path,
2053                opt_cfs
2054                    .iter()
2055                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2056            )
2057            .map_err(typed_store_err_from_rocks_err)?;
2058            db.try_catch_up_with_primary()
2059                .map_err(typed_store_err_from_rocks_err)?;
2060            db
2061        };
2062        Ok(Arc::new(Database::new(
2063            Storage::Rocks(RocksDB {
2064                underlying: rocksdb,
2065            }),
2066            metric_conf,
2067            None,
2068        )))
2069    })
2070}
2071
/// Drops a database if there is no other handle to it, with retries and timeout.
///
/// Detects the storage variant (RocksDB vs. tidehunter) from the directory
/// contents and dispatches to the matching cleanup. Both variants coexist in
/// tidehunter builds — some stores (e.g. rpc-index) are pure RocksDB even when
/// the tidehunter feature is enabled elsewhere in the binary.
///
/// # Errors
/// Returns the last underlying error if destruction keeps failing (e.g. the
/// database is still locked by another handle) when `timeout` is exhausted.
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    // In non-tidehunter builds this check compiles away and the RocksDB
    // cleanup path is always taken.
    #[cfg(tidehunter)]
    if is_tidehunter_db(&path) {
        return safe_drop_tidehunter_db(path, timeout).await;
    }
    safe_drop_rocksdb(path, timeout).await
}
2084
2085async fn safe_drop_rocksdb(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2086    let mut backoff = backoff::ExponentialBackoff {
2087        max_elapsed_time: Some(timeout),
2088        ..Default::default()
2089    };
2090    loop {
2091        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2092            Ok(()) => return Ok(()),
2093            Err(err) => match backoff.next_backoff() {
2094                Some(duration) => tokio::time::sleep(duration).await,
2095                None => return Err(std::io::Error::other(err)),
2096            },
2097        }
2098    }
2099}
2100
#[cfg(tidehunter)]
fn is_tidehunter_db(path: &Path) -> bool {
    // TideHunterDb::open writes `shape.yaml` into its directory and RocksDB
    // never creates that file, so its presence is the canonical marker for a
    // tidehunter DB.
    let marker = path.join("shape.yaml");
    marker.exists()
}
2107
#[cfg(tidehunter)]
/// Drops the tidehunter database at `path`, retrying with exponential
/// backoff (up to `timeout`) only while `drop_db` reports the
/// `AlreadyExists` lock-contention error; any other error is returned
/// immediately.
async fn safe_drop_tidehunter_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        // Only lock contention (surfaced as AlreadyExists) is retryable.
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        match backoff.next_backoff() {
            Some(delay) => tokio::time::sleep(delay).await,
            None => {
                warn!(
                    "Database at {:?} is still locked after timeout ({:?})",
                    path, timeout
                );
                return Err(err);
            }
        }
    }
}
2133
2134fn populate_missing_cfs(
2135    input_cfs: &[(&str, rocksdb::Options)],
2136    path: &Path,
2137) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2138    let mut cfs = vec![];
2139    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2140    let existing_cfs =
2141        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2142            .ok()
2143            .unwrap_or_default();
2144
2145    for cf_name in existing_cfs {
2146        if !input_cf_index.contains(&cf_name[..]) {
2147            cfs.push((cf_name, rocksdb::Options::default()));
2148        }
2149    }
2150    cfs.extend(
2151        input_cfs
2152            .iter()
2153            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2154    );
2155    Ok(cfs)
2156}
2157
2158fn default_hash(value: &[u8]) -> Digest<32> {
2159    let mut hasher = fastcrypto::hash::Blake2b256::default();
2160    hasher.update(value);
2161    hasher.finalize()
2162}