pub mod errors;
mod options;
mod rocks_util;
pub(crate) mod safe_iter;

use crate::memstore::{InMemoryBatch, InMemoryDB};
use crate::rocks::errors::typed_store_err_from_bcs_err;
use crate::rocks::errors::typed_store_err_from_rocks_err;
pub use crate::rocks::options::{
    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
};
use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
#[cfg(tidehunter)]
use crate::tidehunter_util::{
    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
};
use crate::util::{be_fix_int_ser, iterator_bounds, iterator_bounds_with_range};
use crate::{DbIterator, TypedStoreError};
use crate::{
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    traits::{Map, TableSummary},
};
use backoff::backoff::Backoff;
use fastcrypto::hash::{Digest, HashFunction};
use mysten_common::debug_fatal;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::properties::num_files_at_level;
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
    properties,
};
use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
use serde::{Serialize, de::DeserializeOwned};
use std::ops::{Bound, Deref};
use std::{
    borrow::Borrow,
    marker::PhantomData,
    ops::RangeBounds,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
use std::{collections::HashSet, ffi::CStr};
use sui_macros::{fail_point, nondeterministic};
#[cfg(tidehunter)]
use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
use tokio::sync::oneshot;
use tracing::{debug, error, instrument, warn};

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

#[cfg(test)]
mod tests;

#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        self.underlying.cancel_all_background_work(true);
    }
}

#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
            #[cfg(tidehunter)]
            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}

impl ColumnFamily {
    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
        match &self {
            ColumnFamily::Rocks(name) => rocks_db
                .underlying
                .cf_handle(name)
                .expect("Map-keying column family should have been checked at DB creation"),
            _ => unreachable!("invariant is checked by the caller"),
        }
    }
}

pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}

impl std::fmt::Debug for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
        }
    }
}

#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
}

impl Drop for Database {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

enum GetResult<'a> {
    Rocks(DBPinnableSlice<'a>),
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}

impl Deref for GetResult<'_> {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        match self {
            GetResult::Rocks(d) => d.deref(),
            GetResult::InMemory(d) => d.deref(),
            #[cfg(tidehunter)]
            GetResult::TideHunter(d) => d.deref(),
        }
    }
}

impl Database {
    pub fn new(storage: Storage, metric_conf: MetricConf) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
        }
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                Ok(())
            }
        }
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),

            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }

    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                let keys_vec: Vec<K> = keys.into_iter().collect();
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: deleting a column family on the fly is not implemented")
            }
        }
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
        }
    }

    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for rocksdb backend"),
        }
    }

    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }

    pub fn write(&self, batch: StorageWriteBatch) -> Result<(), TypedStoreError> {
        self.write_opt(batch, &rocksdb::WriteOptions::default())
    }

    pub fn write_opt(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
                db.write_batch(batch)
                    .map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            "default".to_string()
        } else {
            name.clone()
        }
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
}

fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
    rocks_db
        .underlying
        .cf_handle(cf_name)
        .expect("Map-keying column family should have been checked at DB creation")
}

fn rocks_cf_from_db<'a>(
    db: &'a Database,
    cf_name: &str,
) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
    match &db.storage {
        Storage::Rocks(rocksdb) => Ok(rocksdb
            .underlying
            .cf_handle(cf_name)
            .expect("Map-keying column family should have been checked at DB creation")),
        _ => Err(TypedStoreError::RocksDBError(
            "using invalid batch type for the database".to_string(),
        )),
    }
}

#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    _phantom: PhantomData<fn(K) -> V>,
    column_family: ColumnFamily,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = Arc::downgrade(&db.clone());
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<Database>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();
        Ok(DBMap::new(
            db.clone(),
            rw_options,
            &cf_key,
            ColumnFamily::Rocks(cf_key.to_string()),
            is_deprecated,
        ))
    }

    #[cfg(tidehunter)]
    pub fn reopen_th(
        db: Arc<Database>,
        cf_name: &str,
        ks: KeySpace,
        prefix: Option<Vec<u8>>,
    ) -> Self {
        DBMap::new(
            db,
            &ReadWriteOptions::default(),
            cf_name,
            ColumnFamily::TideHunter((ks, prefix.clone())),
            false,
        )
    }

    pub fn cf_name(&self) -> &str {
        &self.cf
    }

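    /// Creates a write batch bound to this map's underlying database; the batch
    /// takes effect only once it is committed with [`DBBatch::write`].
    ///
    /// A minimal usage sketch (assumes an already-opened, hypothetical
    /// `map: DBMap<u64, String>`, which is not defined in this module):
    ///
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.insert_batch(&map, [(1u64, "one".to_string())])?;
    /// batch.delete_batch(&map, [2u64])?;
    /// batch.write()?;
    /// ```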
    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
            }
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start);
        let to_buf = be_fix_int_ser(end);
        self.db
            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn get_rocksdb_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
        }
    }

    fn report_rocksdb_metrics(
        database: &Arc<Database>,
        cf_name: &str,
        db_metrics: &Arc<DBMetrics>,
    ) {
        let Storage::Rocks(rocksdb) = &database.storage else {
            return;
        };

        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                database.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
                )
                .unwrap_or(METRICS_ERROR),
            );
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_TABLE_READERS_MEM,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            let key_len = be_fix_int_ser(key.borrow()).len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self.start_iter_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

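    /// Returns an iterator that yields `(key, value)` pairs in descending key order,
    /// restricted to the given inclusive bounds when they are provided.
    ///
    /// A minimal sketch of the call shape (the map and the `low`/`high` keys are
    /// hypothetical, not defined in this module):
    ///
    /// ```ignore
    /// // Walk from `high` down to `low`, both inclusive.
    /// for entry in map.reversed_safe_iter_with_bounds(Some(low), Some(high))? {
    ///     let (key, value) = entry?;
    ///     // ...
    /// }
    /// ```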
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
}

pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}

pub struct DBBatch {
    database: Arc<Database>,
    batch: StorageWriteBatch,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        self.write_opt(&rocksdb::WriteOptions::default())
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: &rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.database.write_opt(self.batch, write_options)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}

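// Typed point reads and writes for `DBMap` go through the `Map` trait below; keys are
// serialized with `be_fix_int_ser` and values with BCS before reaching the storage backend.
//
// A minimal usage sketch (hypothetical `map: DBMap<u64, String>`, not defined here):
//
//     map.insert(&1, &"one".to_string())?;
//     assert_eq!(map.get(&1)?, Some("one".to_string()));
//     map.remove(&1)?;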
1374impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1375where
1376    K: Serialize + DeserializeOwned,
1377    V: Serialize + DeserializeOwned,
1378{
1379    type Error = TypedStoreError;
1380
1381    #[instrument(level = "trace", skip_all, err)]
1382    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1383        let key_buf = be_fix_int_ser(key);
1384        let readopts = self.opts.readopts();
1385        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1386            && self
1387                .db
1388                .get(&self.column_family, &key_buf, &readopts)?
1389                .is_some())
1390    }
1391
1392    #[instrument(level = "trace", skip_all, err)]
1393    fn multi_contains_keys<J>(
1394        &self,
1395        keys: impl IntoIterator<Item = J>,
1396    ) -> Result<Vec<bool>, Self::Error>
1397    where
1398        J: Borrow<K>,
1399    {
1400        let values = self.multi_get_pinned(keys)?;
1401        Ok(values.into_iter().map(|v| v.is_some()).collect())
1402    }
1403
1404    #[instrument(level = "trace", skip_all, err)]
1405    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1406        let _timer = self
1407            .db_metrics
1408            .op_metrics
1409            .rocksdb_get_latency_seconds
1410            .with_label_values(&[&self.cf])
1411            .start_timer();
1412        let perf_ctx = if self.get_sample_interval.sample() {
1413            Some(RocksDBPerfContext)
1414        } else {
1415            None
1416        };
1417        let key_buf = be_fix_int_ser(key);
1418        let res = self
1419            .db
1420            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1421        self.db_metrics
1422            .op_metrics
1423            .rocksdb_get_bytes
1424            .with_label_values(&[&self.cf])
1425            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1426        if perf_ctx.is_some() {
1427            self.db_metrics
1428                .read_perf_ctx_metrics
1429                .report_metrics(&self.cf);
1430        }
1431        match res {
1432            Some(data) => {
1433                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1434                if value.is_err() {
1435                    let key_hash = default_hash(&key_buf);
1436                    let value_hash = default_hash(&data);
1437                    debug_fatal!(
1438                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1439                        self.cf_name(),
1440                        key_hash,
1441                        value_hash,
1442                        value.as_ref().err().unwrap()
1443                    );
1444                }
1445                Ok(Some(value?))
1446            }
1447            None => Ok(None),
1448        }
1449    }
1450
1451    #[instrument(level = "trace", skip_all, err)]
1452    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1453        let timer = self
1454            .db_metrics
1455            .op_metrics
1456            .rocksdb_put_latency_seconds
1457            .with_label_values(&[&self.cf])
1458            .start_timer();
1459        let perf_ctx = if self.write_sample_interval.sample() {
1460            Some(RocksDBPerfContext)
1461        } else {
1462            None
1463        };
1464        let key_buf = be_fix_int_ser(key);
1465        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1466        self.db_metrics
1467            .op_metrics
1468            .rocksdb_put_bytes
1469            .with_label_values(&[&self.cf])
1470            .observe((key_buf.len() + value_buf.len()) as f64);
1471        if perf_ctx.is_some() {
1472            self.db_metrics
1473                .write_perf_ctx_metrics
1474                .report_metrics(&self.cf);
1475        }
1476        self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1477
1478        let elapsed = timer.stop_and_record();
1479        if elapsed > 1.0 {
1480            warn!(?elapsed, cf = ?self.cf, "very slow insert");
1481            self.db_metrics
1482                .op_metrics
1483                .rocksdb_very_slow_puts_count
1484                .with_label_values(&[&self.cf])
1485                .inc();
1486            self.db_metrics
1487                .op_metrics
1488                .rocksdb_very_slow_puts_duration_ms
1489                .with_label_values(&[&self.cf])
1490                .inc_by((elapsed * 1000.0) as u64);
1491        }
1492
1493        Ok(())
1494    }
1495
1496    #[instrument(level = "trace", skip_all, err)]
1497    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1498        let _timer = self
1499            .db_metrics
1500            .op_metrics
1501            .rocksdb_delete_latency_seconds
1502            .with_label_values(&[&self.cf])
1503            .start_timer();
1504        let perf_ctx = if self.write_sample_interval.sample() {
1505            Some(RocksDBPerfContext)
1506        } else {
1507            None
1508        };
1509        let key_buf = be_fix_int_ser(key);
1510        self.db.delete_cf(&self.column_family, key_buf)?;
1511        self.db_metrics
1512            .op_metrics
1513            .rocksdb_deletes
1514            .with_label_values(&[&self.cf])
1515            .inc();
1516        if perf_ctx.is_some() {
1517            self.db_metrics
1518                .write_perf_ctx_metrics
1519                .report_metrics(&self.cf);
1520        }
1521        Ok(())
1522    }
1523
1524    #[instrument(level = "trace", skip_all, err)]
1533    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1534        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1535        let last_key = self
1536            .reversed_safe_iter_with_bounds(None, None)?
1537            .next()
1538            .transpose()?
1539            .map(|(k, _v)| k);
1540        if let Some((first_key, last_key)) = first_key.zip(last_key) {
1541            let mut batch = self.batch();
1542            batch.schedule_delete_range(self, &first_key, &last_key)?;
1543            batch.write()?;
1544        }
1545        Ok(())
1546    }
1547
1548    fn is_empty(&self) -> bool {
1549        self.safe_iter().next().is_none()
1550    }
1551
1552    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1553        match &self.db.storage {
1554            Storage::Rocks(db) => {
1555                let db_iter = db
1556                    .underlying
1557                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1558                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1559                Box::new(SafeIter::new(
1560                    self.cf.clone(),
1561                    db_iter,
1562                    _timer,
1563                    _perf_ctx,
1564                    bytes_scanned,
1565                    keys_scanned,
1566                    Some(self.db_metrics.clone()),
1567                ))
1568            }
1569            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1570            #[cfg(tidehunter)]
1571            Storage::TideHunter(db) => match &self.column_family {
1572                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1573                    db.iterator(*ks),
1574                    prefix,
1575                    self.start_iter_timer(),
1576                )),
1577                _ => unreachable!("storage backend invariant violation"),
1578            },
1579        }
1580    }
1581
    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

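    // Like `safe_iter_with_bounds`, but accepts any `RangeBounds<K>` (e.g. `a..b`, `a..=b`,
    // `..`), translated into storage-level bounds by `iterator_bounds_with_range`.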
    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

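    // Batched point lookup: fetches the raw values via `multi_get_pinned` and BCS-decodes each
    // hit; results keep the order of the input keys, with `None` for missing keys.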
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

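    // Inserts all key-value pairs through a single write batch so the writes land atomically.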
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

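    // Deletes all given keys through a single write batch so the removals land atomically.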
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

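    // For a RocksDB secondary instance, replays the primary's recent MANIFEST/WAL updates so
    // reads observe newer data; a no-op for the other storage backends.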
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        if let Storage::Rocks(rocks) = &self.db.storage {
            rocks
                .underlying
                .try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
        }
        Ok(())
    }
}

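/// Opens a RocksDB database at `path` with the given database-wide options and per-column-family
/// options, creating the database and any missing column families as needed. Column families
/// that already exist on disk but are not listed in `opt_cfs` are opened with default options
/// (see `populate_missing_cfs`), since RocksDB requires every existing column family to be
/// opened.
///
/// A minimal usage sketch (the `MetricConf::default()` call and the column family name are
/// illustrative assumptions, not taken from this module):
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/example_db",
///     None,
///     MetricConf::default(),
///     &[("example_cf", rocksdb::Options::default())],
/// )?;
/// ```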
#[instrument(level = "debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
        )))
    })
}

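/// Opens the database at `primary_path` as a read-only secondary instance. When
/// `secondary_path` is `None`, a "SECONDARY" directory placed next to the primary directory is
/// used for the secondary's own files. The instance is caught up with the primary once before
/// being returned; callers can refresh it later via `try_catch_up_with_primary`.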
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        // A secondary instance is expected to keep all of the primary's files open: raise the
        // process fd limit and remove RocksDB's open-file cap.
        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Also open every column family that exists on disk, even if the caller did not ask
        // for it, since RocksDB requires all existing column families to be opened.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
        )))
    })
}

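/// Destroys the RocksDB database at `path`, retrying with exponential backoff for up to
/// `timeout` (e.g. while another handle still holds the database lock); the last error is
/// returned once the backoff budget is exhausted.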
#[cfg(not(tidehunter))]
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
            Ok(()) => return Ok(()),
            Err(err) => match backoff.next_backoff() {
                Some(duration) => tokio::time::sleep(duration).await,
                None => return Err(err),
            },
        }
    }
}

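/// TideHunter variant: the database is a plain directory tree, so dropping it is a recursive
/// directory removal and the timeout parameter is ignored.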
#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
    std::fs::remove_dir_all(path)
}

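/// Builds the full column family list for opening the database: every column family already
/// present on disk but absent from `input_cfs` is added with default options, followed by the
/// caller-provided entries, because RocksDB refuses to open a database without all of its
/// existing column families.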
fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

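/// Computes the Blake2b-256 digest of `value`.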
fn default_hash(value: &[u8]) -> Digest<32> {
    let mut hasher = fastcrypto::hash::Blake2b256::default();
    hasher.update(value);
    hasher.finalize()
}