typed_store/rocks/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{be_fix_int_ser, iterator_bounds, iterator_bounds_with_range};
20use crate::{DbIterator, TypedStoreError};
21use crate::{
22    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
23    traits::{Map, TableSummary},
24};
25use backoff::backoff::Backoff;
26use fastcrypto::hash::{Digest, HashFunction};
27use mysten_common::debug_fatal;
28use prometheus::{Histogram, HistogramTimer};
29use rocksdb::properties::num_files_at_level;
30use rocksdb::{
31    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
32    properties,
33};
34use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
35use serde::{Serialize, de::DeserializeOwned};
36use std::ops::{Bound, Deref};
37use std::{
38    borrow::Borrow,
39    marker::PhantomData,
40    ops::RangeBounds,
41    path::{Path, PathBuf},
42    sync::Arc,
43    time::Duration,
44};
45use std::{collections::HashSet, ffi::CStr};
46use sui_macros::{fail_point, nondeterministic};
47#[cfg(tidehunter)]
48use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
49use tokio::sync::oneshot;
50use tracing::{debug, error, instrument, warn};
51
// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
//
// The name is built with the checked constructor inside the const initializer, so a missing
// terminator or an interior NUL byte is rejected at compile time and no `unsafe` is needed.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("property name must be NUL-terminated with no interior NUL bytes"),
    };
56
57#[cfg(test)]
58mod tests;
59
/// Owning wrapper around a multi-threaded `rocksdb` database handle.
///
/// The type has a `Drop` implementation that cancels the database's background work.
#[derive(Debug)]
pub struct RocksDB {
    /// The wrapped `rocksdb` handle; all RocksDB calls in this module go through it.
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
64
65impl Drop for RocksDB {
66    fn drop(&mut self) {
67        self.underlying.cancel_all_background_work(/* wait */ true);
68    }
69}
70
/// Backend-specific identifier of the column family a map operates on.
///
/// The variant is expected to match the variant of the [`Storage`] it is used with.
#[derive(Clone)]
pub enum ColumnFamily {
    /// Name of a RocksDB column family.
    Rocks(String),
    /// Name of a column family in the in-memory backend.
    InMemory(String),
    /// TideHunter key space plus an optional byte prefix that is passed to `transform_th_key`
    /// together with each key.
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}
78
79impl std::fmt::Debug for ColumnFamily {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
83            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
84            #[cfg(tidehunter)]
85            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
86        }
87    }
88}
89
90impl ColumnFamily {
91    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
92        match &self {
93            ColumnFamily::Rocks(name) => rocks_db
94                .underlying
95                .cf_handle(name)
96                .expect("Map-keying column family should have been checked at DB creation"),
97            _ => unreachable!("invariant is checked by the caller"),
98        }
99    }
100}
101
/// The storage backend behind a [`Database`].
pub enum Storage {
    /// RocksDB-backed storage.
    Rocks(RocksDB),
    /// In-memory storage.
    InMemory(InMemoryDB),
    /// TideHunter-backed storage (only compiled with the `tidehunter` cfg).
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}
108
109impl std::fmt::Debug for Storage {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        match self {
112            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
113            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
114            #[cfg(tidehunter)]
115            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
116        }
117    }
118}
119
/// A storage backend together with the metrics configuration used when reporting on it.
#[derive(Debug)]
pub struct Database {
    /// The backend that actually stores the data.
    storage: Storage,
    /// Database name and sampling intervals used for metrics.
    metric_conf: MetricConf,
}
125
126impl Drop for Database {
127    fn drop(&mut self) {
128        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
129    }
130}
131
/// A value read from one of the storage backends; it dereferences to the value's bytes.
enum GetResult<'a> {
    /// Slice returned by a pinned RocksDB read.
    Rocks(DBPinnableSlice<'a>),
    /// Owned bytes returned by the in-memory backend.
    InMemory(Vec<u8>),
    /// Bytes returned by TideHunter.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}
138
139impl Deref for GetResult<'_> {
140    type Target = [u8];
141    fn deref(&self) -> &[u8] {
142        match self {
143            GetResult::Rocks(d) => d.deref(),
144            GetResult::InMemory(d) => d.deref(),
145            #[cfg(tidehunter)]
146            GetResult::TideHunter(d) => d.deref(),
147        }
148    }
149}
150
151impl Database {
152    pub fn new(storage: Storage, metric_conf: MetricConf) -> Self {
153        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
154        Self {
155            storage,
156            metric_conf,
157        }
158    }
159
160    /// Flush all memtables to SST files on disk.
161    pub fn flush(&self) -> Result<(), TypedStoreError> {
162        match &self.storage {
163            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
164                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
165            }),
166            Storage::InMemory(_) => {
167                // InMemory databases don't need flushing
168                Ok(())
169            }
170            #[cfg(tidehunter)]
171            Storage::TideHunter(_) => {
172                // TideHunter doesn't support an explicit flush.
173                Ok(())
174            }
175        }
176    }
177
178    fn get<K: AsRef<[u8]>>(
179        &self,
180        cf: &ColumnFamily,
181        key: K,
182        readopts: &ReadOptions,
183    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
184        match (&self.storage, cf) {
185            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
186                .underlying
187                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
188                .map_err(typed_store_err_from_rocks_err)?
189                .map(GetResult::Rocks)),
190            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
191                Ok(db.get(cf_name, key).map(GetResult::InMemory))
192            }
193            #[cfg(tidehunter)]
194            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
195                .get(*ks, &transform_th_key(key.as_ref(), prefix))
196                .map_err(typed_store_error_from_th_error)?
197                .map(GetResult::TideHunter)),
198
199            _ => Err(TypedStoreError::RocksDBError(
200                "typed store invariant violation".to_string(),
201            )),
202        }
203    }
204
205    fn multi_get<I, K>(
206        &self,
207        cf: &ColumnFamily,
208        keys: I,
209        readopts: &ReadOptions,
210    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
211    where
212        I: IntoIterator<Item = K>,
213        K: AsRef<[u8]>,
214    {
215        match (&self.storage, cf) {
216            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
217                let keys_vec: Vec<K> = keys.into_iter().collect();
218                let res = db.underlying.batched_multi_get_cf_opt(
219                    &cf.rocks_cf(db),
220                    keys_vec.iter(),
221                    /* sorted_input */ false,
222                    readopts,
223                );
224                res.into_iter()
225                    .map(|r| {
226                        r.map_err(typed_store_err_from_rocks_err)
227                            .map(|item| item.map(GetResult::Rocks))
228                    })
229                    .collect()
230            }
231            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
232                .multi_get(cf_name, keys)
233                .into_iter()
234                .map(|r| Ok(r.map(GetResult::InMemory)))
235                .collect(),
236            #[cfg(tidehunter)]
237            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
238                let res = keys.into_iter().map(|k| {
239                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
240                        .map_err(typed_store_error_from_th_error)
241                });
242                res.into_iter()
243                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
244                    .collect()
245            }
246            _ => unreachable!("typed store invariant violation"),
247        }
248    }
249
250    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
251        match &self.storage {
252            Storage::Rocks(db) => db.underlying.drop_cf(name),
253            Storage::InMemory(db) => {
254                db.drop_cf(name);
255                Ok(())
256            }
257            #[cfg(tidehunter)]
258            Storage::TideHunter(_) => {
259                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
260            }
261        }
262    }
263
264    pub fn delete_file_in_range<K: AsRef<[u8]>>(
265        &self,
266        cf: &impl AsColumnFamilyRef,
267        from: K,
268        to: K,
269    ) -> Result<(), rocksdb::Error> {
270        match &self.storage {
271            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
272            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
273        }
274    }
275
276    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
277        fail_point!("delete-cf-before");
278        let ret = match (&self.storage, cf) {
279            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
280                .underlying
281                .delete_cf(&cf.rocks_cf(db), key)
282                .map_err(typed_store_err_from_rocks_err),
283            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
284                db.delete(cf_name, key.as_ref());
285                Ok(())
286            }
287            #[cfg(tidehunter)]
288            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
289                .remove(*ks, transform_th_key(key.as_ref(), prefix))
290                .map_err(typed_store_error_from_th_error),
291            _ => Err(TypedStoreError::RocksDBError(
292                "typed store invariant violation".to_string(),
293            )),
294        };
295        fail_point!("delete-cf-after");
296        #[allow(clippy::let_and_return)]
297        ret
298    }
299
300    pub fn path_for_pruning(&self) -> &Path {
301        match &self.storage {
302            Storage::Rocks(rocks) => rocks.underlying.path(),
303            _ => unimplemented!("method is only supported for rocksdb backend"),
304        }
305    }
306
307    fn put_cf(
308        &self,
309        cf: &ColumnFamily,
310        key: Vec<u8>,
311        value: Vec<u8>,
312    ) -> Result<(), TypedStoreError> {
313        fail_point!("put-cf-before");
314        let ret = match (&self.storage, cf) {
315            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
316                .underlying
317                .put_cf(&cf.rocks_cf(db), key, value)
318                .map_err(typed_store_err_from_rocks_err),
319            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
320                db.put(cf_name, key, value);
321                Ok(())
322            }
323            #[cfg(tidehunter)]
324            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
325                .insert(*ks, transform_th_key(&key, prefix), value)
326                .map_err(typed_store_error_from_th_error),
327            _ => Err(TypedStoreError::RocksDBError(
328                "typed store invariant violation".to_string(),
329            )),
330        };
331        fail_point!("put-cf-after");
332        #[allow(clippy::let_and_return)]
333        ret
334    }
335
336    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
337        &self,
338        cf_name: &str,
339        key: K,
340        readopts: &ReadOptions,
341    ) -> bool {
342        match &self.storage {
343            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
344            // but no false negatives. We use it to short-circuit the absent case
345            Storage::Rocks(rocks) => {
346                rocks
347                    .underlying
348                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
349            }
350            _ => true,
351        }
352    }
353
354    pub fn write(&self, batch: StorageWriteBatch) -> Result<(), TypedStoreError> {
355        self.write_opt(batch, &rocksdb::WriteOptions::default())
356    }
357
358    pub fn write_opt(
359        &self,
360        batch: StorageWriteBatch,
361        write_options: &rocksdb::WriteOptions,
362    ) -> Result<(), TypedStoreError> {
363        fail_point!("batch-write-before");
364        let ret = match (&self.storage, batch) {
365            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
366                .underlying
367                .write_opt(batch, write_options)
368                .map_err(typed_store_err_from_rocks_err),
369            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
370                // InMemory doesn't support write options
371                db.write(batch);
372                Ok(())
373            }
374            #[cfg(tidehunter)]
375            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
376                // TideHunter doesn't support write options
377                db.write_batch(batch)
378                    .map_err(typed_store_error_from_th_error)
379            }
380            _ => Err(TypedStoreError::RocksDBError(
381                "using invalid batch type for the database".to_string(),
382            )),
383        };
384        fail_point!("batch-write-after");
385        #[allow(clippy::let_and_return)]
386        ret
387    }
388
389    #[cfg(tidehunter)]
390    pub fn start_relocation(&self) -> anyhow::Result<()> {
391        if let Storage::TideHunter(db) = &self.storage {
392            db.start_relocation()?;
393        }
394        Ok(())
395    }
396
397    #[cfg(tidehunter)]
398    pub fn drop_cells_in_range(
399        &self,
400        ks: KeySpace,
401        from_inclusive: &[u8],
402        to_inclusive: &[u8],
403    ) -> anyhow::Result<()> {
404        if let Storage::TideHunter(db) = &self.storage {
405            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
406                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
407        } else {
408            panic!("drop_cells_in_range called on non-TideHunter storage");
409        }
410        Ok(())
411    }
412
413    pub fn compact_range_cf<K: AsRef<[u8]>>(
414        &self,
415        cf_name: &str,
416        start: Option<K>,
417        end: Option<K>,
418    ) {
419        if let Storage::Rocks(rocksdb) = &self.storage {
420            rocksdb
421                .underlying
422                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
423        }
424    }
425
426    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
427        // TODO: implement for other storage types
428        if let Storage::Rocks(rocks) = &self.storage {
429            let checkpoint =
430                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
431            checkpoint
432                .create_checkpoint(path)
433                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
434        }
435        Ok(())
436    }
437
438    pub fn get_sampling_interval(&self) -> SamplingInterval {
439        self.metric_conf.read_sample_interval.new_from_self()
440    }
441
442    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
443        self.metric_conf.read_sample_interval.new_from_self()
444    }
445
446    pub fn write_sampling_interval(&self) -> SamplingInterval {
447        self.metric_conf.write_sample_interval.new_from_self()
448    }
449
450    pub fn iter_sampling_interval(&self) -> SamplingInterval {
451        self.metric_conf.iter_sample_interval.new_from_self()
452    }
453
454    fn db_name(&self) -> String {
455        let name = &self.metric_conf.db_name;
456        if name.is_empty() {
457            "default".to_string()
458        } else {
459            name.clone()
460        }
461    }
462
463    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
464        match &self.storage {
465            Storage::Rocks(rocks) => rocks.underlying.live_files(),
466            _ => Ok(vec![]),
467        }
468    }
469}
470
471fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
472    rocks_db
473        .underlying
474        .cf_handle(cf_name)
475        .expect("Map-keying column family should have been checked at DB creation")
476}
477
478fn rocks_cf_from_db<'a>(
479    db: &'a Database,
480    cf_name: &str,
481) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
482    match &db.storage {
483        Storage::Rocks(rocksdb) => Ok(rocksdb
484            .underlying
485            .cf_handle(cf_name)
486            .expect("Map-keying column family should have been checked at DB creation")),
487        _ => Err(TypedStoreError::RocksDBError(
488            "using invalid batch type for the database".to_string(),
489        )),
490    }
491}
492
/// Metrics configuration of a database: its reporting name and per-operation sampling
/// intervals.
#[derive(Debug, Default)]
pub struct MetricConf {
    /// Name under which the database is reported in metrics.
    pub db_name: String,
    /// Sampling interval used for both single-key and multi-key reads.
    pub read_sample_interval: SamplingInterval,
    /// Sampling interval used for writes.
    pub write_sample_interval: SamplingInterval,
    /// Sampling interval used for iteration.
    pub iter_sample_interval: SamplingInterval,
}
500
501impl MetricConf {
502    pub fn new(db_name: &str) -> Self {
503        if db_name.is_empty() {
504            error!("A meaningful db name should be used for metrics reporting.")
505        }
506        Self {
507            db_name: db_name.to_string(),
508            read_sample_interval: SamplingInterval::default(),
509            write_sample_interval: SamplingInterval::default(),
510            iter_sample_interval: SamplingInterval::default(),
511        }
512    }
513
514    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
515        Self {
516            db_name: self.db_name,
517            read_sample_interval: read_interval,
518            write_sample_interval: SamplingInterval::default(),
519            iter_sample_interval: SamplingInterval::default(),
520        }
521    }
522}
/// Period, in seconds, of the timer that drives the per-column-family RocksDB metrics report.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Value reported for a metric when reading the corresponding RocksDB property fails.
const METRICS_ERROR: i64 = -1;
525
/// An interface to a rocksDB database, keyed by a columnfamily
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    /// Shared handle to the database this map reads from and writes to.
    pub db: Arc<Database>,
    /// Ties the key and value types to the map without storing any `K` or `V`.
    _phantom: PhantomData<fn(K) -> V>,
    /// Backend-specific column family identifier passed to the `Database` read calls.
    column_family: ColumnFamily,
    // the column family under which the map is stored
    // (also used as the label value for metrics)
    cf: String,
    /// Read/write options; read options are derived from it on each read.
    pub opts: ReadWriteOptions,
    /// Metrics registry the map reports into.
    db_metrics: Arc<DBMetrics>,
    // Sampling intervals, each obtained from the database when the map is constructed.
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    /// Sender half of the oneshot channel watched by the periodic metrics task; the task
    /// exits once the receiver half completes (presumably when the last clone of this handle
    /// is dropped — confirm).
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
542
// NOTE(review): no SAFETY justification is recorded for this manual impl — confirm which field
// (if any) is not automatically `Send` and why moving a `DBMap` across threads is sound.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
544
545impl<K, V> DBMap<K, V> {
    /// Builds a map over `column_family` of `db`, labelled `opt_cf` for metrics.
    ///
    /// Unless `is_deprecated` is set or the backend is not RocksDB, this also spawns a tokio
    /// task that reports RocksDB metrics for the column family every
    /// `CF_METRICS_REPORT_PERIOD_SECS` seconds. The task only holds a weak reference to the
    /// database and stops when the receiver paired with `_metrics_task_cancel_handle`
    /// completes.
    ///
    /// NOTE(review): `tokio::task::spawn` presumably requires a running tokio runtime when
    /// the task is spawned — confirm for callers.
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        // Weak handle for the background task so that it does not keep the database alive.
        let db_cloned = Arc::downgrade(&db.clone());
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        // The sender is stored in the returned map; the receiver is watched by the task.
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Skip the report if the database has already been dropped.
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                // Property reads run on the blocking thread pool.
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        // Leave the loop as soon as the receiver completes.
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }
596
597    /// Reopens an open database as a typed map operating under a specific column family.
598    /// if no column family is passed, the default column family is used.
599    #[instrument(level = "debug", skip(db), err)]
600    pub fn reopen(
601        db: &Arc<Database>,
602        opt_cf: Option<&str>,
603        rw_options: &ReadWriteOptions,
604        is_deprecated: bool,
605    ) -> Result<Self, TypedStoreError> {
606        let cf_key = opt_cf
607            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
608            .to_owned();
609        Ok(DBMap::new(
610            db.clone(),
611            rw_options,
612            &cf_key,
613            ColumnFamily::Rocks(cf_key.to_string()),
614            is_deprecated,
615        ))
616    }
617
618    #[cfg(tidehunter)]
619    pub fn reopen_th(
620        db: Arc<Database>,
621        cf_name: &str,
622        ks: KeySpace,
623        prefix: Option<Vec<u8>>,
624    ) -> Self {
625        DBMap::new(
626            db,
627            &ReadWriteOptions::default(),
628            cf_name,
629            ColumnFamily::TideHunter((ks, prefix.clone())),
630            false,
631        )
632    }
633
634    pub fn cf_name(&self) -> &str {
635        &self.cf
636    }
637
638    pub fn batch(&self) -> DBBatch {
639        let batch = match &self.db.storage {
640            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
641            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
642            #[cfg(tidehunter)]
643            Storage::TideHunter(_) => {
644                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
645            }
646        };
647        DBBatch::new(
648            &self.db,
649            batch,
650            &self.db_metrics,
651            &self.write_sample_interval,
652        )
653    }
654
655    pub fn flush(&self) -> Result<(), TypedStoreError> {
656        self.db.flush()
657    }
658
659    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
660        let from_buf = be_fix_int_ser(start);
661        let to_buf = be_fix_int_ser(end);
662        self.db
663            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
664        Ok(())
665    }
666
667    pub fn compact_range_raw(
668        &self,
669        cf_name: &str,
670        start: Vec<u8>,
671        end: Vec<u8>,
672    ) -> Result<(), TypedStoreError> {
673        self.db.compact_range_cf(cf_name, Some(start), Some(end));
674        Ok(())
675    }
676
677    #[cfg(tidehunter)]
678    pub fn drop_cells_in_range<J: Serialize>(
679        &self,
680        from_inclusive: &J,
681        to_inclusive: &J,
682    ) -> Result<(), TypedStoreError>
683    where
684        K: Serialize,
685    {
686        let from_buf = be_fix_int_ser(from_inclusive);
687        let to_buf = be_fix_int_ser(to_inclusive);
688        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
689            self.db
690                .drop_cells_in_range(*ks, &from_buf, &to_buf)
691                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
692        }
693        Ok(())
694    }
695
696    /// Returns a vector of raw values corresponding to the keys provided.
697    fn multi_get_pinned<J>(
698        &self,
699        keys: impl IntoIterator<Item = J>,
700    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
701    where
702        J: Borrow<K>,
703        K: Serialize,
704    {
705        let _timer = self
706            .db_metrics
707            .op_metrics
708            .rocksdb_multiget_latency_seconds
709            .with_label_values(&[&self.cf])
710            .start_timer();
711        let perf_ctx = if self.multiget_sample_interval.sample() {
712            Some(RocksDBPerfContext)
713        } else {
714            None
715        };
716        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
717        let results: Result<Vec<_>, TypedStoreError> = self
718            .db
719            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
720            .into_iter()
721            .collect();
722        let entries = results?;
723        let entry_size = entries
724            .iter()
725            .flatten()
726            .map(|entry| entry.len())
727            .sum::<usize>();
728        self.db_metrics
729            .op_metrics
730            .rocksdb_multiget_bytes
731            .with_label_values(&[&self.cf])
732            .observe(entry_size as f64);
733        if perf_ctx.is_some() {
734            self.db_metrics
735                .read_perf_ctx_metrics
736                .report_metrics(&self.cf);
737        }
738        Ok(entries)
739    }
740
741    fn get_rocksdb_int_property(
742        rocksdb: &RocksDB,
743        cf: &impl AsColumnFamilyRef,
744        property_name: &std::ffi::CStr,
745    ) -> Result<i64, TypedStoreError> {
746        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
747            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
748            Ok(None) => Ok(0),
749            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
750        }
751    }
752
753    fn report_rocksdb_metrics(
754        database: &Arc<Database>,
755        cf_name: &str,
756        db_metrics: &Arc<DBMetrics>,
757    ) {
758        let Storage::Rocks(rocksdb) = &database.storage else {
759            return;
760        };
761
762        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
763            tracing::warn!(
764                "unable to report metrics for cf {cf_name:?} in db {:?}",
765                database.db_name()
766            );
767            return;
768        };
769
770        db_metrics
771            .cf_metrics
772            .rocksdb_total_sst_files_size
773            .with_label_values(&[cf_name])
774            .set(
775                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
776                    .unwrap_or(METRICS_ERROR),
777            );
778        db_metrics
779            .cf_metrics
780            .rocksdb_total_blob_files_size
781            .with_label_values(&[cf_name])
782            .set(
783                Self::get_rocksdb_int_property(
784                    rocksdb,
785                    &cf,
786                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
787                )
788                .unwrap_or(METRICS_ERROR),
789            );
790        // 7 is the default number of levels in RocksDB. If we ever change the number of levels using `set_num_levels`,
791        // we need to update here as well. Note that there isn't an API to query the DB to get the number of levels (yet).
792        let total_num_files: i64 = (0..=6)
793            .map(|level| {
794                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
795                    .unwrap_or(METRICS_ERROR)
796            })
797            .sum();
798        db_metrics
799            .cf_metrics
800            .rocksdb_total_num_files
801            .with_label_values(&[cf_name])
802            .set(total_num_files);
803        db_metrics
804            .cf_metrics
805            .rocksdb_num_level0_files
806            .with_label_values(&[cf_name])
807            .set(
808                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
809                    .unwrap_or(METRICS_ERROR),
810            );
811        db_metrics
812            .cf_metrics
813            .rocksdb_current_size_active_mem_tables
814            .with_label_values(&[cf_name])
815            .set(
816                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
817                    .unwrap_or(METRICS_ERROR),
818            );
819        db_metrics
820            .cf_metrics
821            .rocksdb_size_all_mem_tables
822            .with_label_values(&[cf_name])
823            .set(
824                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
825                    .unwrap_or(METRICS_ERROR),
826            );
827        db_metrics
828            .cf_metrics
829            .rocksdb_num_snapshots
830            .with_label_values(&[cf_name])
831            .set(
832                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
833                    .unwrap_or(METRICS_ERROR),
834            );
835        db_metrics
836            .cf_metrics
837            .rocksdb_oldest_snapshot_time
838            .with_label_values(&[cf_name])
839            .set(
840                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
841                    .unwrap_or(METRICS_ERROR),
842            );
843        db_metrics
844            .cf_metrics
845            .rocksdb_actual_delayed_write_rate
846            .with_label_values(&[cf_name])
847            .set(
848                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
849                    .unwrap_or(METRICS_ERROR),
850            );
851        db_metrics
852            .cf_metrics
853            .rocksdb_is_write_stopped
854            .with_label_values(&[cf_name])
855            .set(
856                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
857                    .unwrap_or(METRICS_ERROR),
858            );
859        db_metrics
860            .cf_metrics
861            .rocksdb_block_cache_capacity
862            .with_label_values(&[cf_name])
863            .set(
864                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
865                    .unwrap_or(METRICS_ERROR),
866            );
867        db_metrics
868            .cf_metrics
869            .rocksdb_block_cache_usage
870            .with_label_values(&[cf_name])
871            .set(
872                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
873                    .unwrap_or(METRICS_ERROR),
874            );
875        db_metrics
876            .cf_metrics
877            .rocksdb_block_cache_pinned_usage
878            .with_label_values(&[cf_name])
879            .set(
880                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
881                    .unwrap_or(METRICS_ERROR),
882            );
883        db_metrics
884            .cf_metrics
885            .rocksdb_estimate_table_readers_mem
886            .with_label_values(&[cf_name])
887            .set(
888                Self::get_rocksdb_int_property(
889                    rocksdb,
890                    &cf,
891                    properties::ESTIMATE_TABLE_READERS_MEM,
892                )
893                .unwrap_or(METRICS_ERROR),
894            );
895        db_metrics
896            .cf_metrics
897            .rocksdb_estimated_num_keys
898            .with_label_values(&[cf_name])
899            .set(
900                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
901                    .unwrap_or(METRICS_ERROR),
902            );
903        db_metrics
904            .cf_metrics
905            .rocksdb_num_immutable_mem_tables
906            .with_label_values(&[cf_name])
907            .set(
908                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
909                    .unwrap_or(METRICS_ERROR),
910            );
911        db_metrics
912            .cf_metrics
913            .rocksdb_mem_table_flush_pending
914            .with_label_values(&[cf_name])
915            .set(
916                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
917                    .unwrap_or(METRICS_ERROR),
918            );
919        db_metrics
920            .cf_metrics
921            .rocksdb_compaction_pending
922            .with_label_values(&[cf_name])
923            .set(
924                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
925                    .unwrap_or(METRICS_ERROR),
926            );
927        db_metrics
928            .cf_metrics
929            .rocksdb_estimate_pending_compaction_bytes
930            .with_label_values(&[cf_name])
931            .set(
932                Self::get_rocksdb_int_property(
933                    rocksdb,
934                    &cf,
935                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
936                )
937                .unwrap_or(METRICS_ERROR),
938            );
939        db_metrics
940            .cf_metrics
941            .rocksdb_num_running_compactions
942            .with_label_values(&[cf_name])
943            .set(
944                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
945                    .unwrap_or(METRICS_ERROR),
946            );
947        db_metrics
948            .cf_metrics
949            .rocksdb_num_running_flushes
950            .with_label_values(&[cf_name])
951            .set(
952                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
953                    .unwrap_or(METRICS_ERROR),
954            );
955        db_metrics
956            .cf_metrics
957            .rocksdb_estimate_oldest_key_time
958            .with_label_values(&[cf_name])
959            .set(
960                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
961                    .unwrap_or(METRICS_ERROR),
962            );
963        db_metrics
964            .cf_metrics
965            .rocksdb_background_errors
966            .with_label_values(&[cf_name])
967            .set(
968                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
969                    .unwrap_or(METRICS_ERROR),
970            );
971        db_metrics
972            .cf_metrics
973            .rocksdb_base_level
974            .with_label_values(&[cf_name])
975            .set(
976                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
977                    .unwrap_or(METRICS_ERROR),
978            );
979    }
980
981    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
982        self.db.checkpoint(path)
983    }
984
985    pub fn table_summary(&self) -> eyre::Result<TableSummary>
986    where
987        K: Serialize + DeserializeOwned,
988        V: Serialize + DeserializeOwned,
989    {
990        let mut num_keys = 0;
991        let mut key_bytes_total = 0;
992        let mut value_bytes_total = 0;
993        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
994        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
995        for item in self.safe_iter() {
996            let (key, value) = item?;
997            num_keys += 1;
998            let key_len = be_fix_int_ser(key.borrow()).len();
999            let value_len = bcs::to_bytes(value.borrow())?.len();
1000            key_bytes_total += key_len;
1001            value_bytes_total += value_len;
1002            key_hist.record(key_len as u64)?;
1003            value_hist.record(value_len as u64)?;
1004        }
1005        Ok(TableSummary {
1006            num_keys,
1007            key_bytes_total,
1008            value_bytes_total,
1009            key_hist,
1010            value_hist,
1011        })
1012    }
1013
1014    fn start_iter_timer(&self) -> HistogramTimer {
1015        self.db_metrics
1016            .op_metrics
1017            .rocksdb_iter_latency_seconds
1018            .with_label_values(&[&self.cf])
1019            .start_timer()
1020    }
1021
1022    // Creates metrics and context for tracking an iterator usage and performance.
1023    fn create_iter_context(
1024        &self,
1025    ) -> (
1026        Option<HistogramTimer>,
1027        Option<Histogram>,
1028        Option<Histogram>,
1029        Option<RocksDBPerfContext>,
1030    ) {
1031        let timer = self.start_iter_timer();
1032        let bytes_scanned = self
1033            .db_metrics
1034            .op_metrics
1035            .rocksdb_iter_bytes
1036            .with_label_values(&[&self.cf]);
1037        let keys_scanned = self
1038            .db_metrics
1039            .op_metrics
1040            .rocksdb_iter_keys
1041            .with_label_values(&[&self.cf]);
1042        let perf_ctx = if self.iter_sample_interval.sample() {
1043            Some(RocksDBPerfContext)
1044        } else {
1045            None
1046        };
1047        (
1048            Some(timer),
1049            Some(bytes_scanned),
1050            Some(keys_scanned),
1051            perf_ctx,
1052        )
1053    }
1054
1055    /// Creates a safe reversed iterator with optional bounds.
1056    /// Both upper bound and lower bound are included.
1057    #[allow(clippy::complexity)]
1058    pub fn reversed_safe_iter_with_bounds(
1059        &self,
1060        lower_bound: Option<K>,
1061        upper_bound: Option<K>,
1062    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1063    where
1064        K: Serialize + DeserializeOwned,
1065        V: Serialize + DeserializeOwned,
1066    {
1067        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1068            lower_bound
1069                .as_ref()
1070                .map(Bound::Included)
1071                .unwrap_or(Bound::Unbounded),
1072            upper_bound
1073                .as_ref()
1074                .map(Bound::Included)
1075                .unwrap_or(Bound::Unbounded),
1076        ));
1077        match &self.db.storage {
1078            Storage::Rocks(db) => {
1079                let readopts = rocks_util::apply_range_bounds(
1080                    self.opts.readopts(),
1081                    it_lower_bound,
1082                    it_upper_bound,
1083                );
1084                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1085                let db_iter = db
1086                    .underlying
1087                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1088                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1089                let iter = SafeIter::new(
1090                    self.cf.clone(),
1091                    db_iter,
1092                    _timer,
1093                    _perf_ctx,
1094                    bytes_scanned,
1095                    keys_scanned,
1096                    Some(self.db_metrics.clone()),
1097                );
1098                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1099            }
1100            Storage::InMemory(db) => {
1101                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1102            }
1103            #[cfg(tidehunter)]
1104            Storage::TideHunter(db) => match &self.column_family {
1105                ColumnFamily::TideHunter((ks, prefix)) => {
1106                    let mut iter = db.iterator(*ks);
1107                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
1108                    iter.reverse();
1109                    Ok(Box::new(transform_th_iterator(
1110                        iter,
1111                        prefix,
1112                        self.start_iter_timer(),
1113                    )))
1114                }
1115                _ => unreachable!("storage backend invariant violation"),
1116            },
1117        }
1118    }
1119}
1120
1121pub enum StorageWriteBatch {
1122    Rocks(rocksdb::WriteBatch),
1123    InMemory(InMemoryBatch),
1124    #[cfg(tidehunter)]
1125    TideHunter(tidehunter::batch::WriteBatch),
1126}
1127
1128/// Provides a mutable struct to form a collection of database write operations, and execute them.
1129///
1130/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
1131///  ie. they are all written or none is.
1132/// This is also true of operations across column families in the same database.
1133///
1134/// Serializations / Deserialization, and naming of column families is performed by passing a DBMap<K,V>
1135/// with each operation.
1136///
1137/// ```
1138/// use typed_store::rocks::*;
1139/// use tempfile::tempdir;
1140/// use typed_store::Map;
1141/// use typed_store::metrics::DBMetrics;
1142/// use prometheus::Registry;
1143/// use core::fmt::Error;
1144/// use std::sync::Arc;
1145///
1146/// #[tokio::main]
1147/// async fn main() -> Result<(), Error> {
1148/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
1149///
1150/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
1151///     .expect("Failed to open storage");
1152/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1153///
1154/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
1155///     .expect("Failed to open storage");
1156/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1157///
1158/// let mut batch = db_cf_1.batch();
1159/// batch
1160///     .insert_batch(&db_cf_1, keys_vals_1.clone())
1161///     .expect("Failed to batch insert")
1162///     .insert_batch(&db_cf_2, keys_vals_2.clone())
1163///     .expect("Failed to batch insert");
1164///
1165/// let _ = batch.write().expect("Failed to execute batch");
1166/// for (k, v) in keys_vals_1 {
1167///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1168///     assert_eq!(Some(v), val);
1169/// }
1170///
1171/// for (k, v) in keys_vals_2 {
1172///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1173///     assert_eq!(Some(v), val);
1174/// }
1175/// Ok(())
1176/// }
1177/// ```
1178///
1179pub struct DBBatch {
1180    database: Arc<Database>,
1181    batch: StorageWriteBatch,
1182    db_metrics: Arc<DBMetrics>,
1183    write_sample_interval: SamplingInterval,
1184}
1185
1186impl DBBatch {
1187    /// Create a new batch associated with a DB reference.
1188    ///
1189    /// Use `open_cf` to get the DB reference or an existing open database.
1190    pub fn new(
1191        dbref: &Arc<Database>,
1192        batch: StorageWriteBatch,
1193        db_metrics: &Arc<DBMetrics>,
1194        write_sample_interval: &SamplingInterval,
1195    ) -> Self {
1196        DBBatch {
1197            database: dbref.clone(),
1198            batch,
1199            db_metrics: db_metrics.clone(),
1200            write_sample_interval: write_sample_interval.clone(),
1201        }
1202    }
1203
1204    /// Consume the batch and write its operations to the database
1205    #[instrument(level = "trace", skip_all, err)]
1206    pub fn write(self) -> Result<(), TypedStoreError> {
1207        self.write_opt(&rocksdb::WriteOptions::default())
1208    }
1209
1210    /// Consume the batch and write its operations to the database with custom write options
1211    #[instrument(level = "trace", skip_all, err)]
1212    pub fn write_opt(self, write_options: &rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1213        let db_name = self.database.db_name();
1214        let timer = self
1215            .db_metrics
1216            .op_metrics
1217            .rocksdb_batch_commit_latency_seconds
1218            .with_label_values(&[&db_name])
1219            .start_timer();
1220        let batch_size = self.size_in_bytes();
1221
1222        let perf_ctx = if self.write_sample_interval.sample() {
1223            Some(RocksDBPerfContext)
1224        } else {
1225            None
1226        };
1227        self.database.write_opt(self.batch, write_options)?;
1228        self.db_metrics
1229            .op_metrics
1230            .rocksdb_batch_commit_bytes
1231            .with_label_values(&[&db_name])
1232            .observe(batch_size as f64);
1233
1234        if perf_ctx.is_some() {
1235            self.db_metrics
1236                .write_perf_ctx_metrics
1237                .report_metrics(&db_name);
1238        }
1239        let elapsed = timer.stop_and_record();
1240        if elapsed > 1.0 {
1241            warn!(?elapsed, ?db_name, "very slow batch write");
1242            self.db_metrics
1243                .op_metrics
1244                .rocksdb_very_slow_batch_writes_count
1245                .with_label_values(&[&db_name])
1246                .inc();
1247            self.db_metrics
1248                .op_metrics
1249                .rocksdb_very_slow_batch_writes_duration_ms
1250                .with_label_values(&[&db_name])
1251                .inc_by((elapsed * 1000.0) as u64);
1252        }
1253        Ok(())
1254    }
1255
1256    pub fn size_in_bytes(&self) -> usize {
1257        match self.batch {
1258            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1259            StorageWriteBatch::InMemory(_) => 0,
1260            // TODO: implement size_in_bytes method
1261            #[cfg(tidehunter)]
1262            StorageWriteBatch::TideHunter(_) => 0,
1263        }
1264    }
1265
1266    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1267        &mut self,
1268        db: &DBMap<K, V>,
1269        purged_vals: impl IntoIterator<Item = J>,
1270    ) -> Result<(), TypedStoreError> {
1271        if !Arc::ptr_eq(&db.db, &self.database) {
1272            return Err(TypedStoreError::CrossDBBatch);
1273        }
1274
1275        purged_vals
1276            .into_iter()
1277            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1278                let k_buf = be_fix_int_ser(k.borrow());
1279                match (&mut self.batch, &db.column_family) {
1280                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1281                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1282                    }
1283                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1284                        b.delete_cf(name, k_buf)
1285                    }
1286                    #[cfg(tidehunter)]
1287                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1288                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1289                    }
1290                    _ => Err(TypedStoreError::RocksDBError(
1291                        "typed store invariant violation".to_string(),
1292                    ))?,
1293                }
1294                Ok(())
1295            })?;
1296        Ok(())
1297    }
1298
1299    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
1300    /// by writing a range delete tombstone in the db map
1301    /// If the DBMap is configured with ignore_range_deletions set to false,
1302    /// the effect of this write will be visible immediately i.e. you won't
1303    /// see old values when you do a lookup or scan. But if it is configured
1304    /// with ignore_range_deletions set to true, the old value are visible until
1305    /// compaction actually deletes them which will happen sometime after. By
1306    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1307    /// overridden in the config), so please use this function with caution
1308    pub fn schedule_delete_range<K: Serialize, V>(
1309        &mut self,
1310        db: &DBMap<K, V>,
1311        from: &K,
1312        to: &K,
1313    ) -> Result<(), TypedStoreError> {
1314        if !Arc::ptr_eq(&db.db, &self.database) {
1315            return Err(TypedStoreError::CrossDBBatch);
1316        }
1317
1318        let from_buf = be_fix_int_ser(from);
1319        let to_buf = be_fix_int_ser(to);
1320
1321        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1322            b.delete_range_cf(
1323                &rocks_cf_from_db(&self.database, db.cf_name())?,
1324                from_buf,
1325                to_buf,
1326            );
1327        }
1328        Ok(())
1329    }
1330
1331    /// inserts a range of (key, value) pairs given as an iterator
1332    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1333        &mut self,
1334        db: &DBMap<K, V>,
1335        new_vals: impl IntoIterator<Item = (J, U)>,
1336    ) -> Result<&mut Self, TypedStoreError> {
1337        if !Arc::ptr_eq(&db.db, &self.database) {
1338            return Err(TypedStoreError::CrossDBBatch);
1339        }
1340        let mut total = 0usize;
1341        new_vals
1342            .into_iter()
1343            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1344                let k_buf = be_fix_int_ser(k.borrow());
1345                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1346                total += k_buf.len() + v_buf.len();
1347                if db.opts.log_value_hash {
1348                    let key_hash = default_hash(&k_buf);
1349                    let value_hash = default_hash(&v_buf);
1350                    debug!(
1351                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1352                        db.cf_name(),
1353                        key_hash,
1354                        value_hash
1355                    );
1356                }
1357                match (&mut self.batch, &db.column_family) {
1358                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1359                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1360                    }
1361                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1362                        b.put_cf(name, k_buf, v_buf)
1363                    }
1364                    #[cfg(tidehunter)]
1365                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1366                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1367                    }
1368                    _ => Err(TypedStoreError::RocksDBError(
1369                        "typed store invariant violation".to_string(),
1370                    ))?,
1371                }
1372                Ok(())
1373            })?;
1374        self.db_metrics
1375            .op_metrics
1376            .rocksdb_batch_put_bytes
1377            .with_label_values(&[&db.cf])
1378            .observe(total as f64);
1379        Ok(self)
1380    }
1381
1382    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1383        &mut self,
1384        db: &DBMap<K, V>,
1385        new_vals: impl IntoIterator<Item = (J, U)>,
1386    ) -> Result<&mut Self, TypedStoreError> {
1387        if !Arc::ptr_eq(&db.db, &self.database) {
1388            return Err(TypedStoreError::CrossDBBatch);
1389        }
1390        new_vals
1391            .into_iter()
1392            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1393                let k_buf = be_fix_int_ser(k.borrow());
1394                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1395                match &mut self.batch {
1396                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1397                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1398                        k_buf,
1399                        v_buf,
1400                    ),
1401                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1402                }
1403                Ok(())
1404            })?;
1405        Ok(self)
1406    }
1407}
1408
1409impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1410where
1411    K: Serialize + DeserializeOwned,
1412    V: Serialize + DeserializeOwned,
1413{
1414    type Error = TypedStoreError;
1415
1416    #[instrument(level = "trace", skip_all, err)]
1417    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1418        let key_buf = be_fix_int_ser(key);
1419        let readopts = self.opts.readopts();
1420        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1421            && self
1422                .db
1423                .get(&self.column_family, &key_buf, &readopts)?
1424                .is_some())
1425    }
1426
1427    #[instrument(level = "trace", skip_all, err)]
1428    fn multi_contains_keys<J>(
1429        &self,
1430        keys: impl IntoIterator<Item = J>,
1431    ) -> Result<Vec<bool>, Self::Error>
1432    where
1433        J: Borrow<K>,
1434    {
1435        let values = self.multi_get_pinned(keys)?;
1436        Ok(values.into_iter().map(|v| v.is_some()).collect())
1437    }
1438
1439    #[instrument(level = "trace", skip_all, err)]
1440    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1441        let _timer = self
1442            .db_metrics
1443            .op_metrics
1444            .rocksdb_get_latency_seconds
1445            .with_label_values(&[&self.cf])
1446            .start_timer();
1447        let perf_ctx = if self.get_sample_interval.sample() {
1448            Some(RocksDBPerfContext)
1449        } else {
1450            None
1451        };
1452        let key_buf = be_fix_int_ser(key);
1453        let res = self
1454            .db
1455            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1456        self.db_metrics
1457            .op_metrics
1458            .rocksdb_get_bytes
1459            .with_label_values(&[&self.cf])
1460            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1461        if perf_ctx.is_some() {
1462            self.db_metrics
1463                .read_perf_ctx_metrics
1464                .report_metrics(&self.cf);
1465        }
1466        match res {
1467            Some(data) => {
1468                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1469                if value.is_err() {
1470                    let key_hash = default_hash(&key_buf);
1471                    let value_hash = default_hash(&data);
1472                    debug_fatal!(
1473                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1474                        self.cf_name(),
1475                        key_hash,
1476                        value_hash,
1477                        value.as_ref().err().unwrap()
1478                    );
1479                }
1480                Ok(Some(value?))
1481            }
1482            None => Ok(None),
1483        }
1484    }
1485
1486    #[instrument(level = "trace", skip_all, err)]
1487    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1488        let timer = self
1489            .db_metrics
1490            .op_metrics
1491            .rocksdb_put_latency_seconds
1492            .with_label_values(&[&self.cf])
1493            .start_timer();
1494        let perf_ctx = if self.write_sample_interval.sample() {
1495            Some(RocksDBPerfContext)
1496        } else {
1497            None
1498        };
1499        let key_buf = be_fix_int_ser(key);
1500        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1501        self.db_metrics
1502            .op_metrics
1503            .rocksdb_put_bytes
1504            .with_label_values(&[&self.cf])
1505            .observe((key_buf.len() + value_buf.len()) as f64);
1506        if perf_ctx.is_some() {
1507            self.db_metrics
1508                .write_perf_ctx_metrics
1509                .report_metrics(&self.cf);
1510        }
1511        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1512
1513        let elapsed = timer.stop_and_record();
1514        if elapsed > 1.0 {
1515            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1516            self.db_metrics
1517                .op_metrics
1518                .rocksdb_very_slow_puts_count
1519                .with_label_values(&[&self.cf])
1520                .inc();
1521            self.db_metrics
1522                .op_metrics
1523                .rocksdb_very_slow_puts_duration_ms
1524                .with_label_values(&[&self.cf])
1525                .inc_by((elapsed * 1000.0) as u64);
1526        }
1527
1528        Ok(())
1529    }
1530
1531    #[instrument(level = "trace", skip_all, err)]
1532    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1533        let _timer = self
1534            .db_metrics
1535            .op_metrics
1536            .rocksdb_delete_latency_seconds
1537            .with_label_values(&[&self.cf])
1538            .start_timer();
1539        let perf_ctx = if self.write_sample_interval.sample() {
1540            Some(RocksDBPerfContext)
1541        } else {
1542            None
1543        };
1544        let key_buf = be_fix_int_ser(key);
1545        self.db.delete_cf(&self.column_family, key_buf)?;
1546        self.db_metrics
1547            .op_metrics
1548            .rocksdb_deletes
1549            .with_label_values(&[&self.cf])
1550            .inc();
1551        if perf_ctx.is_some() {
1552            self.db_metrics
1553                .write_perf_ctx_metrics
1554                .report_metrics(&self.cf);
1555        }
1556        Ok(())
1557    }
1558
1559    /// Writes a range delete tombstone to delete all entries in the db map
1560    /// If the DBMap is configured with ignore_range_deletions set to false,
1561    /// the effect of this write will be visible immediately i.e. you won't
1562    /// see old values when you do a lookup or scan. But if it is configured
1563    /// with ignore_range_deletions set to true, the old value are visible until
1564    /// compaction actually deletes them which will happen sometime after. By
1565    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1566    /// overridden in the config), so please use this function with caution
1567    #[instrument(level = "trace", skip_all, err)]
1568    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1569        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1570        let last_key = self
1571            .reversed_safe_iter_with_bounds(None, None)?
1572            .next()
1573            .transpose()?
1574            .map(|(k, _v)| k);
1575        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1576            let mut batch = self.batch();
1577            batch.schedule_delete_range(self, &first_key, &last_key)?;
1578            batch.write()?;
1579        }
1580        Ok(())
1581    }
1582
1583    fn is_empty(&self) -> bool {
1584        self.safe_iter().next().is_none()
1585    }
1586
1587    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1588        match &self.db.storage {
1589            Storage::Rocks(db) => {
1590                let db_iter = db
1591                    .underlying
1592                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1593                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1594                Box::new(SafeIter::new(
1595                    self.cf.clone(),
1596                    db_iter,
1597                    _timer,
1598                    _perf_ctx,
1599                    bytes_scanned,
1600                    keys_scanned,
1601                    Some(self.db_metrics.clone()),
1602                ))
1603            }
1604            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1605            #[cfg(tidehunter)]
1606            Storage::TideHunter(db) => match &self.column_family {
1607                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1608                    db.iterator(*ks),
1609                    prefix,
1610                    self.start_iter_timer(),
1611                )),
1612                _ => unreachable!("storage backend invariant violation"),
1613            },
1614        }
1615    }
1616
1617    fn safe_iter_with_bounds(
1618        &'a self,
1619        lower_bound: Option<K>,
1620        upper_bound: Option<K>,
1621    ) -> DbIterator<'a, (K, V)> {
1622        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1623        match &self.db.storage {
1624            Storage::Rocks(db) => {
1625                let readopts =
1626                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1627                let db_iter = db
1628                    .underlying
1629                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1630                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1631                Box::new(SafeIter::new(
1632                    self.cf.clone(),
1633                    db_iter,
1634                    _timer,
1635                    _perf_ctx,
1636                    bytes_scanned,
1637                    keys_scanned,
1638                    Some(self.db_metrics.clone()),
1639                ))
1640            }
1641            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1642            #[cfg(tidehunter)]
1643            Storage::TideHunter(db) => match &self.column_family {
1644                ColumnFamily::TideHunter((ks, prefix)) => {
1645                    let mut iter = db.iterator(*ks);
1646                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1647                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1648                }
1649                _ => unreachable!("storage backend invariant violation"),
1650            },
1651        }
1652    }
1653
1654    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1655        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1656        match &self.db.storage {
1657            Storage::Rocks(db) => {
1658                let readopts =
1659                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1660                let db_iter = db
1661                    .underlying
1662                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1663                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1664                Box::new(SafeIter::new(
1665                    self.cf.clone(),
1666                    db_iter,
1667                    _timer,
1668                    _perf_ctx,
1669                    bytes_scanned,
1670                    keys_scanned,
1671                    Some(self.db_metrics.clone()),
1672                ))
1673            }
1674            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1675            #[cfg(tidehunter)]
1676            Storage::TideHunter(db) => match &self.column_family {
1677                ColumnFamily::TideHunter((ks, prefix)) => {
1678                    let mut iter = db.iterator(*ks);
1679                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1680                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1681                }
1682                _ => unreachable!("storage backend invariant violation"),
1683            },
1684        }
1685    }
1686
1687    /// Returns a vector of values corresponding to the keys provided.
1688    #[instrument(level = "trace", skip_all, err)]
1689    fn multi_get<J>(
1690        &self,
1691        keys: impl IntoIterator<Item = J>,
1692    ) -> Result<Vec<Option<V>>, TypedStoreError>
1693    where
1694        J: Borrow<K>,
1695    {
1696        let results = self.multi_get_pinned(keys)?;
1697        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1698            .into_iter()
1699            .map(|value_byte| match value_byte {
1700                Some(data) => Ok(Some(
1701                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1702                )),
1703                None => Ok(None),
1704            })
1705            .collect();
1706
1707        values_parsed
1708    }
1709
1710    /// Convenience method for batch insertion
1711    #[instrument(level = "trace", skip_all, err)]
1712    fn multi_insert<J, U>(
1713        &self,
1714        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1715    ) -> Result<(), Self::Error>
1716    where
1717        J: Borrow<K>,
1718        U: Borrow<V>,
1719    {
1720        let mut batch = self.batch();
1721        batch.insert_batch(self, key_val_pairs)?;
1722        batch.write()
1723    }
1724
1725    /// Convenience method for batch removal
1726    #[instrument(level = "trace", skip_all, err)]
1727    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1728    where
1729        J: Borrow<K>,
1730    {
1731        let mut batch = self.batch();
1732        batch.delete_batch(self, keys)?;
1733        batch.write()
1734    }
1735
1736    /// Try to catch up with primary when running as secondary
1737    #[instrument(level = "trace", skip_all, err)]
1738    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1739        if let Storage::Rocks(rocks) = &self.db.storage {
1740            rocks
1741                .underlying
1742                .try_catch_up_with_primary()
1743                .map_err(typed_store_err_from_rocks_err)?;
1744        }
1745        Ok(())
1746    }
1747}
1748
1749/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
1750#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1751pub fn open_cf_opts<P: AsRef<Path>>(
1752    path: P,
1753    db_options: Option<rocksdb::Options>,
1754    metric_conf: MetricConf,
1755    opt_cfs: &[(&str, rocksdb::Options)],
1756) -> Result<Arc<Database>, TypedStoreError> {
1757    let path = path.as_ref();
1758    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1759    // because rocksdb uses the simulated clock when creating its background threads, but then
1760    // those threads see the real wall clock (because they are not the test thread), which causes
1761    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1762    // resolves the issue.
1763    //
1764    // This is a no-op in non-simulator builds.
1765
1766    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1767    nondeterministic!({
1768        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1769        options.create_if_missing(true);
1770        options.create_missing_column_families(true);
1771        let rocksdb = {
1772            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1773                &options,
1774                path,
1775                cfs.into_iter()
1776                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1777            )
1778            .map_err(typed_store_err_from_rocks_err)?
1779        };
1780        Ok(Arc::new(Database::new(
1781            Storage::Rocks(RocksDB {
1782                underlying: rocksdb,
1783            }),
1784            metric_conf,
1785        )))
1786    })
1787}
1788
1789/// Opens a database with options, and a number of column families with individual options that are created if they do not exist.
1790pub fn open_cf_opts_secondary<P: AsRef<Path>>(
1791    primary_path: P,
1792    secondary_path: Option<P>,
1793    db_options: Option<rocksdb::Options>,
1794    metric_conf: MetricConf,
1795    opt_cfs: &[(&str, rocksdb::Options)],
1796) -> Result<Arc<Database>, TypedStoreError> {
1797    let primary_path = primary_path.as_ref();
1798    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
1799    // See comment above for explanation of why nondeterministic is necessary here.
1800    nondeterministic!({
1801        // Customize database options
1802        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1803
1804        fdlimit::raise_fd_limit();
1805        // This is a requirement by RocksDB when opening as secondary
1806        options.set_max_open_files(-1);
1807
1808        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
1809        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
1810            .ok()
1811            .unwrap_or_default();
1812
1813        let default_db_options = default_db_options();
1814        // Add CFs not explicitly listed
1815        for cf_key in cfs.iter() {
1816            if !opt_cfs.contains_key(&cf_key[..]) {
1817                opt_cfs.insert(cf_key, default_db_options.options.clone());
1818            }
1819        }
1820
1821        let primary_path = primary_path.to_path_buf();
1822        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
1823            let mut s = primary_path.clone();
1824            s.pop();
1825            s.push("SECONDARY");
1826            s.as_path().to_path_buf()
1827        });
1828
1829        let rocksdb = {
1830            options.create_if_missing(true);
1831            options.create_missing_column_families(true);
1832            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
1833                &options,
1834                &primary_path,
1835                &secondary_path,
1836                opt_cfs
1837                    .iter()
1838                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
1839            )
1840            .map_err(typed_store_err_from_rocks_err)?;
1841            db.try_catch_up_with_primary()
1842                .map_err(typed_store_err_from_rocks_err)?;
1843            db
1844        };
1845        Ok(Arc::new(Database::new(
1846            Storage::Rocks(RocksDB {
1847                underlying: rocksdb,
1848            }),
1849            metric_conf,
1850        )))
1851    })
1852}
1853
1854// Drops a database if there is no other handle to it, with retries and timeout.
1855#[cfg(not(tidehunter))]
1856pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
1857    let mut backoff = backoff::ExponentialBackoff {
1858        max_elapsed_time: Some(timeout),
1859        ..Default::default()
1860    };
1861    loop {
1862        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
1863            Ok(()) => return Ok(()),
1864            Err(err) => match backoff.next_backoff() {
1865                Some(duration) => tokio::time::sleep(duration).await,
1866                None => return Err(err),
1867            },
1868        }
1869    }
1870}
1871
/// Removes the database directory at `path` together with all of its contents.
///
/// The `Duration` argument is ignored; no retries are performed in this
/// variant. The removal itself is a synchronous `std::fs` call.
#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, _timeout: Duration) -> Result<(), std::io::Error> {
    std::fs::remove_dir_all(&path)
}
1876
1877fn populate_missing_cfs(
1878    input_cfs: &[(&str, rocksdb::Options)],
1879    path: &Path,
1880) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
1881    let mut cfs = vec![];
1882    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
1883    let existing_cfs =
1884        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
1885            .ok()
1886            .unwrap_or_default();
1887
1888    for cf_name in existing_cfs {
1889        if !input_cf_index.contains(&cf_name[..]) {
1890            cfs.push((cf_name, rocksdb::Options::default()));
1891        }
1892    }
1893    cfs.extend(
1894        input_cfs
1895            .iter()
1896            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
1897    );
1898    Ok(cfs)
1899}
1900
1901fn default_hash(value: &[u8]) -> Digest<32> {
1902    let mut hasher = fastcrypto::hash::Blake2b256::default();
1903    hasher.update(value);
1904    hasher.finalize()
1905}