typed_store/rocks/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{be_fix_int_ser, iterator_bounds, iterator_bounds_with_range};
20use crate::{DbIterator, TypedStoreError};
21use crate::{
22    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
23    traits::{Map, TableSummary},
24};
25use backoff::backoff::Backoff;
26use fastcrypto::hash::{Digest, HashFunction};
27use mysten_common::debug_fatal;
28use prometheus::{Histogram, HistogramTimer};
29use rocksdb::properties::num_files_at_level;
30use rocksdb::{
31    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
32    properties,
33};
34use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
35use serde::{Serialize, de::DeserializeOwned};
36use std::ops::{Bound, Deref};
37use std::{
38    borrow::Borrow,
39    marker::PhantomData,
40    ops::RangeBounds,
41    path::{Path, PathBuf},
42    sync::Arc,
43    time::Duration,
44};
45use std::{collections::HashSet, ffi::CStr};
46use sui_macros::{fail_point, nondeterministic};
47#[cfg(tidehunter)]
48use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
49use tokio::sync::oneshot;
50use tracing::{debug, error, instrument, warn};
51
52// TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
53// From https://github.com/facebook/rocksdb/blob/bd80433c73691031ba7baa65c16c63a83aef201a/include/rocksdb/db.h#L1169
54const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
55    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
56
57#[cfg(test)]
58mod tests;
59
60#[derive(Debug)]
61pub struct RocksDB {
62    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
63}
64
65impl Drop for RocksDB {
66    fn drop(&mut self) {
67        self.underlying.cancel_all_background_work(/* wait */ true);
68    }
69}
70
71#[derive(Clone)]
72pub enum ColumnFamily {
73    Rocks(String),
74    InMemory(String),
75    #[cfg(tidehunter)]
76    TideHunter((KeySpace, Option<Vec<u8>>)),
77}
78
79impl std::fmt::Debug for ColumnFamily {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
83            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
84            #[cfg(tidehunter)]
85            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
86        }
87    }
88}
89
90impl ColumnFamily {
91    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
92        match &self {
93            ColumnFamily::Rocks(name) => rocks_db
94                .underlying
95                .cf_handle(name)
96                .expect("Map-keying column family should have been checked at DB creation"),
97            _ => unreachable!("invariant is checked by the caller"),
98        }
99    }
100}
101
102pub enum Storage {
103    Rocks(RocksDB),
104    InMemory(InMemoryDB),
105    #[cfg(tidehunter)]
106    TideHunter(Arc<TideHunterDb>),
107}
108
109impl std::fmt::Debug for Storage {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        match self {
112            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
113            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
114            #[cfg(tidehunter)]
115            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
116        }
117    }
118}
119
120#[derive(Debug)]
121pub struct Database {
122    storage: Storage,
123    metric_conf: MetricConf,
124}
125
126impl Drop for Database {
127    fn drop(&mut self) {
128        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
129    }
130}
131
132enum GetResult<'a> {
133    Rocks(DBPinnableSlice<'a>),
134    InMemory(Vec<u8>),
135    #[cfg(tidehunter)]
136    TideHunter(tidehunter::minibytes::Bytes),
137}
138
139impl Deref for GetResult<'_> {
140    type Target = [u8];
141    fn deref(&self) -> &[u8] {
142        match self {
143            GetResult::Rocks(d) => d.deref(),
144            GetResult::InMemory(d) => d.deref(),
145            #[cfg(tidehunter)]
146            GetResult::TideHunter(d) => d.deref(),
147        }
148    }
149}
150
151impl Database {
152    pub fn new(storage: Storage, metric_conf: MetricConf) -> Self {
153        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
154        Self {
155            storage,
156            metric_conf,
157        }
158    }
159
160    /// Flush all memtables to SST files on disk.
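    /// A minimal usage sketch (`db` here is a `Database` handle; for the in-memory and
    /// TideHunter backends this call is a no-op, per the match arms below):
    /// ```ignore
    /// db.flush()?;
    /// ```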
161    pub fn flush(&self) -> Result<(), TypedStoreError> {
162        match &self.storage {
163            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
164                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
165            }),
166            Storage::InMemory(_) => {
167                // InMemory databases don't need flushing
168                Ok(())
169            }
170            #[cfg(tidehunter)]
171            Storage::TideHunter(_) => {
172                // TideHunter doesn't support an explicit flush.
173                Ok(())
174            }
175        }
176    }
177
178    fn get<K: AsRef<[u8]>>(
179        &self,
180        cf: &ColumnFamily,
181        key: K,
182        readopts: &ReadOptions,
183    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
184        match (&self.storage, cf) {
185            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
186                .underlying
187                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
188                .map_err(typed_store_err_from_rocks_err)?
189                .map(GetResult::Rocks)),
190            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
191                Ok(db.get(cf_name, key).map(GetResult::InMemory))
192            }
193            #[cfg(tidehunter)]
194            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
195                .get(*ks, &transform_th_key(key.as_ref(), prefix))
196                .map_err(typed_store_error_from_th_error)?
197                .map(GetResult::TideHunter)),
198
199            _ => Err(TypedStoreError::RocksDBError(
200                "typed store invariant violation".to_string(),
201            )),
202        }
203    }
204
205    fn multi_get<I, K>(
206        &self,
207        cf: &ColumnFamily,
208        keys: I,
209        readopts: &ReadOptions,
210    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
211    where
212        I: IntoIterator<Item = K>,
213        K: AsRef<[u8]>,
214    {
215        match (&self.storage, cf) {
216            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
217                let keys_vec: Vec<K> = keys.into_iter().collect();
218                let res = db.underlying.batched_multi_get_cf_opt(
219                    &cf.rocks_cf(db),
220                    keys_vec.iter(),
221                    /* sorted_input */ false,
222                    readopts,
223                );
224                res.into_iter()
225                    .map(|r| {
226                        r.map_err(typed_store_err_from_rocks_err)
227                            .map(|item| item.map(GetResult::Rocks))
228                    })
229                    .collect()
230            }
231            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
232                .multi_get(cf_name, keys)
233                .into_iter()
234                .map(|r| Ok(r.map(GetResult::InMemory)))
235                .collect(),
236            #[cfg(tidehunter)]
237            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
238                let res = keys.into_iter().map(|k| {
239                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
240                        .map_err(typed_store_error_from_th_error)
241                });
242                res.into_iter()
243                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
244                    .collect()
245            }
246            _ => unreachable!("typed store invariant violation"),
247        }
248    }
249
250    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
251        match &self.storage {
252            Storage::Rocks(db) => db.underlying.drop_cf(name),
253            Storage::InMemory(db) => {
254                db.drop_cf(name);
255                Ok(())
256            }
257            #[cfg(tidehunter)]
258            Storage::TideHunter(_) => {
259                unimplemented!("TideHunter: dropping a column family on the fly is not implemented")
260            }
261        }
262    }
263
264    pub fn delete_file_in_range<K: AsRef<[u8]>>(
265        &self,
266        cf: &impl AsColumnFamilyRef,
267        from: K,
268        to: K,
269    ) -> Result<(), rocksdb::Error> {
270        match &self.storage {
271            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
272            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
273        }
274    }
275
276    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
277        fail_point!("delete-cf-before");
278        let ret = match (&self.storage, cf) {
279            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
280                .underlying
281                .delete_cf(&cf.rocks_cf(db), key)
282                .map_err(typed_store_err_from_rocks_err),
283            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
284                db.delete(cf_name, key.as_ref());
285                Ok(())
286            }
287            #[cfg(tidehunter)]
288            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
289                .remove(*ks, transform_th_key(key.as_ref(), prefix))
290                .map_err(typed_store_error_from_th_error),
291            _ => Err(TypedStoreError::RocksDBError(
292                "typed store invariant violation".to_string(),
293            )),
294        };
295        fail_point!("delete-cf-after");
296        #[allow(clippy::let_and_return)]
297        ret
298    }
299
300    pub fn path_for_pruning(&self) -> &Path {
301        match &self.storage {
302            Storage::Rocks(rocks) => rocks.underlying.path(),
303            _ => unimplemented!("method is only supported for rocksdb backend"),
304        }
305    }
306
307    fn put_cf(
308        &self,
309        cf: &ColumnFamily,
310        key: Vec<u8>,
311        value: Vec<u8>,
312    ) -> Result<(), TypedStoreError> {
313        fail_point!("put-cf-before");
314        let ret = match (&self.storage, cf) {
315            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
316                .underlying
317                .put_cf(&cf.rocks_cf(db), key, value)
318                .map_err(typed_store_err_from_rocks_err),
319            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
320                db.put(cf_name, key, value);
321                Ok(())
322            }
323            #[cfg(tidehunter)]
324            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
325                .insert(*ks, transform_th_key(&key, prefix), value)
326                .map_err(typed_store_error_from_th_error),
327            _ => Err(TypedStoreError::RocksDBError(
328                "typed store invariant violation".to_string(),
329            )),
330        };
331        fail_point!("put-cf-after");
332        #[allow(clippy::let_and_return)]
333        ret
334    }
335
336    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
337        &self,
338        cf_name: &str,
339        key: K,
340        readopts: &ReadOptions,
341    ) -> bool {
342        match &self.storage {
343            // [`rocksdb::DBWithThreadMode::key_may_exist_cf`] can have false positives,
344            // but no false negatives. We use it to short-circuit the absent case.
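            // (e.g. `Map::contains_key` below pairs this check with a point lookup:
            //  `key_may_exist_cf(..) && get(..)?.is_some()`).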
345            Storage::Rocks(rocks) => {
346                rocks
347                    .underlying
348                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
349            }
350            _ => true,
351        }
352    }
353
354    pub(crate) fn write_opt_internal(
355        &self,
356        batch: StorageWriteBatch,
357        write_options: &rocksdb::WriteOptions,
358    ) -> Result<(), TypedStoreError> {
359        fail_point!("batch-write-before");
360        let ret = match (&self.storage, batch) {
361            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
362                .underlying
363                .write_opt(batch, write_options)
364                .map_err(typed_store_err_from_rocks_err),
365            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
366                // InMemory doesn't support write options
367                db.write(batch);
368                Ok(())
369            }
370            #[cfg(tidehunter)]
371            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
372                // TideHunter doesn't support write options
373                db.write_batch(batch)
374                    .map_err(typed_store_error_from_th_error)
375            }
376            _ => Err(TypedStoreError::RocksDBError(
377                "using invalid batch type for the database".to_string(),
378            )),
379        };
380        fail_point!("batch-write-after");
381        #[allow(clippy::let_and_return)]
382        ret
383    }
384
385    #[cfg(tidehunter)]
386    pub fn start_relocation(&self) -> anyhow::Result<()> {
387        if let Storage::TideHunter(db) = &self.storage {
388            db.start_relocation()?;
389        }
390        Ok(())
391    }
392
393    #[cfg(tidehunter)]
394    pub fn drop_cells_in_range(
395        &self,
396        ks: KeySpace,
397        from_inclusive: &[u8],
398        to_inclusive: &[u8],
399    ) -> anyhow::Result<()> {
400        if let Storage::TideHunter(db) = &self.storage {
401            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
402                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
403        } else {
404            panic!("drop_cells_in_range called on non-TideHunter storage");
405        }
406        Ok(())
407    }
408
409    pub fn compact_range_cf<K: AsRef<[u8]>>(
410        &self,
411        cf_name: &str,
412        start: Option<K>,
413        end: Option<K>,
414    ) {
415        if let Storage::Rocks(rocksdb) = &self.storage {
416            rocksdb
417                .underlying
418                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
419        }
420    }
421
422    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
423        // TODO: implement for other storage types
424        if let Storage::Rocks(rocks) = &self.storage {
425            let checkpoint =
426                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
427            checkpoint
428                .create_checkpoint(path)
429                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
430        }
431        Ok(())
432    }
433
434    pub fn get_sampling_interval(&self) -> SamplingInterval {
435        self.metric_conf.read_sample_interval.new_from_self()
436    }
437
438    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
439        self.metric_conf.read_sample_interval.new_from_self()
440    }
441
442    pub fn write_sampling_interval(&self) -> SamplingInterval {
443        self.metric_conf.write_sample_interval.new_from_self()
444    }
445
446    pub fn iter_sampling_interval(&self) -> SamplingInterval {
447        self.metric_conf.iter_sample_interval.new_from_self()
448    }
449
450    fn db_name(&self) -> String {
451        let name = &self.metric_conf.db_name;
452        if name.is_empty() {
453            "default".to_string()
454        } else {
455            name.clone()
456        }
457    }
458
459    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
460        match &self.storage {
461            Storage::Rocks(rocks) => rocks.underlying.live_files(),
462            _ => Ok(vec![]),
463        }
464    }
465}
466
467fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
468    rocks_db
469        .underlying
470        .cf_handle(cf_name)
471        .expect("Map-keying column family should have been checked at DB creation")
472}
473
474fn rocks_cf_from_db<'a>(
475    db: &'a Database,
476    cf_name: &str,
477) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
478    match &db.storage {
479        Storage::Rocks(rocksdb) => Ok(rocksdb
480            .underlying
481            .cf_handle(cf_name)
482            .expect("Map-keying column family should have been checked at DB creation")),
483        _ => Err(TypedStoreError::RocksDBError(
484            "using invalid batch type for the database".to_string(),
485        )),
486    }
487}
488
489#[derive(Debug, Default)]
490pub struct MetricConf {
491    pub db_name: String,
492    pub read_sample_interval: SamplingInterval,
493    pub write_sample_interval: SamplingInterval,
494    pub iter_sample_interval: SamplingInterval,
495}
496
497impl MetricConf {
498    pub fn new(db_name: &str) -> Self {
499        if db_name.is_empty() {
500            error!("A meaningful db name should be used for metrics reporting.")
501        }
502        Self {
503            db_name: db_name.to_string(),
504            read_sample_interval: SamplingInterval::default(),
505            write_sample_interval: SamplingInterval::default(),
506            iter_sample_interval: SamplingInterval::default(),
507        }
508    }
509
510    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
511        Self {
512            db_name: self.db_name,
513            read_sample_interval: read_interval,
514            write_sample_interval: SamplingInterval::default(),
515            iter_sample_interval: SamplingInterval::default(),
516        }
517    }
518}
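
// A minimal construction sketch (the database name is illustrative):
//
//     let conf = MetricConf::new("my_db")
//         .with_sampling(SamplingInterval::default());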
519const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
520const METRICS_ERROR: i64 = -1;
521
522/// An interface to a RocksDB database, keyed by a column family.
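/// A minimal usage sketch (the column family name and the key/value types are
/// illustrative, not part of the API):
/// ```ignore
/// let users: DBMap<u64, String> = DBMap::reopen(&db, Some("users"), &ReadWriteOptions::default(), false)?;
/// users.insert(&1, &"alice".to_string())?;
/// assert_eq!(users.get(&1)?, Some("alice".to_string()));
/// ```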
523#[derive(Clone, Debug)]
524pub struct DBMap<K, V> {
525    pub db: Arc<Database>,
526    _phantom: PhantomData<fn(K) -> V>,
527    column_family: ColumnFamily,
528    // the column family under which the map is stored
529    cf: String,
530    pub opts: ReadWriteOptions,
531    db_metrics: Arc<DBMetrics>,
532    get_sample_interval: SamplingInterval,
533    multiget_sample_interval: SamplingInterval,
534    write_sample_interval: SamplingInterval,
535    iter_sample_interval: SamplingInterval,
536    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
537}
538
539unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
540
541impl<K, V> DBMap<K, V> {
542    pub(crate) fn new(
543        db: Arc<Database>,
544        opts: &ReadWriteOptions,
545        opt_cf: &str,
546        column_family: ColumnFamily,
547        is_deprecated: bool,
548    ) -> Self {
549        let db_cloned = Arc::downgrade(&db.clone());
550        let db_metrics = DBMetrics::get();
551        let db_metrics_cloned = db_metrics.clone();
552        let cf = opt_cf.to_string();
553
554        let (sender, mut recv) = tokio::sync::oneshot::channel();
555        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
556            tokio::task::spawn(async move {
557                let mut interval =
558                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
559                loop {
560                    tokio::select! {
561                        _ = interval.tick() => {
562                            if let Some(db) = db_cloned.upgrade() {
563                                let cf = cf.clone();
564                                let db_metrics = db_metrics.clone();
565                                if let Err(e) = tokio::task::spawn_blocking(move || {
566                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
567                                }).await {
568                                    error!("Failed to log metrics with error: {}", e);
569                                }
570                            }
571                        }
572                        _ = &mut recv => break,
573                    }
574                }
575                debug!("Exiting the cf metric logging task for DBMap: {}", &cf);
576            });
577        }
578        DBMap {
579            db: db.clone(),
580            opts: opts.clone(),
581            _phantom: PhantomData,
582            column_family,
583            cf: opt_cf.to_string(),
584            db_metrics: db_metrics_cloned,
585            _metrics_task_cancel_handle: Arc::new(sender),
586            get_sample_interval: db.get_sampling_interval(),
587            multiget_sample_interval: db.multiget_sampling_interval(),
588            write_sample_interval: db.write_sampling_interval(),
589            iter_sample_interval: db.iter_sampling_interval(),
590        }
591    }
592
593    /// Reopens an open database as a typed map operating under a specific column family.
594    /// If no column family is passed, the default column family is used.
595    #[instrument(level = "debug", skip(db), err)]
596    pub fn reopen(
597        db: &Arc<Database>,
598        opt_cf: Option<&str>,
599        rw_options: &ReadWriteOptions,
600        is_deprecated: bool,
601    ) -> Result<Self, TypedStoreError> {
602        let cf_key = opt_cf
603            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
604            .to_owned();
605        Ok(DBMap::new(
606            db.clone(),
607            rw_options,
608            &cf_key,
609            ColumnFamily::Rocks(cf_key.to_string()),
610            is_deprecated,
611        ))
612    }
613
614    #[cfg(tidehunter)]
615    pub fn reopen_th(
616        db: Arc<Database>,
617        cf_name: &str,
618        ks: KeySpace,
619        prefix: Option<Vec<u8>>,
620    ) -> Self {
621        DBMap::new(
622            db,
623            &ReadWriteOptions::default(),
624            cf_name,
625            ColumnFamily::TideHunter((ks, prefix.clone())),
626            false,
627        )
628    }
629
630    pub fn cf_name(&self) -> &str {
631        &self.cf
632    }
633
634    pub fn batch(&self) -> DBBatch {
635        let batch = match &self.db.storage {
636            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
637            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
638            #[cfg(tidehunter)]
639            Storage::TideHunter(_) => {
640                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
641            }
642        };
643        DBBatch::new(
644            &self.db,
645            batch,
646            &self.opts,
647            &self.db_metrics,
648            &self.write_sample_interval,
649        )
650    }
651
652    pub fn flush(&self) -> Result<(), TypedStoreError> {
653        self.db.flush()
654    }
655
656    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
657        let from_buf = be_fix_int_ser(start);
658        let to_buf = be_fix_int_ser(end);
659        self.db
660            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
661        Ok(())
662    }
663
664    pub fn compact_range_raw(
665        &self,
666        cf_name: &str,
667        start: Vec<u8>,
668        end: Vec<u8>,
669    ) -> Result<(), TypedStoreError> {
670        self.db.compact_range_cf(cf_name, Some(start), Some(end));
671        Ok(())
672    }
673
674    #[cfg(tidehunter)]
675    pub fn drop_cells_in_range<J: Serialize>(
676        &self,
677        from_inclusive: &J,
678        to_inclusive: &J,
679    ) -> Result<(), TypedStoreError>
680    where
681        K: Serialize,
682    {
683        let from_buf = be_fix_int_ser(from_inclusive);
684        let to_buf = be_fix_int_ser(to_inclusive);
685        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
686            self.db
687                .drop_cells_in_range(*ks, &from_buf, &to_buf)
688                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
689        }
690        Ok(())
691    }
692
693    /// Returns a vector of raw values corresponding to the keys provided.
694    fn multi_get_pinned<J>(
695        &self,
696        keys: impl IntoIterator<Item = J>,
697    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
698    where
699        J: Borrow<K>,
700        K: Serialize,
701    {
702        let _timer = self
703            .db_metrics
704            .op_metrics
705            .rocksdb_multiget_latency_seconds
706            .with_label_values(&[&self.cf])
707            .start_timer();
708        let perf_ctx = if self.multiget_sample_interval.sample() {
709            Some(RocksDBPerfContext)
710        } else {
711            None
712        };
713        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
714        let results: Result<Vec<_>, TypedStoreError> = self
715            .db
716            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
717            .into_iter()
718            .collect();
719        let entries = results?;
720        let entry_size = entries
721            .iter()
722            .flatten()
723            .map(|entry| entry.len())
724            .sum::<usize>();
725        self.db_metrics
726            .op_metrics
727            .rocksdb_multiget_bytes
728            .with_label_values(&[&self.cf])
729            .observe(entry_size as f64);
730        if perf_ctx.is_some() {
731            self.db_metrics
732                .read_perf_ctx_metrics
733                .report_metrics(&self.cf);
734        }
735        Ok(entries)
736    }
737
738    fn get_rocksdb_int_property(
739        rocksdb: &RocksDB,
740        cf: &impl AsColumnFamilyRef,
741        property_name: &std::ffi::CStr,
742    ) -> Result<i64, TypedStoreError> {
743        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
744            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
745            Ok(None) => Ok(0),
746            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
747        }
748    }
749
750    fn report_rocksdb_metrics(
751        database: &Arc<Database>,
752        cf_name: &str,
753        db_metrics: &Arc<DBMetrics>,
754    ) {
755        let Storage::Rocks(rocksdb) = &database.storage else {
756            return;
757        };
758
759        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
760            tracing::warn!(
761                "unable to report metrics for cf {cf_name:?} in db {:?}",
762                database.db_name()
763            );
764            return;
765        };
766
767        db_metrics
768            .cf_metrics
769            .rocksdb_total_sst_files_size
770            .with_label_values(&[cf_name])
771            .set(
772                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
773                    .unwrap_or(METRICS_ERROR),
774            );
775        db_metrics
776            .cf_metrics
777            .rocksdb_total_blob_files_size
778            .with_label_values(&[cf_name])
779            .set(
780                Self::get_rocksdb_int_property(
781                    rocksdb,
782                    &cf,
783                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
784                )
785                .unwrap_or(METRICS_ERROR),
786            );
787        // 7 is the default number of levels in RocksDB (levels 0 through 6, hence the `0..=6` below). If we ever
788        // change the number of levels using `set_num_levels`, we need to update this as well. Note that there isn't an API to query the DB for the number of levels (yet).
789        let total_num_files: i64 = (0..=6)
790            .map(|level| {
791                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
792                    .unwrap_or(METRICS_ERROR)
793            })
794            .sum();
795        db_metrics
796            .cf_metrics
797            .rocksdb_total_num_files
798            .with_label_values(&[cf_name])
799            .set(total_num_files);
800        db_metrics
801            .cf_metrics
802            .rocksdb_num_level0_files
803            .with_label_values(&[cf_name])
804            .set(
805                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
806                    .unwrap_or(METRICS_ERROR),
807            );
808        db_metrics
809            .cf_metrics
810            .rocksdb_current_size_active_mem_tables
811            .with_label_values(&[cf_name])
812            .set(
813                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
814                    .unwrap_or(METRICS_ERROR),
815            );
816        db_metrics
817            .cf_metrics
818            .rocksdb_size_all_mem_tables
819            .with_label_values(&[cf_name])
820            .set(
821                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
822                    .unwrap_or(METRICS_ERROR),
823            );
824        db_metrics
825            .cf_metrics
826            .rocksdb_num_snapshots
827            .with_label_values(&[cf_name])
828            .set(
829                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
830                    .unwrap_or(METRICS_ERROR),
831            );
832        db_metrics
833            .cf_metrics
834            .rocksdb_oldest_snapshot_time
835            .with_label_values(&[cf_name])
836            .set(
837                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
838                    .unwrap_or(METRICS_ERROR),
839            );
840        db_metrics
841            .cf_metrics
842            .rocksdb_actual_delayed_write_rate
843            .with_label_values(&[cf_name])
844            .set(
845                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
846                    .unwrap_or(METRICS_ERROR),
847            );
848        db_metrics
849            .cf_metrics
850            .rocksdb_is_write_stopped
851            .with_label_values(&[cf_name])
852            .set(
853                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
854                    .unwrap_or(METRICS_ERROR),
855            );
856        db_metrics
857            .cf_metrics
858            .rocksdb_block_cache_capacity
859            .with_label_values(&[cf_name])
860            .set(
861                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
862                    .unwrap_or(METRICS_ERROR),
863            );
864        db_metrics
865            .cf_metrics
866            .rocksdb_block_cache_usage
867            .with_label_values(&[cf_name])
868            .set(
869                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
870                    .unwrap_or(METRICS_ERROR),
871            );
872        db_metrics
873            .cf_metrics
874            .rocksdb_block_cache_pinned_usage
875            .with_label_values(&[cf_name])
876            .set(
877                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
878                    .unwrap_or(METRICS_ERROR),
879            );
880        db_metrics
881            .cf_metrics
882            .rocksdb_estimate_table_readers_mem
883            .with_label_values(&[cf_name])
884            .set(
885                Self::get_rocksdb_int_property(
886                    rocksdb,
887                    &cf,
888                    properties::ESTIMATE_TABLE_READERS_MEM,
889                )
890                .unwrap_or(METRICS_ERROR),
891            );
892        db_metrics
893            .cf_metrics
894            .rocksdb_estimated_num_keys
895            .with_label_values(&[cf_name])
896            .set(
897                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
898                    .unwrap_or(METRICS_ERROR),
899            );
900        db_metrics
901            .cf_metrics
902            .rocksdb_num_immutable_mem_tables
903            .with_label_values(&[cf_name])
904            .set(
905                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
906                    .unwrap_or(METRICS_ERROR),
907            );
908        db_metrics
909            .cf_metrics
910            .rocksdb_mem_table_flush_pending
911            .with_label_values(&[cf_name])
912            .set(
913                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
914                    .unwrap_or(METRICS_ERROR),
915            );
916        db_metrics
917            .cf_metrics
918            .rocksdb_compaction_pending
919            .with_label_values(&[cf_name])
920            .set(
921                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
922                    .unwrap_or(METRICS_ERROR),
923            );
924        db_metrics
925            .cf_metrics
926            .rocksdb_estimate_pending_compaction_bytes
927            .with_label_values(&[cf_name])
928            .set(
929                Self::get_rocksdb_int_property(
930                    rocksdb,
931                    &cf,
932                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
933                )
934                .unwrap_or(METRICS_ERROR),
935            );
936        db_metrics
937            .cf_metrics
938            .rocksdb_num_running_compactions
939            .with_label_values(&[cf_name])
940            .set(
941                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
942                    .unwrap_or(METRICS_ERROR),
943            );
944        db_metrics
945            .cf_metrics
946            .rocksdb_num_running_flushes
947            .with_label_values(&[cf_name])
948            .set(
949                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
950                    .unwrap_or(METRICS_ERROR),
951            );
952        db_metrics
953            .cf_metrics
954            .rocksdb_estimate_oldest_key_time
955            .with_label_values(&[cf_name])
956            .set(
957                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
958                    .unwrap_or(METRICS_ERROR),
959            );
960        db_metrics
961            .cf_metrics
962            .rocksdb_background_errors
963            .with_label_values(&[cf_name])
964            .set(
965                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
966                    .unwrap_or(METRICS_ERROR),
967            );
968        db_metrics
969            .cf_metrics
970            .rocksdb_base_level
971            .with_label_values(&[cf_name])
972            .set(
973                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
974                    .unwrap_or(METRICS_ERROR),
975            );
976    }
977
978    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
979        self.db.checkpoint(path)
980    }
981
982    pub fn table_summary(&self) -> eyre::Result<TableSummary>
983    where
984        K: Serialize + DeserializeOwned,
985        V: Serialize + DeserializeOwned,
986    {
987        let mut num_keys = 0;
988        let mut key_bytes_total = 0;
989        let mut value_bytes_total = 0;
990        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
991        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
992        for item in self.safe_iter() {
993            let (key, value) = item?;
994            num_keys += 1;
995            let key_len = be_fix_int_ser(key.borrow()).len();
996            let value_len = bcs::to_bytes(value.borrow())?.len();
997            key_bytes_total += key_len;
998            value_bytes_total += value_len;
999            key_hist.record(key_len as u64)?;
1000            value_hist.record(value_len as u64)?;
1001        }
1002        Ok(TableSummary {
1003            num_keys,
1004            key_bytes_total,
1005            value_bytes_total,
1006            key_hist,
1007            value_hist,
1008        })
1009    }
1010
1011    fn start_iter_timer(&self) -> HistogramTimer {
1012        self.db_metrics
1013            .op_metrics
1014            .rocksdb_iter_latency_seconds
1015            .with_label_values(&[&self.cf])
1016            .start_timer()
1017    }
1018
1019    // Creates metrics and context for tracking iterator usage and performance.
1020    fn create_iter_context(
1021        &self,
1022    ) -> (
1023        Option<HistogramTimer>,
1024        Option<Histogram>,
1025        Option<Histogram>,
1026        Option<RocksDBPerfContext>,
1027    ) {
1028        let timer = self.start_iter_timer();
1029        let bytes_scanned = self
1030            .db_metrics
1031            .op_metrics
1032            .rocksdb_iter_bytes
1033            .with_label_values(&[&self.cf]);
1034        let keys_scanned = self
1035            .db_metrics
1036            .op_metrics
1037            .rocksdb_iter_keys
1038            .with_label_values(&[&self.cf]);
1039        let perf_ctx = if self.iter_sample_interval.sample() {
1040            Some(RocksDBPerfContext)
1041        } else {
1042            None
1043        };
1044        (
1045            Some(timer),
1046            Some(bytes_scanned),
1047            Some(keys_scanned),
1048            perf_ctx,
1049        )
1050    }
1051
1052    /// Creates a safe reversed iterator with optional bounds.
1053    /// Both the lower and upper bounds are inclusive.
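    /// A minimal sketch (hypothetical `map: DBMap<u64, String>`):
    /// ```ignore
    /// // Walk keys from the largest down to 10, both ends inclusive.
    /// for item in map.reversed_safe_iter_with_bounds(Some(10), None)? {
    ///     let (key, value) = item?;
    ///     // keys arrive in descending order
    /// }
    /// ```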
1054    #[allow(clippy::complexity)]
1055    pub fn reversed_safe_iter_with_bounds(
1056        &self,
1057        lower_bound: Option<K>,
1058        upper_bound: Option<K>,
1059    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1060    where
1061        K: Serialize + DeserializeOwned,
1062        V: Serialize + DeserializeOwned,
1063    {
1064        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1065            lower_bound
1066                .as_ref()
1067                .map(Bound::Included)
1068                .unwrap_or(Bound::Unbounded),
1069            upper_bound
1070                .as_ref()
1071                .map(Bound::Included)
1072                .unwrap_or(Bound::Unbounded),
1073        ));
1074        match &self.db.storage {
1075            Storage::Rocks(db) => {
1076                let readopts = rocks_util::apply_range_bounds(
1077                    self.opts.readopts(),
1078                    it_lower_bound,
1079                    it_upper_bound,
1080                );
1081                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1082                let db_iter = db
1083                    .underlying
1084                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1085                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1086                let iter = SafeIter::new(
1087                    self.cf.clone(),
1088                    db_iter,
1089                    _timer,
1090                    _perf_ctx,
1091                    bytes_scanned,
1092                    keys_scanned,
1093                    Some(self.db_metrics.clone()),
1094                );
1095                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1096            }
1097            Storage::InMemory(db) => {
1098                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1099            }
1100            #[cfg(tidehunter)]
1101            Storage::TideHunter(db) => match &self.column_family {
1102                ColumnFamily::TideHunter((ks, prefix)) => {
1103                    let mut iter = db.iterator(*ks);
1104                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
1105                    iter.reverse();
1106                    Ok(Box::new(transform_th_iterator(
1107                        iter,
1108                        prefix,
1109                        self.start_iter_timer(),
1110                    )))
1111                }
1112                _ => unreachable!("storage backend invariant violation"),
1113            },
1114        }
1115    }
1116}
1117
1118pub enum StorageWriteBatch {
1119    Rocks(rocksdb::WriteBatch),
1120    InMemory(InMemoryBatch),
1121    #[cfg(tidehunter)]
1122    TideHunter(tidehunter::batch::WriteBatch),
1123}
1124
1125/// Provides a mutable struct to form a collection of database write operations, and execute them.
1126///
1127/// Batching write and delete operations is faster than performing them one by one and ensures their atomicity,
1128/// i.e. either they are all written or none is.
1129/// This is also true of operations across column families in the same database.
1130///
1131/// Serialization/deserialization and the naming of column families are handled by passing a DBMap<K, V>
1132/// with each operation.
1133///
1134/// ```
1135/// use typed_store::rocks::*;
1136/// use tempfile::tempdir;
1137/// use typed_store::Map;
1138/// use typed_store::metrics::DBMetrics;
1139/// use prometheus::Registry;
1140/// use core::fmt::Error;
1141/// use std::sync::Arc;
1142///
1143/// #[tokio::main]
1144/// async fn main() -> Result<(), Error> {
1145/// let rocks = open_cf_opts(tempfile::tempdir().unwrap(), None, MetricConf::default(), &[("First_CF", rocksdb::Options::default()), ("Second_CF", rocksdb::Options::default())]).unwrap();
1146///
1147/// let db_cf_1 = DBMap::reopen(&rocks, Some("First_CF"), &ReadWriteOptions::default(), false)
1148///     .expect("Failed to open storage");
1149/// let keys_vals_1 = (1..100).map(|i| (i, i.to_string()));
1150///
1151/// let db_cf_2 = DBMap::reopen(&rocks, Some("Second_CF"), &ReadWriteOptions::default(), false)
1152///     .expect("Failed to open storage");
1153/// let keys_vals_2 = (1000..1100).map(|i| (i, i.to_string()));
1154///
1155/// let mut batch = db_cf_1.batch();
1156/// batch
1157///     .insert_batch(&db_cf_1, keys_vals_1.clone())
1158///     .expect("Failed to batch insert")
1159///     .insert_batch(&db_cf_2, keys_vals_2.clone())
1160///     .expect("Failed to batch insert");
1161///
1162/// let _ = batch.write().expect("Failed to execute batch");
1163/// for (k, v) in keys_vals_1 {
1164///     let val = db_cf_1.get(&k).expect("Failed to get inserted key");
1165///     assert_eq!(Some(v), val);
1166/// }
1167///
1168/// for (k, v) in keys_vals_2 {
1169///     let val = db_cf_2.get(&k).expect("Failed to get inserted key");
1170///     assert_eq!(Some(v), val);
1171/// }
1172/// Ok(())
1173/// }
1174/// ```
1175///
1176pub struct DBBatch {
1177    database: Arc<Database>,
1178    batch: StorageWriteBatch,
1179    options: ReadWriteOptions,
1180    db_metrics: Arc<DBMetrics>,
1181    write_sample_interval: SamplingInterval,
1182}
1183
1184impl DBBatch {
1185    /// Create a new batch associated with a DB reference.
1186    ///
1187    /// Use `open_cf` to obtain the DB reference, or reuse an already open database.
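    /// In practice this is rarely called directly; `DBMap::batch()` builds a `DBBatch` with
    /// the matching backend batch type. A sketch (hypothetical `map: DBMap<u64, String>`):
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.insert_batch(&map, [(1u64, "one".to_string())])?;
    /// batch.write()?;
    /// ```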
1188    pub fn new(
1189        dbref: &Arc<Database>,
1190        batch: StorageWriteBatch,
1191        options: &ReadWriteOptions,
1192        db_metrics: &Arc<DBMetrics>,
1193        write_sample_interval: &SamplingInterval,
1194    ) -> Self {
1195        DBBatch {
1196            database: dbref.clone(),
1197            batch,
1198            options: options.clone(),
1199            db_metrics: db_metrics.clone(),
1200            write_sample_interval: write_sample_interval.clone(),
1201        }
1202    }
1203
1204    /// Consume the batch and write its operations to the database
1205    #[instrument(level = "trace", skip_all, err)]
1206    pub fn write(self) -> Result<(), TypedStoreError> {
1207        self.write_opt(rocksdb::WriteOptions::default())
1208    }
1209
1210    /// Consume the batch and write its operations to the database with custom write options
1211    #[instrument(level = "trace", skip_all, err)]
1212    pub fn write_opt(
1213        self,
1214        mut write_options: rocksdb::WriteOptions,
1215    ) -> Result<(), TypedStoreError> {
1216        let db_name = self.database.db_name();
1217        let timer = self
1218            .db_metrics
1219            .op_metrics
1220            .rocksdb_batch_commit_latency_seconds
1221            .with_label_values(&[&db_name])
1222            .start_timer();
1223        let batch_size = self.size_in_bytes();
1224
1225        let perf_ctx = if self.write_sample_interval.sample() {
1226            Some(RocksDBPerfContext)
1227        } else {
1228            None
1229        };
1230
1231        if self.options.sync_writes {
1232            write_options.set_sync(true);
1233        }
1234        self.database
1235            .write_opt_internal(self.batch, &write_options)?;
1236
1237        self.db_metrics
1238            .op_metrics
1239            .rocksdb_batch_commit_bytes
1240            .with_label_values(&[&db_name])
1241            .observe(batch_size as f64);
1242
1243        if perf_ctx.is_some() {
1244            self.db_metrics
1245                .write_perf_ctx_metrics
1246                .report_metrics(&db_name);
1247        }
1248        let elapsed = timer.stop_and_record();
1249        if elapsed > 1.0 {
1250            warn!(?elapsed, ?db_name, "very slow batch write");
1251            self.db_metrics
1252                .op_metrics
1253                .rocksdb_very_slow_batch_writes_count
1254                .with_label_values(&[&db_name])
1255                .inc();
1256            self.db_metrics
1257                .op_metrics
1258                .rocksdb_very_slow_batch_writes_duration_ms
1259                .with_label_values(&[&db_name])
1260                .inc_by((elapsed * 1000.0) as u64);
1261        }
1262        Ok(())
1263    }
1264
1265    pub fn size_in_bytes(&self) -> usize {
1266        match self.batch {
1267            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1268            StorageWriteBatch::InMemory(_) => 0,
1269            // TODO: implement size_in_bytes method
1270            #[cfg(tidehunter)]
1271            StorageWriteBatch::TideHunter(_) => 0,
1272        }
1273    }
1274
1275    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1276        &mut self,
1277        db: &DBMap<K, V>,
1278        purged_vals: impl IntoIterator<Item = J>,
1279    ) -> Result<(), TypedStoreError> {
1280        if !Arc::ptr_eq(&db.db, &self.database) {
1281            return Err(TypedStoreError::CrossDBBatch);
1282        }
1283
1284        purged_vals
1285            .into_iter()
1286            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1287                let k_buf = be_fix_int_ser(k.borrow());
1288                match (&mut self.batch, &db.column_family) {
1289                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1290                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1291                    }
1292                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1293                        b.delete_cf(name, k_buf)
1294                    }
1295                    #[cfg(tidehunter)]
1296                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1297                        b.delete(*ks, transform_th_key(&k_buf, prefix))
1298                    }
1299                    _ => Err(TypedStoreError::RocksDBError(
1300                        "typed store invariant violation".to_string(),
1301                    ))?,
1302                }
1303                Ok(())
1304            })?;
1305        Ok(())
1306    }
1307
1308    /// Deletes a range of keys between `from` (inclusive) and `to` (non-inclusive)
1309    /// by writing a range delete tombstone in the db map.
1310    /// If the DBMap is configured with ignore_range_deletions set to false,
1311    /// the effect of this write will be visible immediately, i.e. you won't
1312    /// see old values when you do a lookup or scan. But if it is configured
1313    /// with ignore_range_deletions set to true, the old values remain visible until
1314    /// compaction actually deletes them, which will happen some time later. By
1315    /// default ignore_range_deletions is set to true on a DBMap (unless it is
1316    /// overridden in the config), so please use this function with caution.
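    /// A minimal sketch (hypothetical `map: DBMap<u64, String>`):
    /// ```ignore
    /// let mut batch = map.batch();
    /// // Schedules deletion of keys in [100, 200); nothing is applied until `write()`.
    /// batch.schedule_delete_range(&map, &100, &200)?;
    /// batch.write()?;
    /// ```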
1317    pub fn schedule_delete_range<K: Serialize, V>(
1318        &mut self,
1319        db: &DBMap<K, V>,
1320        from: &K,
1321        to: &K,
1322    ) -> Result<(), TypedStoreError> {
1323        if !Arc::ptr_eq(&db.db, &self.database) {
1324            return Err(TypedStoreError::CrossDBBatch);
1325        }
1326
1327        let from_buf = be_fix_int_ser(from);
1328        let to_buf = be_fix_int_ser(to);
1329
1330        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1331            b.delete_range_cf(
1332                &rocks_cf_from_db(&self.database, db.cf_name())?,
1333                from_buf,
1334                to_buf,
1335            );
1336        }
1337        Ok(())
1338    }
1339
1340    /// Inserts a range of (key, value) pairs given as an iterator.
1341    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1342        &mut self,
1343        db: &DBMap<K, V>,
1344        new_vals: impl IntoIterator<Item = (J, U)>,
1345    ) -> Result<&mut Self, TypedStoreError> {
1346        if !Arc::ptr_eq(&db.db, &self.database) {
1347            return Err(TypedStoreError::CrossDBBatch);
1348        }
1349        let mut total = 0usize;
1350        new_vals
1351            .into_iter()
1352            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1353                let k_buf = be_fix_int_ser(k.borrow());
1354                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1355                total += k_buf.len() + v_buf.len();
1356                if db.opts.log_value_hash {
1357                    let key_hash = default_hash(&k_buf);
1358                    let value_hash = default_hash(&v_buf);
1359                    debug!(
1360                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1361                        db.cf_name(),
1362                        key_hash,
1363                        value_hash
1364                    );
1365                }
1366                match (&mut self.batch, &db.column_family) {
1367                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1368                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1369                    }
1370                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1371                        b.put_cf(name, k_buf, v_buf)
1372                    }
1373                    #[cfg(tidehunter)]
1374                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1375                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1376                    }
1377                    _ => Err(TypedStoreError::RocksDBError(
1378                        "typed store invariant violation".to_string(),
1379                    ))?,
1380                }
1381                Ok(())
1382            })?;
1383        self.db_metrics
1384            .op_metrics
1385            .rocksdb_batch_put_bytes
1386            .with_label_values(&[&db.cf])
1387            .observe(total as f64);
1388        Ok(self)
1389    }
1390
1391    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1392        &mut self,
1393        db: &DBMap<K, V>,
1394        new_vals: impl IntoIterator<Item = (J, U)>,
1395    ) -> Result<&mut Self, TypedStoreError> {
1396        if !Arc::ptr_eq(&db.db, &self.database) {
1397            return Err(TypedStoreError::CrossDBBatch);
1398        }
1399        new_vals
1400            .into_iter()
1401            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1402                let k_buf = be_fix_int_ser(k.borrow());
1403                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1404                match &mut self.batch {
1405                    StorageWriteBatch::Rocks(b) => b.merge_cf(
1406                        &rocks_cf_from_db(&self.database, db.cf_name())?,
1407                        k_buf,
1408                        v_buf,
1409                    ),
1410                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
1411                }
1412                Ok(())
1413            })?;
1414        Ok(self)
1415    }
1416}
1417
1418impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1419where
1420    K: Serialize + DeserializeOwned,
1421    V: Serialize + DeserializeOwned,
1422{
1423    type Error = TypedStoreError;
1424
1425    #[instrument(level = "trace", skip_all, err)]
1426    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1427        let key_buf = be_fix_int_ser(key);
1428        let readopts = self.opts.readopts();
1429        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1430            && self
1431                .db
1432                .get(&self.column_family, &key_buf, &readopts)?
1433                .is_some())
1434    }
1435
1436    #[instrument(level = "trace", skip_all, err)]
1437    fn multi_contains_keys<J>(
1438        &self,
1439        keys: impl IntoIterator<Item = J>,
1440    ) -> Result<Vec<bool>, Self::Error>
1441    where
1442        J: Borrow<K>,
1443    {
1444        let values = self.multi_get_pinned(keys)?;
1445        Ok(values.into_iter().map(|v| v.is_some()).collect())
1446    }
1447
1448    #[instrument(level = "trace", skip_all, err)]
1449    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1450        let _timer = self
1451            .db_metrics
1452            .op_metrics
1453            .rocksdb_get_latency_seconds
1454            .with_label_values(&[&self.cf])
1455            .start_timer();
1456        let perf_ctx = if self.get_sample_interval.sample() {
1457            Some(RocksDBPerfContext)
1458        } else {
1459            None
1460        };
1461        let key_buf = be_fix_int_ser(key);
1462        let res = self
1463            .db
1464            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1465        self.db_metrics
1466            .op_metrics
1467            .rocksdb_get_bytes
1468            .with_label_values(&[&self.cf])
1469            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1470        if perf_ctx.is_some() {
1471            self.db_metrics
1472                .read_perf_ctx_metrics
1473                .report_metrics(&self.cf);
1474        }
1475        match res {
1476            Some(data) => {
1477                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1478                if value.is_err() {
1479                    let key_hash = default_hash(&key_buf);
1480                    let value_hash = default_hash(&data);
1481                    debug_fatal!(
1482                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1483                        self.cf_name(),
1484                        key_hash,
1485                        value_hash,
1486                        value.as_ref().err().unwrap()
1487                    );
1488                }
1489                Ok(Some(value?))
1490            }
1491            None => Ok(None),
1492        }
1493    }
1494
1495    #[instrument(level = "trace", skip_all, err)]
1496    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1497        let timer = self
1498            .db_metrics
1499            .op_metrics
1500            .rocksdb_put_latency_seconds
1501            .with_label_values(&[&self.cf])
1502            .start_timer();
1503        let perf_ctx = if self.write_sample_interval.sample() {
1504            Some(RocksDBPerfContext)
1505        } else {
1506            None
1507        };
1508        let key_buf = be_fix_int_ser(key);
1509        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1510        self.db_metrics
1511            .op_metrics
1512            .rocksdb_put_bytes
1513            .with_label_values(&[&self.cf])
1514            .observe((key_buf.len() + value_buf.len()) as f64);
1515        if perf_ctx.is_some() {
1516            self.db_metrics
1517                .write_perf_ctx_metrics
1518                .report_metrics(&self.cf);
1519        }
1520        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1521
1522        let elapsed = timer.stop_and_record();
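        // Surface unusually slow puts (taking more than a second) via a warning and
        // dedicated per-column-family counters.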
1523        if elapsed > 1.0 {
1524            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1525            self.db_metrics
1526                .op_metrics
1527                .rocksdb_very_slow_puts_count
1528                .with_label_values(&[&self.cf])
1529                .inc();
1530            self.db_metrics
1531                .op_metrics
1532                .rocksdb_very_slow_puts_duration_ms
1533                .with_label_values(&[&self.cf])
1534                .inc_by((elapsed * 1000.0) as u64);
1535        }
1536
1537        Ok(())
1538    }
1539
1540    #[instrument(level = "trace", skip_all, err)]
1541    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1542        let _timer = self
1543            .db_metrics
1544            .op_metrics
1545            .rocksdb_delete_latency_seconds
1546            .with_label_values(&[&self.cf])
1547            .start_timer();
1548        let perf_ctx = if self.write_sample_interval.sample() {
1549            Some(RocksDBPerfContext)
1550        } else {
1551            None
1552        };
1553        let key_buf = be_fix_int_ser(key);
1554        self.db.delete_cf(&self.column_family, key_buf)?;
1555        self.db_metrics
1556            .op_metrics
1557            .rocksdb_deletes
1558            .with_label_values(&[&self.cf])
1559            .inc();
1560        if perf_ctx.is_some() {
1561            self.db_metrics
1562                .write_perf_ctx_metrics
1563                .report_metrics(&self.cf);
1564        }
1565        Ok(())
1566    }
1567
1568    /// Writes a range-delete tombstone that deletes all entries in this DBMap.
1569    /// If the DBMap is configured with ignore_range_deletions set to false,
1570    /// the effect of this write is visible immediately, i.e. lookups and scans
1571    /// will not return the old values. If it is configured with
1572    /// ignore_range_deletions set to true, the old values remain visible until
1573    /// compaction actually deletes them, which happens at some later point. By
1574    /// default, ignore_range_deletions is set to true on a DBMap (unless it is
1575    /// overridden in the config), so please use this function with caution.
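    ///
    /// # Example (illustrative sketch; assumes a `DBMap<u64, String>` named `map`
    /// that was opened elsewhere)
    /// ```ignore
    /// map.schedule_delete_all()?;
    /// // With ignore_range_deletions == false the map now reads as empty; with the
    /// // default (true), old entries may remain visible until compaction runs.
    /// ```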
1576    #[instrument(level = "trace", skip_all, err)]
1577    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1578        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1579        let last_key = self
1580            .reversed_safe_iter_with_bounds(None, None)?
1581            .next()
1582            .transpose()?
1583            .map(|(k, _v)| k);
1584        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1585            let mut batch = self.batch();
1586            batch.schedule_delete_range(self, &first_key, &last_key)?;
1587            batch.write()?;
1588        }
1589        Ok(())
1590    }
1591
1592    fn is_empty(&self) -> bool {
1593        self.safe_iter().next().is_none()
1594    }
1595
1596    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
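        // Dispatch on the storage backend: RocksDB wraps a raw column-family iterator in
        // `SafeIter` (which records scan metrics), while the in-memory and TideHunter
        // backends return their own iterator implementations.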
1597        match &self.db.storage {
1598            Storage::Rocks(db) => {
1599                let db_iter = db
1600                    .underlying
1601                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1602                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1603                Box::new(SafeIter::new(
1604                    self.cf.clone(),
1605                    db_iter,
1606                    _timer,
1607                    _perf_ctx,
1608                    bytes_scanned,
1609                    keys_scanned,
1610                    Some(self.db_metrics.clone()),
1611                ))
1612            }
1613            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1614            #[cfg(tidehunter)]
1615            Storage::TideHunter(db) => match &self.column_family {
1616                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1617                    db.iterator(*ks),
1618                    prefix,
1619                    self.start_iter_timer(),
1620                )),
1621                _ => unreachable!("storage backend invariant violation"),
1622            },
1623        }
1624    }
1625
1626    fn safe_iter_with_bounds(
1627        &'a self,
1628        lower_bound: Option<K>,
1629        upper_bound: Option<K>,
1630    ) -> DbIterator<'a, (K, V)> {
1631        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1632        match &self.db.storage {
1633            Storage::Rocks(db) => {
1634                let readopts =
1635                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1636                let db_iter = db
1637                    .underlying
1638                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1639                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1640                Box::new(SafeIter::new(
1641                    self.cf.clone(),
1642                    db_iter,
1643                    _timer,
1644                    _perf_ctx,
1645                    bytes_scanned,
1646                    keys_scanned,
1647                    Some(self.db_metrics.clone()),
1648                ))
1649            }
1650            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1651            #[cfg(tidehunter)]
1652            Storage::TideHunter(db) => match &self.column_family {
1653                ColumnFamily::TideHunter((ks, prefix)) => {
1654                    let mut iter = db.iterator(*ks);
1655                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1656                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1657                }
1658                _ => unreachable!("storage backend invariant violation"),
1659            },
1660        }
1661    }
1662
1663    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1664        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1665        match &self.db.storage {
1666            Storage::Rocks(db) => {
1667                let readopts =
1668                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1669                let db_iter = db
1670                    .underlying
1671                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1672                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1673                Box::new(SafeIter::new(
1674                    self.cf.clone(),
1675                    db_iter,
1676                    _timer,
1677                    _perf_ctx,
1678                    bytes_scanned,
1679                    keys_scanned,
1680                    Some(self.db_metrics.clone()),
1681                ))
1682            }
1683            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1684            #[cfg(tidehunter)]
1685            Storage::TideHunter(db) => match &self.column_family {
1686                ColumnFamily::TideHunter((ks, prefix)) => {
1687                    let mut iter = db.iterator(*ks);
1688                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
1689                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1690                }
1691                _ => unreachable!("storage backend invariant violation"),
1692            },
1693        }
1694    }
1695
1696    /// Returns a vector of values corresponding to the keys provided.
1697    #[instrument(level = "trace", skip_all, err)]
1698    fn multi_get<J>(
1699        &self,
1700        keys: impl IntoIterator<Item = J>,
1701    ) -> Result<Vec<Option<V>>, TypedStoreError>
1702    where
1703        J: Borrow<K>,
1704    {
1705        let results = self.multi_get_pinned(keys)?;
1706        let values_parsed: Result<Vec<_>, TypedStoreError> = results
1707            .into_iter()
1708            .map(|value_byte| match value_byte {
1709                Some(data) => Ok(Some(
1710                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1711                )),
1712                None => Ok(None),
1713            })
1714            .collect();
1715
1716        values_parsed
1717    }
1718
1719    /// Convenience method for batch insertion
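    /// Internally this stages all pairs in a single `DBBatch` and writes it with one
    /// `write` call.
    ///
    /// # Example (illustrative sketch; assumes a `DBMap<u64, String>` named `map`)
    /// ```ignore
    /// map.multi_insert([(1u64, "a".to_string()), (2u64, "b".to_string())])?;
    /// ```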
1720    #[instrument(level = "trace", skip_all, err)]
1721    fn multi_insert<J, U>(
1722        &self,
1723        key_val_pairs: impl IntoIterator<Item = (J, U)>,
1724    ) -> Result<(), Self::Error>
1725    where
1726        J: Borrow<K>,
1727        U: Borrow<V>,
1728    {
1729        let mut batch = self.batch();
1730        batch.insert_batch(self, key_val_pairs)?;
1731        batch.write()
1732    }
1733
1734    /// Convenience method for batch removal
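    /// Internally this stages all keys in a single `DBBatch` and writes it with one
    /// `write` call, mirroring `multi_insert`.
    ///
    /// # Example (illustrative sketch; assumes the same hypothetical `map` as above)
    /// ```ignore
    /// map.multi_remove([1u64, 2u64])?;
    /// ```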
1735    #[instrument(level = "trace", skip_all, err)]
1736    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1737    where
1738        J: Borrow<K>,
1739    {
1740        let mut batch = self.batch();
1741        batch.delete_batch(self, keys)?;
1742        batch.write()
1743    }
1744
1745    /// Try to catch up with the primary instance when running as a secondary.
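    /// This only has an effect for the RocksDB backend; for other backends it is a no-op.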
1746    #[instrument(level = "trace", skip_all, err)]
1747    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1748        if let Storage::Rocks(rocks) = &self.db.storage {
1749            rocks
1750                .underlying
1751                .try_catch_up_with_primary()
1752                .map_err(typed_store_err_from_rocks_err)?;
1753        }
1754        Ok(())
1755    }
1756}
1757
1758/// Opens a database with the given options and a set of column families, each with its own options; column families that do not yet exist are created.
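/// # Example (illustrative sketch; the path, column family name, and `metric_conf` value
/// are hypothetical and assumed to be supplied by the caller)
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/example_db",
///     None,
///     metric_conf,
///     &[("objects", rocksdb::Options::default())],
/// )?;
/// ```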
1759#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1760pub fn open_cf_opts<P: AsRef<Path>>(
1761    path: P,
1762    db_options: Option<rocksdb::Options>,
1763    metric_conf: MetricConf,
1764    opt_cfs: &[(&str, rocksdb::Options)],
1765) -> Result<Arc<Database>, TypedStoreError> {
1766    let path = path.as_ref();
1767    // In the simulator, we intercept the wall clock in the test thread only. This causes problems
1768    // because rocksdb uses the simulated clock when creating its background threads, but then
1769    // those threads see the real wall clock (because they are not the test thread), which causes
1770    // rocksdb to panic. The `nondeterministic` macro evaluates expressions in new threads, which
1771    // resolves the issue.
1772    //
1773    // This is a no-op in non-simulator builds.
1774
1775    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1776    nondeterministic!({
1777        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1778        options.create_if_missing(true);
1779        options.create_missing_column_families(true);
1780        let rocksdb = {
1781            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1782                &options,
1783                path,
1784                cfs.into_iter()
1785                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1786            )
1787            .map_err(typed_store_err_from_rocks_err)?
1788        };
1789        Ok(Arc::new(Database::new(
1790            Storage::Rocks(RocksDB {
1791                underlying: rocksdb,
1792            }),
1793            metric_conf,
1794        )))
1795    })
1796}
1797
1798/// Opens a database as a secondary instance with the given options and a set of column families, each with its own options; column families that do not yet exist are created.
1799pub fn open_cf_opts_secondary<P: AsRef<Path>>(
1800    primary_path: P,
1801    secondary_path: Option<P>,
1802    db_options: Option<rocksdb::Options>,
1803    metric_conf: MetricConf,
1804    opt_cfs: &[(&str, rocksdb::Options)],
1805) -> Result<Arc<Database>, TypedStoreError> {
1806    let primary_path = primary_path.as_ref();
1807    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
1808    // See comment above for explanation of why nondeterministic is necessary here.
1809    nondeterministic!({
1810        // Customize database options
1811        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1812
1813        fdlimit::raise_fd_limit();
1814        // RocksDB requires max_open_files to be unlimited (-1) when opening in secondary mode
1815        options.set_max_open_files(-1);
1816
1817        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
1818        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
1819            .ok()
1820            .unwrap_or_default();
1821
1822        let default_db_options = default_db_options();
1823        // Add CFs not explicitly listed
1824        for cf_key in cfs.iter() {
1825            if !opt_cfs.contains_key(&cf_key[..]) {
1826                opt_cfs.insert(cf_key, default_db_options.options.clone());
1827            }
1828        }
1829
1830        let primary_path = primary_path.to_path_buf();
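        // When no secondary path is given, default to a "SECONDARY" directory that is a
        // sibling of the primary database directory.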
1831        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
1832            let mut s = primary_path.clone();
1833            s.pop();
1834            s.push("SECONDARY");
1835            s.as_path().to_path_buf()
1836        });
1837
1838        let rocksdb = {
1839            options.create_if_missing(true);
1840            options.create_missing_column_families(true);
1841            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
1842                &options,
1843                &primary_path,
1844                &secondary_path,
1845                opt_cfs
1846                    .iter()
1847                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
1848            )
1849            .map_err(typed_store_err_from_rocks_err)?;
1850            db.try_catch_up_with_primary()
1851                .map_err(typed_store_err_from_rocks_err)?;
1852            db
1853        };
1854        Ok(Arc::new(Database::new(
1855            Storage::Rocks(RocksDB {
1856                underlying: rocksdb,
1857            }),
1858            metric_conf,
1859        )))
1860    })
1861}
1862
1863// Drops a database if there is no other handle to it, retrying with exponential backoff until the timeout elapses.
1864#[cfg(not(tidehunter))]
1865pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
1866    let mut backoff = backoff::ExponentialBackoff {
1867        max_elapsed_time: Some(timeout),
1868        ..Default::default()
1869    };
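    // `DB::destroy` fails while another handle still holds the database lock, so keep
    // retrying until the backoff gives up (i.e. the timeout has elapsed).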
1870    loop {
1871        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
1872            Ok(()) => return Ok(()),
1873            Err(err) => match backoff.next_backoff() {
1874                Some(duration) => tokio::time::sleep(duration).await,
1875                None => return Err(err),
1876            },
1877        }
1878    }
1879}
1880
1881#[cfg(tidehunter)]
1882pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
1883    std::fs::remove_dir_all(path)
1884}
1885
1886fn populate_missing_cfs(
1887    input_cfs: &[(&str, rocksdb::Options)],
1888    path: &Path,
1889) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
1890    let mut cfs = vec![];
1891    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
1892    let existing_cfs =
1893        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
1894            .ok()
1895            .unwrap_or_default();
1896
1897    for cf_name in existing_cfs {
1898        if !input_cf_index.contains(&cf_name[..]) {
1899            cfs.push((cf_name, rocksdb::Options::default()));
1900        }
1901    }
1902    cfs.extend(
1903        input_cfs
1904            .iter()
1905            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
1906    );
1907    Ok(cfs)
1908}
1909
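/// Blake2b-256 digest used to log stable fingerprints of keys and values (e.g. when value
/// deserialization fails in `get`) without dumping the raw bytes.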
1910fn default_hash(value: &[u8]) -> Digest<32> {
1911    let mut hasher = fastcrypto::hash::Blake2b256::default();
1912    hasher.update(value);
1913    hasher.finalize()
1914}