1pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17 apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20 be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21 iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26 traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36 properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42 borrow::Borrow,
43 marker::PhantomData,
44 ops::RangeBounds,
45 path::{Path, PathBuf},
46 sync::{Arc, OnceLock},
47 time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
/// Name of the RocksDB integer property `rocksdb.total-blob-file-size`, queried per column
/// family when reporting metrics.
// SAFETY: the byte-string literal ends with exactly one NUL byte and contains no interior
// NUL bytes, which is precisely the contract of `CStr::from_bytes_with_nul_unchecked`.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked(b"rocksdb.total-blob-file-size\0") };
60
/// Process-wide write-sync flag. It is initialized at most once: either explicitly through
/// [`init_write_sync`] or lazily from the environment by [`write_sync_enabled`], whichever
/// happens first.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns the write-sync flag.
///
/// If the flag has not been initialized yet, it is derived from the `SUI_DB_SYNC_TO_DISK`
/// environment variable: only the exact values `"1"` and `"true"` enable it. An unset or
/// non-Unicode variable, or any other value, disables it. The result is cached.
fn write_sync_enabled() -> bool {
    let flag = WRITE_SYNC_ENABLED.get_or_init(|| {
        matches!(
            std::env::var("SUI_DB_SYNC_TO_DISK").as_deref(),
            Ok("1") | Ok("true")
        )
    });
    *flag
}

/// Explicitly sets the write-sync flag.
///
/// `None` leaves the flag untouched, so it falls back to the environment variable. The
/// flag can only be set once. If it was already initialized, by an earlier call or by a
/// prior [`write_sync_enabled`] lookup, the new value is ignored.
pub fn init_write_sync(enabled: Option<bool>) {
    let Some(value) = enabled else {
        return;
    };
    // The first initialization wins; a rejected `set` is deliberately ignored.
    let _ = WRITE_SYNC_ENABLED.set(value);
}
75
76#[cfg(test)]
77mod tests;
78
79#[derive(Debug)]
80pub struct RocksDB {
81 pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
82}
83
84impl Drop for RocksDB {
85 fn drop(&mut self) {
86 self.underlying.cancel_all_background_work(true);
87 }
88}
89
/// Identifies where a typed map lives inside a specific storage backend.
#[derive(Clone)]
pub enum ColumnFamily {
    /// RocksDB column family, by name.
    Rocks(String),
    /// In-memory column family, by name.
    InMemory(String),
    /// TideHunter key space plus the optional prefix applied to keys of this map.
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rocks(name) => write!(f, "RocksDB cf: {name}"),
            Self::InMemory(name) => write!(f, "InMemory cf: {name}"),
            #[cfg(tidehunter)]
            Self::TideHunter(_) => f.write_str("TideHunter column family"),
        }
    }
}
108
109impl ColumnFamily {
110 fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111 match &self {
112 ColumnFamily::Rocks(name) => rocks_db
113 .underlying
114 .cf_handle(name)
115 .expect("Map-keying column family should have been checked at DB creation"),
116 _ => unreachable!("invariant is checked by the caller"),
117 }
118 }
119}
120
121pub enum Storage {
122 Rocks(RocksDB),
123 InMemory(InMemoryDB),
124 #[cfg(tidehunter)]
125 TideHunter(Arc<TideHunterDb>),
126}
127
128impl std::fmt::Debug for Storage {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132 Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133 #[cfg(tidehunter)]
134 Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135 }
136 }
137}
138
139#[derive(Debug)]
140pub struct Database {
141 storage: Storage,
142 metric_conf: MetricConf,
143 registry_id: Option<RegistryID>,
144}
145
146impl Drop for Database {
147 fn drop(&mut self) {
148 let metrics = DBMetrics::get();
149 metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
150 if let Some(registry_id) = self.registry_id {
151 metrics.registry_serivce.remove(registry_id);
152 }
153 }
154}
155
156enum GetResult<'a> {
157 Rocks(DBPinnableSlice<'a>),
158 InMemory(Vec<u8>),
159 #[cfg(tidehunter)]
160 TideHunter(tidehunter::minibytes::Bytes),
161}
162
163impl Deref for GetResult<'_> {
164 type Target = [u8];
165 fn deref(&self) -> &[u8] {
166 match self {
167 GetResult::Rocks(d) => d.deref(),
168 GetResult::InMemory(d) => d.deref(),
169 #[cfg(tidehunter)]
170 GetResult::TideHunter(d) => d.deref(),
171 }
172 }
173}
174
175impl Database {
176 pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178 Self {
179 storage,
180 metric_conf,
181 registry_id,
182 }
183 }
184
185 pub fn flush(&self) -> Result<(), TypedStoreError> {
187 match &self.storage {
188 Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
189 TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
190 }),
191 Storage::InMemory(_) => {
192 Ok(())
194 }
195 #[cfg(tidehunter)]
196 Storage::TideHunter(_) => {
197 Ok(())
199 }
200 }
201 }
202
203 fn get<K: AsRef<[u8]>>(
204 &self,
205 cf: &ColumnFamily,
206 key: K,
207 readopts: &ReadOptions,
208 ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
209 match (&self.storage, cf) {
210 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
211 .underlying
212 .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
213 .map_err(typed_store_err_from_rocks_err)?
214 .map(GetResult::Rocks)),
215 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
216 Ok(db.get(cf_name, key).map(GetResult::InMemory))
217 }
218 #[cfg(tidehunter)]
219 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
220 .get(*ks, &transform_th_key(key.as_ref(), prefix))
221 .map_err(typed_store_error_from_th_error)?
222 .map(GetResult::TideHunter)),
223
224 _ => Err(TypedStoreError::RocksDBError(
225 "typed store invariant violation".to_string(),
226 )),
227 }
228 }
229
230 fn contains(
234 &self,
235 cf: &ColumnFamily,
236 key: &[u8],
237 readopts: &ReadOptions,
238 ) -> Result<bool, TypedStoreError> {
239 match (&self.storage, cf) {
240 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
241 let rocks_cf = cf.rocks_cf(db);
242 Ok(db.underlying.key_may_exist_cf_opt(&rocks_cf, key, readopts)
246 && db
247 .underlying
248 .get_pinned_cf_opt(&rocks_cf, key, readopts)
249 .map_err(typed_store_err_from_rocks_err)?
250 .is_some())
251 }
252 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
253 Ok(db.get(cf_name, key).is_some())
254 }
255 #[cfg(tidehunter)]
256 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
257 .exists(*ks, &transform_th_key(key, prefix))
258 .map_err(typed_store_error_from_th_error),
259 _ => Err(TypedStoreError::RocksDBError(
260 "typed store invariant violation".to_string(),
261 )),
262 }
263 }
264
265 fn multi_contains<I, K>(
269 &self,
270 cf: &ColumnFamily,
271 keys: I,
272 readopts: &ReadOptions,
273 ) -> Result<Vec<bool>, TypedStoreError>
274 where
275 I: IntoIterator<Item = K>,
276 K: AsRef<[u8]>,
277 {
278 match (&self.storage, cf) {
279 #[cfg(tidehunter)]
280 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => keys
281 .into_iter()
282 .map(|k| {
283 db.exists(*ks, &transform_th_key(k.as_ref(), prefix))
284 .map_err(typed_store_error_from_th_error)
285 })
286 .collect(),
287 _ => self
288 .multi_get(cf, keys, readopts)
289 .into_iter()
290 .map(|r| r.map(|v| v.is_some()))
291 .collect(),
292 }
293 }
294
295 fn multi_get<I, K>(
296 &self,
297 cf: &ColumnFamily,
298 keys: I,
299 readopts: &ReadOptions,
300 ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
301 where
302 I: IntoIterator<Item = K>,
303 K: AsRef<[u8]>,
304 {
305 match (&self.storage, cf) {
306 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
307 let keys_vec: Vec<K> = keys.into_iter().collect();
308 let res = db.underlying.batched_multi_get_cf_opt(
309 &cf.rocks_cf(db),
310 keys_vec.iter(),
311 false,
312 readopts,
313 );
314 res.into_iter()
315 .map(|r| {
316 r.map_err(typed_store_err_from_rocks_err)
317 .map(|item| item.map(GetResult::Rocks))
318 })
319 .collect()
320 }
321 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
322 .multi_get(cf_name, keys)
323 .into_iter()
324 .map(|r| Ok(r.map(GetResult::InMemory)))
325 .collect(),
326 #[cfg(tidehunter)]
327 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => keys
328 .into_iter()
329 .map(|k| {
330 db.get(*ks, &transform_th_key(k.as_ref(), prefix))
331 .map_err(typed_store_error_from_th_error)
332 .map(|item| item.map(|bytes| GetResult::TideHunter(bytes.into_owned())))
333 })
334 .collect(),
335 _ => unreachable!("typed store invariant violation"),
336 }
337 }
338
339 pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
340 match &self.storage {
341 Storage::Rocks(db) => db.underlying.drop_cf(name),
342 Storage::InMemory(db) => {
343 db.drop_cf(name);
344 Ok(())
345 }
346 #[cfg(tidehunter)]
347 Storage::TideHunter(_) => {
348 unimplemented!("TideHunter: deletion of column family on a fly not implemented")
349 }
350 }
351 }
352
353 pub fn delete_file_in_range<K: AsRef<[u8]>>(
354 &self,
355 cf: &impl AsColumnFamilyRef,
356 from: K,
357 to: K,
358 ) -> Result<(), rocksdb::Error> {
359 match &self.storage {
360 Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
361 _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
362 }
363 }
364
365 fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
366 fail_point!("delete-cf-before");
367 let ret = match (&self.storage, cf) {
368 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
369 .underlying
370 .delete_cf(&cf.rocks_cf(db), key)
371 .map_err(typed_store_err_from_rocks_err),
372 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
373 db.delete(cf_name, key.as_ref());
374 Ok(())
375 }
376 #[cfg(tidehunter)]
377 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
378 .remove(*ks, transform_th_key(key.as_ref(), prefix))
379 .map_err(typed_store_error_from_th_error),
380 _ => Err(TypedStoreError::RocksDBError(
381 "typed store invariant violation".to_string(),
382 )),
383 };
384 fail_point!("delete-cf-after");
385 #[allow(clippy::let_and_return)]
386 ret
387 }
388
389 pub fn path_for_pruning(&self) -> &Path {
390 match &self.storage {
391 Storage::Rocks(rocks) => rocks.underlying.path(),
392 _ => unimplemented!("method is only supported for rocksdb backend"),
393 }
394 }
395
396 fn put_cf(
397 &self,
398 cf: &ColumnFamily,
399 key: Vec<u8>,
400 value: Vec<u8>,
401 ) -> Result<(), TypedStoreError> {
402 fail_point!("put-cf-before");
403 let ret = match (&self.storage, cf) {
404 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
405 .underlying
406 .put_cf(&cf.rocks_cf(db), key, value)
407 .map_err(typed_store_err_from_rocks_err),
408 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
409 db.put(cf_name, key, value);
410 Ok(())
411 }
412 #[cfg(tidehunter)]
413 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
414 .insert(*ks, transform_th_key(&key, prefix), value)
415 .map_err(typed_store_error_from_th_error),
416 _ => Err(TypedStoreError::RocksDBError(
417 "typed store invariant violation".to_string(),
418 )),
419 };
420 fail_point!("put-cf-after");
421 #[allow(clippy::let_and_return)]
422 ret
423 }
424
425 pub(crate) fn write_opt_internal(
426 &self,
427 batch: StorageWriteBatch,
428 write_options: &rocksdb::WriteOptions,
429 ) -> Result<(), TypedStoreError> {
430 fail_point!("batch-write-before");
431 let ret = match (&self.storage, batch) {
432 (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
433 .underlying
434 .write_opt(batch, write_options)
435 .map_err(typed_store_err_from_rocks_err),
436 (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
437 db.write(batch);
439 Ok(())
440 }
441 #[cfg(tidehunter)]
442 (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
443 batch.commit().map_err(typed_store_error_from_th_error)
445 }
446 _ => Err(TypedStoreError::RocksDBError(
447 "using invalid batch type for the database".to_string(),
448 )),
449 };
450 fail_point!("batch-write-after");
451 #[allow(clippy::let_and_return)]
452 ret
453 }
454
455 #[cfg(tidehunter)]
456 pub fn start_relocation(&self) -> anyhow::Result<()> {
457 if let Storage::TideHunter(db) = &self.storage {
458 db.start_relocation()?;
459 }
460 Ok(())
461 }
462
463 #[cfg(tidehunter)]
464 pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
465 if let Storage::TideHunter(db) = &self.storage {
466 db.force_rebuild_control_region()
467 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
468 }
469 Ok(())
470 }
471
472 #[cfg(tidehunter)]
479 pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
480 let strong = Arc::strong_count(&self);
481 if strong != 1 {
482 println!(
483 "WARNING: wait_for_tidehunter_background_threads called with Arc<Database> strong_count={} (expected 1); other clones will keep the inner tidehunter Db alive and the wait may panic on timeout",
484 strong,
485 );
486 }
487 let Storage::TideHunter(th_arc) = &self.storage else {
488 return;
489 };
490 let th_arc = th_arc.clone();
491 drop(self);
492 th_arc.wait_for_background_threads_to_finish();
493 }
494
495 #[cfg(tidehunter)]
496 pub fn drop_cells_in_range(
497 &self,
498 ks: KeySpace,
499 from_inclusive: &[u8],
500 to_inclusive: &[u8],
501 ) -> anyhow::Result<()> {
502 if let Storage::TideHunter(db) = &self.storage {
503 db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
504 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
505 } else {
506 panic!("drop_cells_in_range called on non-TideHunter storage");
507 }
508 Ok(())
509 }
510
511 pub fn compact_range_cf<K: AsRef<[u8]>>(
512 &self,
513 cf_name: &str,
514 start: Option<K>,
515 end: Option<K>,
516 ) {
517 if let Storage::Rocks(rocksdb) = &self.storage {
518 rocksdb
519 .underlying
520 .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
521 }
522 }
523
524 pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
525 if let Storage::Rocks(rocks) = &self.storage {
527 let checkpoint =
528 Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
529 checkpoint
530 .create_checkpoint(path)
531 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
532 }
533 Ok(())
534 }
535
536 pub fn get_sampling_interval(&self) -> SamplingInterval {
537 self.metric_conf.read_sample_interval.new_from_self()
538 }
539
540 pub fn multiget_sampling_interval(&self) -> SamplingInterval {
541 self.metric_conf.read_sample_interval.new_from_self()
542 }
543
544 pub fn write_sampling_interval(&self) -> SamplingInterval {
545 self.metric_conf.write_sample_interval.new_from_self()
546 }
547
548 pub fn iter_sampling_interval(&self) -> SamplingInterval {
549 self.metric_conf.iter_sample_interval.new_from_self()
550 }
551
552 fn db_name(&self) -> String {
553 let name = &self.metric_conf.db_name;
554 if name.is_empty() {
555 "default".to_string()
556 } else {
557 name.clone()
558 }
559 }
560
561 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
562 match &self.storage {
563 Storage::Rocks(rocks) => rocks.underlying.live_files(),
564 _ => Ok(vec![]),
565 }
566 }
567}
568
569fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
570 rocks_db
571 .underlying
572 .cf_handle(cf_name)
573 .expect("Map-keying column family should have been checked at DB creation")
574}
575
576fn rocks_cf_from_db<'a>(
577 db: &'a Database,
578 cf_name: &str,
579) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
580 match &db.storage {
581 Storage::Rocks(rocksdb) => Ok(rocksdb
582 .underlying
583 .cf_handle(cf_name)
584 .expect("Map-keying column family should have been checked at DB creation")),
585 _ => Err(TypedStoreError::RocksDBError(
586 "using invalid batch type for the database".to_string(),
587 )),
588 }
589}
590
591#[derive(Debug, Default)]
592pub struct MetricConf {
593 pub db_name: String,
594 pub read_sample_interval: SamplingInterval,
595 pub write_sample_interval: SamplingInterval,
596 pub iter_sample_interval: SamplingInterval,
597 pub enable_th_batch_compression: bool,
600}
601
602impl MetricConf {
603 pub fn new(db_name: &str) -> Self {
604 if db_name.is_empty() {
605 error!("A meaningful db name should be used for metrics reporting.")
606 }
607 Self {
608 db_name: db_name.to_string(),
609 read_sample_interval: SamplingInterval::default(),
610 write_sample_interval: SamplingInterval::default(),
611 iter_sample_interval: SamplingInterval::default(),
612 enable_th_batch_compression: false,
613 }
614 }
615
616 pub fn with_sampling(mut self, read_interval: SamplingInterval) -> Self {
617 self.read_sample_interval = read_interval;
618 self
619 }
620
621 pub fn with_th_batch_compression(mut self) -> Self {
622 self.enable_th_batch_compression = true;
623 self
624 }
625}
/// Seconds between two reports of per-column-family RocksDB metrics by the background task
/// that `DBMap::new` spawns.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Sentinel gauge value published when a RocksDB property cannot be read.
const METRICS_ERROR: i64 = -1_i64;
628
629#[derive(Clone, Debug)]
631pub struct DBMap<K, V> {
632 pub db: Arc<Database>,
633 _phantom: PhantomData<fn(K) -> V>,
634 column_family: ColumnFamily,
635 cf: String,
637 pub opts: ReadWriteOptions,
638 db_metrics: Arc<DBMetrics>,
639 get_sample_interval: SamplingInterval,
640 multiget_sample_interval: SamplingInterval,
641 write_sample_interval: SamplingInterval,
642 iter_sample_interval: SamplingInterval,
643 _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
644}
645
646unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
647
648impl<K, V> DBMap<K, V> {
649 pub(crate) fn new(
650 db: Arc<Database>,
651 opts: &ReadWriteOptions,
652 opt_cf: &str,
653 column_family: ColumnFamily,
654 is_deprecated: bool,
655 ) -> Self {
656 let db_cloned = Arc::downgrade(&db.clone());
657 let db_metrics = DBMetrics::get();
658 let db_metrics_cloned = db_metrics.clone();
659 let cf = opt_cf.to_string();
660
661 let (sender, mut recv) = tokio::sync::oneshot::channel();
662 if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
663 tokio::task::spawn(async move {
664 let mut interval =
665 tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
666 loop {
667 tokio::select! {
668 _ = interval.tick() => {
669 if let Some(db) = db_cloned.upgrade() {
670 let cf = cf.clone();
671 let db_metrics = db_metrics.clone();
672 if let Err(e) = tokio::task::spawn_blocking(move || {
673 Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
674 }).await {
675 error!("Failed to log metrics with error: {}", e);
676 }
677 }
678 }
679 _ = &mut recv => break,
680 }
681 }
682 debug!("Returning the cf metric logging task for DBMap: {}", &cf);
683 });
684 }
685 DBMap {
686 db: db.clone(),
687 opts: opts.clone(),
688 _phantom: PhantomData,
689 column_family,
690 cf: opt_cf.to_string(),
691 db_metrics: db_metrics_cloned,
692 _metrics_task_cancel_handle: Arc::new(sender),
693 get_sample_interval: db.get_sampling_interval(),
694 multiget_sample_interval: db.multiget_sampling_interval(),
695 write_sample_interval: db.write_sampling_interval(),
696 iter_sample_interval: db.iter_sampling_interval(),
697 }
698 }
699
700 #[instrument(level = "debug", skip(db), err)]
703 pub fn reopen(
704 db: &Arc<Database>,
705 opt_cf: Option<&str>,
706 rw_options: &ReadWriteOptions,
707 is_deprecated: bool,
708 ) -> Result<Self, TypedStoreError> {
709 let cf_key = opt_cf
710 .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
711 .to_owned();
712 Ok(DBMap::new(
713 db.clone(),
714 rw_options,
715 &cf_key,
716 ColumnFamily::Rocks(cf_key.to_string()),
717 is_deprecated,
718 ))
719 }
720
721 #[cfg(tidehunter)]
722 pub fn reopen_th(
723 db: Arc<Database>,
724 cf_name: &str,
725 ks: KeySpace,
726 prefix: Option<Vec<u8>>,
727 ) -> Self {
728 DBMap::new(
729 db,
730 &ReadWriteOptions::default(),
731 cf_name,
732 ColumnFamily::TideHunter((ks, prefix.clone())),
733 false,
734 )
735 }
736
737 pub fn cf_name(&self) -> &str {
738 &self.cf
739 }
740
741 pub fn batch(&self) -> DBBatch {
742 let batch = match &self.db.storage {
743 Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
744 Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
745 #[cfg(tidehunter)]
746 Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
747 };
748 DBBatch::new(
749 &self.db,
750 batch,
751 &self.db_metrics,
752 &self.write_sample_interval,
753 )
754 }
755
756 pub fn flush(&self) -> Result<(), TypedStoreError> {
757 self.db.flush()
758 }
759
760 pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
761 let from_buf = be_fix_int_ser(start);
762 let to_buf = be_fix_int_ser(end);
763 self.db
764 .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
765 Ok(())
766 }
767
768 pub fn compact_range_raw(
769 &self,
770 cf_name: &str,
771 start: Vec<u8>,
772 end: Vec<u8>,
773 ) -> Result<(), TypedStoreError> {
774 self.db.compact_range_cf(cf_name, Some(start), Some(end));
775 Ok(())
776 }
777
778 #[cfg(tidehunter)]
779 pub fn drop_cells_in_range<J: Serialize>(
780 &self,
781 from_inclusive: &J,
782 to_inclusive: &J,
783 ) -> Result<(), TypedStoreError>
784 where
785 K: Serialize,
786 {
787 let from_buf = be_fix_int_ser(from_inclusive);
788 let to_buf = be_fix_int_ser(to_inclusive);
789 if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
790 self.db
791 .drop_cells_in_range(*ks, &from_buf, &to_buf)
792 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
793 }
794 Ok(())
795 }
796
797 #[cfg(tidehunter)]
798 pub fn drop_cells_in_range_raw(
799 &self,
800 from_inclusive: &[u8],
801 to_inclusive: &[u8],
802 ) -> Result<(), TypedStoreError> {
803 if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
804 self.db
805 .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
806 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
807 }
808 Ok(())
809 }
810
811 fn multi_get_pinned<J>(
813 &self,
814 keys: impl IntoIterator<Item = J>,
815 ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
816 where
817 J: Borrow<K>,
818 K: Serialize,
819 {
820 let _timer = self
821 .db_metrics
822 .op_metrics
823 .rocksdb_multiget_latency_seconds
824 .with_label_values(&[&self.cf])
825 .start_timer();
826 let perf_ctx = if self.multiget_sample_interval.sample() {
827 Some(RocksDBPerfContext)
828 } else {
829 None
830 };
831 let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
832 let results: Result<Vec<_>, TypedStoreError> = self
833 .db
834 .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
835 .into_iter()
836 .collect();
837 let entries = results?;
838 let entry_size = entries
839 .iter()
840 .flatten()
841 .map(|entry| entry.len())
842 .sum::<usize>();
843 self.db_metrics
844 .op_metrics
845 .rocksdb_multiget_bytes
846 .with_label_values(&[&self.cf])
847 .observe(entry_size as f64);
848 if perf_ctx.is_some() {
849 self.db_metrics
850 .read_perf_ctx_metrics
851 .report_metrics(&self.cf);
852 }
853 Ok(entries)
854 }
855
856 fn get_rocksdb_int_property(
857 rocksdb: &RocksDB,
858 cf: &impl AsColumnFamilyRef,
859 property_name: &std::ffi::CStr,
860 ) -> Result<i64, TypedStoreError> {
861 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
862 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
863 Ok(None) => Ok(0),
864 Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
865 }
866 }
867
868 fn report_rocksdb_metrics(
869 database: &Arc<Database>,
870 cf_name: &str,
871 db_metrics: &Arc<DBMetrics>,
872 ) {
873 let Storage::Rocks(rocksdb) = &database.storage else {
874 return;
875 };
876
877 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
878 tracing::warn!(
879 "unable to report metrics for cf {cf_name:?} in db {:?}",
880 database.db_name()
881 );
882 return;
883 };
884
885 db_metrics
886 .cf_metrics
887 .rocksdb_total_sst_files_size
888 .with_label_values(&[cf_name])
889 .set(
890 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
891 .unwrap_or(METRICS_ERROR),
892 );
893 db_metrics
894 .cf_metrics
895 .rocksdb_total_blob_files_size
896 .with_label_values(&[cf_name])
897 .set(
898 Self::get_rocksdb_int_property(
899 rocksdb,
900 &cf,
901 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
902 )
903 .unwrap_or(METRICS_ERROR),
904 );
905 let total_num_files: i64 = (0..=6)
908 .map(|level| {
909 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
910 .unwrap_or(METRICS_ERROR)
911 })
912 .sum();
913 db_metrics
914 .cf_metrics
915 .rocksdb_total_num_files
916 .with_label_values(&[cf_name])
917 .set(total_num_files);
918 db_metrics
919 .cf_metrics
920 .rocksdb_num_level0_files
921 .with_label_values(&[cf_name])
922 .set(
923 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
924 .unwrap_or(METRICS_ERROR),
925 );
926 db_metrics
927 .cf_metrics
928 .rocksdb_current_size_active_mem_tables
929 .with_label_values(&[cf_name])
930 .set(
931 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
932 .unwrap_or(METRICS_ERROR),
933 );
934 db_metrics
935 .cf_metrics
936 .rocksdb_size_all_mem_tables
937 .with_label_values(&[cf_name])
938 .set(
939 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
940 .unwrap_or(METRICS_ERROR),
941 );
942 db_metrics
943 .cf_metrics
944 .rocksdb_num_snapshots
945 .with_label_values(&[cf_name])
946 .set(
947 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
948 .unwrap_or(METRICS_ERROR),
949 );
950 db_metrics
951 .cf_metrics
952 .rocksdb_oldest_snapshot_time
953 .with_label_values(&[cf_name])
954 .set(
955 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
956 .unwrap_or(METRICS_ERROR),
957 );
958 db_metrics
959 .cf_metrics
960 .rocksdb_actual_delayed_write_rate
961 .with_label_values(&[cf_name])
962 .set(
963 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
964 .unwrap_or(METRICS_ERROR),
965 );
966 db_metrics
967 .cf_metrics
968 .rocksdb_is_write_stopped
969 .with_label_values(&[cf_name])
970 .set(
971 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
972 .unwrap_or(METRICS_ERROR),
973 );
974 db_metrics
975 .cf_metrics
976 .rocksdb_block_cache_capacity
977 .with_label_values(&[cf_name])
978 .set(
979 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
980 .unwrap_or(METRICS_ERROR),
981 );
982 db_metrics
983 .cf_metrics
984 .rocksdb_block_cache_usage
985 .with_label_values(&[cf_name])
986 .set(
987 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
988 .unwrap_or(METRICS_ERROR),
989 );
990 db_metrics
991 .cf_metrics
992 .rocksdb_block_cache_pinned_usage
993 .with_label_values(&[cf_name])
994 .set(
995 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
996 .unwrap_or(METRICS_ERROR),
997 );
998 db_metrics
999 .cf_metrics
1000 .rocksdb_estimate_table_readers_mem
1001 .with_label_values(&[cf_name])
1002 .set(
1003 Self::get_rocksdb_int_property(
1004 rocksdb,
1005 &cf,
1006 properties::ESTIMATE_TABLE_READERS_MEM,
1007 )
1008 .unwrap_or(METRICS_ERROR),
1009 );
1010 db_metrics
1011 .cf_metrics
1012 .rocksdb_estimated_num_keys
1013 .with_label_values(&[cf_name])
1014 .set(
1015 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
1016 .unwrap_or(METRICS_ERROR),
1017 );
1018 db_metrics
1019 .cf_metrics
1020 .rocksdb_num_immutable_mem_tables
1021 .with_label_values(&[cf_name])
1022 .set(
1023 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
1024 .unwrap_or(METRICS_ERROR),
1025 );
1026 db_metrics
1027 .cf_metrics
1028 .rocksdb_mem_table_flush_pending
1029 .with_label_values(&[cf_name])
1030 .set(
1031 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
1032 .unwrap_or(METRICS_ERROR),
1033 );
1034 db_metrics
1035 .cf_metrics
1036 .rocksdb_compaction_pending
1037 .with_label_values(&[cf_name])
1038 .set(
1039 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
1040 .unwrap_or(METRICS_ERROR),
1041 );
1042 db_metrics
1043 .cf_metrics
1044 .rocksdb_estimate_pending_compaction_bytes
1045 .with_label_values(&[cf_name])
1046 .set(
1047 Self::get_rocksdb_int_property(
1048 rocksdb,
1049 &cf,
1050 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
1051 )
1052 .unwrap_or(METRICS_ERROR),
1053 );
1054 db_metrics
1055 .cf_metrics
1056 .rocksdb_num_running_compactions
1057 .with_label_values(&[cf_name])
1058 .set(
1059 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1060 .unwrap_or(METRICS_ERROR),
1061 );
1062 db_metrics
1063 .cf_metrics
1064 .rocksdb_num_running_flushes
1065 .with_label_values(&[cf_name])
1066 .set(
1067 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1068 .unwrap_or(METRICS_ERROR),
1069 );
1070 db_metrics
1071 .cf_metrics
1072 .rocksdb_estimate_oldest_key_time
1073 .with_label_values(&[cf_name])
1074 .set(
1075 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1076 .unwrap_or(METRICS_ERROR),
1077 );
1078 db_metrics
1079 .cf_metrics
1080 .rocksdb_background_errors
1081 .with_label_values(&[cf_name])
1082 .set(
1083 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1084 .unwrap_or(METRICS_ERROR),
1085 );
1086 db_metrics
1087 .cf_metrics
1088 .rocksdb_base_level
1089 .with_label_values(&[cf_name])
1090 .set(
1091 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1092 .unwrap_or(METRICS_ERROR),
1093 );
1094 }
1095
1096 pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1097 self.db.checkpoint(path)
1098 }
1099
1100 pub fn table_summary(&self) -> eyre::Result<TableSummary>
1101 where
1102 K: Serialize + DeserializeOwned,
1103 V: Serialize + DeserializeOwned,
1104 {
1105 let mut num_keys = 0;
1106 let mut key_bytes_total = 0;
1107 let mut value_bytes_total = 0;
1108 let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1109 let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1110 for item in self.safe_iter() {
1111 let (key, value) = item?;
1112 num_keys += 1;
1113 let key_len = be_fix_int_ser(key.borrow()).len();
1114 let value_len = bcs::to_bytes(value.borrow())?.len();
1115 key_bytes_total += key_len;
1116 value_bytes_total += value_len;
1117 key_hist.record(key_len as u64)?;
1118 value_hist.record(value_len as u64)?;
1119 }
1120 Ok(TableSummary {
1121 num_keys,
1122 key_bytes_total,
1123 value_bytes_total,
1124 key_hist,
1125 value_hist,
1126 })
1127 }
1128
1129 fn start_iter_timer(&self) -> HistogramTimer {
1130 self.db_metrics
1131 .op_metrics
1132 .rocksdb_iter_latency_seconds
1133 .with_label_values(&[&self.cf])
1134 .start_timer()
1135 }
1136
1137 fn create_iter_context(
1139 &self,
1140 ) -> (
1141 Option<HistogramTimer>,
1142 Option<Histogram>,
1143 Option<Histogram>,
1144 Option<RocksDBPerfContext>,
1145 ) {
1146 let timer = self.start_iter_timer();
1147 let bytes_scanned = self
1148 .db_metrics
1149 .op_metrics
1150 .rocksdb_iter_bytes
1151 .with_label_values(&[&self.cf]);
1152 let keys_scanned = self
1153 .db_metrics
1154 .op_metrics
1155 .rocksdb_iter_keys
1156 .with_label_values(&[&self.cf]);
1157 let perf_ctx = if self.iter_sample_interval.sample() {
1158 Some(RocksDBPerfContext)
1159 } else {
1160 None
1161 };
1162 (
1163 Some(timer),
1164 Some(bytes_scanned),
1165 Some(keys_scanned),
1166 perf_ctx,
1167 )
1168 }
1169
1170 #[allow(clippy::complexity)]
1173 pub fn reversed_safe_iter_with_bounds(
1174 &self,
1175 lower_bound: Option<K>,
1176 upper_bound: Option<K>,
1177 ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1178 where
1179 K: Serialize + DeserializeOwned,
1180 V: Serialize + DeserializeOwned,
1181 {
1182 let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1183 lower_bound
1184 .as_ref()
1185 .map(Bound::Included)
1186 .unwrap_or(Bound::Unbounded),
1187 upper_bound
1188 .as_ref()
1189 .map(Bound::Included)
1190 .unwrap_or(Bound::Unbounded),
1191 ));
1192 match &self.db.storage {
1193 Storage::Rocks(db) => {
1194 let readopts = rocks_util::apply_range_bounds(
1195 self.opts.readopts(),
1196 it_lower_bound,
1197 it_upper_bound,
1198 );
1199 let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1200 let db_iter = db
1201 .underlying
1202 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1203 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1204 let iter = SafeIter::new(
1205 self.cf.clone(),
1206 db_iter,
1207 _timer,
1208 _perf_ctx,
1209 bytes_scanned,
1210 keys_scanned,
1211 Some(self.db_metrics.clone()),
1212 );
1213 Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1214 }
1215 Storage::InMemory(db) => {
1216 Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1217 }
1218 #[cfg(tidehunter)]
1219 Storage::TideHunter(db) => match &self.column_family {
1220 ColumnFamily::TideHunter((ks, prefix)) => {
1221 let mut iter = db.iterator(*ks);
1222 apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound, prefix);
1223 iter.reverse();
1224 Ok(Box::new(transform_th_iterator(
1225 iter,
1226 prefix,
1227 self.start_iter_timer(),
1228 )))
1229 }
1230 _ => unreachable!("storage backend invariant violation"),
1231 },
1232 }
1233 }
1234
1235 pub fn snapshot_iterator(&self) -> DbIterator<'_, (K, V)>
1238 where
1239 K: Serialize + DeserializeOwned,
1240 V: Serialize + DeserializeOwned,
1241 {
1242 self.snapshot_iterator_with_bounds(None, None, false)
1243 }
1244
1245 pub fn snapshot_iterator_with_bounds(
1258 &self,
1259 lower_bound: Option<K>,
1260 upper_bound: Option<K>,
1261 reverse: bool,
1262 ) -> DbIterator<'_, (K, V)>
1263 where
1264 K: Serialize + DeserializeOwned,
1265 V: Serialize + DeserializeOwned,
1266 {
1267 #[cfg(tidehunter)]
1268 if let Storage::TideHunter(db) = &self.db.storage {
1269 let ColumnFamily::TideHunter((ks, prefix)) = &self.column_family else {
1270 unreachable!("storage backend invariant violation");
1271 };
1272 let (lower, upper) = if reverse {
1276 iterator_bounds_with_range::<K>((
1277 lower_bound
1278 .as_ref()
1279 .map(Bound::Included)
1280 .unwrap_or(Bound::Unbounded),
1281 upper_bound
1282 .as_ref()
1283 .map(Bound::Included)
1284 .unwrap_or(Bound::Unbounded),
1285 ))
1286 } else {
1287 iterator_bounds(lower_bound, upper_bound)
1288 };
1289 let mut iter = db.checkpoint().iterator(*ks);
1290 apply_range_bounds(&mut iter, lower, upper, prefix);
1291 if reverse {
1292 iter.reverse();
1293 }
1294 return Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()));
1295 }
1296 if reverse {
1297 self.reversed_safe_iter_with_bounds(lower_bound, upper_bound)
1299 .expect("reversed iterator construction is infallible")
1300 } else {
1301 self.safe_iter_with_bounds(lower_bound, upper_bound)
1302 }
1303 }
1304}
1305
/// Backend-specific write batch wrapped by `DBBatch`.
///
/// `DBBatch`'s typed writers pair this variant with the target column family's
/// backend and return a "typed store invariant violation" error on a mismatch.
pub enum StorageWriteBatch {
    /// Native RocksDB write batch.
    Rocks(rocksdb::WriteBatch),
    /// Batch for the in-memory store.
    InMemory(InMemoryBatch),
    /// Batch for the TideHunter store; only present when built with `cfg(tidehunter)`.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1312
/// Locates one staged operation inside a `StagedBatch`'s byte buffer.
///
/// An entry's bytes are laid out as `cf_name | key | value`, where the value is
/// present only for puts. The value length is not stored: it runs to the next
/// entry's `offset`, or to the end of the buffer for the last entry.
struct EntryHeader {
    /// Byte offset in the staged buffer where this entry begins.
    offset: usize,
    /// Length in bytes of the column-family name prefix.
    cf_name_len: usize,
    /// Length in bytes of the serialized key that follows the name.
    key_len: usize,
    /// `true` for a put (key followed by value), `false` for a delete (key only).
    is_put: bool,
}
1323
/// Write operations serialized ahead of time without holding a database handle.
///
/// Operations are appended to one byte buffer (`data`) and indexed by `entries`.
/// `DBBatch::concat` later replays them into a real batch.
#[derive(Default)]
pub struct StagedBatch {
    // Concatenated `cf_name | key | value` payloads for every staged entry.
    data: Vec<u8>,
    // One header per staged operation, in the order it was staged.
    entries: Vec<EntryHeader>,
}
1334
1335impl StagedBatch {
1336 pub fn new() -> Self {
1337 Self {
1338 data: Vec::with_capacity(1024),
1339 entries: Vec::with_capacity(16),
1340 }
1341 }
1342
1343 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1344 &mut self,
1345 db: &DBMap<K, V>,
1346 new_vals: impl IntoIterator<Item = (J, U)>,
1347 ) -> Result<&mut Self, TypedStoreError> {
1348 let cf_name = db.cf_name();
1349 new_vals
1350 .into_iter()
1351 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1352 let offset = self.data.len();
1353 self.data.extend_from_slice(cf_name.as_bytes());
1354 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1355 bcs::serialize_into(&mut self.data, v.borrow())
1356 .map_err(typed_store_err_from_bcs_err)?;
1357 self.entries.push(EntryHeader {
1358 offset,
1359 cf_name_len: cf_name.len(),
1360 key_len,
1361 is_put: true,
1362 });
1363 Ok(())
1364 })?;
1365 Ok(self)
1366 }
1367
1368 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1369 &mut self,
1370 db: &DBMap<K, V>,
1371 purged_vals: impl IntoIterator<Item = J>,
1372 ) -> Result<(), TypedStoreError> {
1373 let cf_name = db.cf_name();
1374 purged_vals
1375 .into_iter()
1376 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1377 let offset = self.data.len();
1378 self.data.extend_from_slice(cf_name.as_bytes());
1379 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1380 self.entries.push(EntryHeader {
1381 offset,
1382 cf_name_len: cf_name.len(),
1383 key_len,
1384 is_put: false,
1385 });
1386 Ok(())
1387 })?;
1388 Ok(())
1389 }
1390
1391 pub fn size_in_bytes(&self) -> usize {
1392 self.data.len()
1393 }
1394}
1395
/// A write batch tied to one `Database` and committed with `write` / `write_opt`.
///
/// Typed writers (`insert_batch`, `delete_batch`, ...) reject maps that belong
/// to a different database with `TypedStoreError::CrossDBBatch`.
pub struct DBBatch {
    // Database the batch is committed to; compared by pointer against each map's db.
    database: Arc<Database>,
    // Backend-specific operations accumulated so far.
    batch: StorageWriteBatch,
    // Metrics sink for commit latency, sizes and slow-write counters.
    db_metrics: Arc<DBMetrics>,
    // Decides whether a given commit also reports perf-context metrics.
    write_sample_interval: SamplingInterval,
}
1453
1454impl DBBatch {
1455 pub fn new(
1459 dbref: &Arc<Database>,
1460 batch: StorageWriteBatch,
1461 db_metrics: &Arc<DBMetrics>,
1462 write_sample_interval: &SamplingInterval,
1463 ) -> Self {
1464 DBBatch {
1465 database: dbref.clone(),
1466 batch,
1467 db_metrics: db_metrics.clone(),
1468 write_sample_interval: write_sample_interval.clone(),
1469 }
1470 }
1471
1472 #[instrument(level = "trace", skip_all, err)]
1474 pub fn write(self) -> Result<(), TypedStoreError> {
1475 let mut write_options = rocksdb::WriteOptions::default();
1476
1477 if write_sync_enabled() {
1478 write_options.set_sync(true);
1479 }
1480
1481 self.write_opt(write_options)
1482 }
1483
1484 #[instrument(level = "trace", skip_all, err)]
1486 pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1487 let db_name = self.database.db_name();
1488 let timer = self
1489 .db_metrics
1490 .op_metrics
1491 .rocksdb_batch_commit_latency_seconds
1492 .with_label_values(&[&db_name])
1493 .start_timer();
1494 let batch_size = self.size_in_bytes();
1495
1496 let perf_ctx = if self.write_sample_interval.sample() {
1497 Some(RocksDBPerfContext)
1498 } else {
1499 None
1500 };
1501
1502 self.database
1503 .write_opt_internal(self.batch, &write_options)?;
1504
1505 self.db_metrics
1506 .op_metrics
1507 .rocksdb_batch_commit_bytes
1508 .with_label_values(&[&db_name])
1509 .observe(batch_size as f64);
1510
1511 if perf_ctx.is_some() {
1512 self.db_metrics
1513 .write_perf_ctx_metrics
1514 .report_metrics(&db_name);
1515 }
1516 let elapsed = timer.stop_and_record();
1517 if elapsed > 1.0 {
1518 warn!(?elapsed, ?db_name, "very slow batch write");
1519 self.db_metrics
1520 .op_metrics
1521 .rocksdb_very_slow_batch_writes_count
1522 .with_label_values(&[&db_name])
1523 .inc();
1524 self.db_metrics
1525 .op_metrics
1526 .rocksdb_very_slow_batch_writes_duration_ms
1527 .with_label_values(&[&db_name])
1528 .inc_by((elapsed * 1000.0) as u64);
1529 }
1530 Ok(())
1531 }
1532
1533 pub fn size_in_bytes(&self) -> usize {
1534 match self.batch {
1535 StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1536 StorageWriteBatch::InMemory(_) => 0,
1537 #[cfg(tidehunter)]
1539 StorageWriteBatch::TideHunter(_) => 0,
1540 }
1541 }
1542
1543 pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
1545 for raw_batch in raw_batches {
1546 let data = &raw_batch.data;
1547 for (i, hdr) in raw_batch.entries.iter().enumerate() {
1548 let end = raw_batch
1549 .entries
1550 .get(i + 1)
1551 .map_or(data.len(), |next| next.offset);
1552 let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
1553 let key_start = hdr.offset + hdr.cf_name_len;
1554 let key = &data[key_start..key_start + hdr.key_len];
1555 let cf_name = std::str::from_utf8(cf_bytes)
1557 .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;
1558
1559 if hdr.is_put {
1560 let value = &data[key_start + hdr.key_len..end];
1561 match &mut self.batch {
1562 StorageWriteBatch::Rocks(b) => {
1563 b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
1564 }
1565 StorageWriteBatch::InMemory(b) => {
1566 b.put_cf(cf_name, key, value);
1567 }
1568 #[cfg(tidehunter)]
1569 _ => {
1570 return Err(TypedStoreError::RocksDBError(
1571 "concat not supported for TideHunter".to_string(),
1572 ));
1573 }
1574 }
1575 } else {
1576 match &mut self.batch {
1577 StorageWriteBatch::Rocks(b) => {
1578 b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
1579 }
1580 StorageWriteBatch::InMemory(b) => {
1581 b.delete_cf(cf_name, key);
1582 }
1583 #[cfg(tidehunter)]
1584 _ => {
1585 return Err(TypedStoreError::RocksDBError(
1586 "concat not supported for TideHunter".to_string(),
1587 ));
1588 }
1589 }
1590 }
1591 }
1592 }
1593 Ok(self)
1594 }
1595
1596 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1597 &mut self,
1598 db: &DBMap<K, V>,
1599 purged_vals: impl IntoIterator<Item = J>,
1600 ) -> Result<(), TypedStoreError> {
1601 if !Arc::ptr_eq(&db.db, &self.database) {
1602 return Err(TypedStoreError::CrossDBBatch);
1603 }
1604
1605 purged_vals
1606 .into_iter()
1607 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1608 let k_buf = be_fix_int_ser(k.borrow());
1609 match (&mut self.batch, &db.column_family) {
1610 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1611 b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1612 }
1613 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1614 b.delete_cf(name, k_buf)
1615 }
1616 #[cfg(tidehunter)]
1617 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1618 b.delete(*ks, transform_th_key(&k_buf, prefix))
1619 }
1620 _ => Err(TypedStoreError::RocksDBError(
1621 "typed store invariant violation".to_string(),
1622 ))?,
1623 }
1624 Ok(())
1625 })?;
1626 Ok(())
1627 }
1628
1629 pub fn schedule_delete_range<K: Serialize, V>(
1639 &mut self,
1640 db: &DBMap<K, V>,
1641 from: &K,
1642 to: &K,
1643 ) -> Result<(), TypedStoreError> {
1644 if !Arc::ptr_eq(&db.db, &self.database) {
1645 return Err(TypedStoreError::CrossDBBatch);
1646 }
1647
1648 let from_buf = be_fix_int_ser(from);
1649 let to_buf = be_fix_int_ser(to);
1650
1651 if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1652 b.delete_range_cf(
1653 &rocks_cf_from_db(&self.database, db.cf_name())?,
1654 from_buf,
1655 to_buf,
1656 );
1657 }
1658 Ok(())
1659 }
1660
1661 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1663 &mut self,
1664 db: &DBMap<K, V>,
1665 new_vals: impl IntoIterator<Item = (J, U)>,
1666 ) -> Result<&mut Self, TypedStoreError> {
1667 if !Arc::ptr_eq(&db.db, &self.database) {
1668 return Err(TypedStoreError::CrossDBBatch);
1669 }
1670 let mut total = 0usize;
1671 new_vals
1672 .into_iter()
1673 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1674 let k_buf = be_fix_int_ser(k.borrow());
1675 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1676 total += k_buf.len() + v_buf.len();
1677 if db.opts.log_value_hash {
1678 let key_hash = default_hash(&k_buf);
1679 let value_hash = default_hash(&v_buf);
1680 debug!(
1681 "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1682 db.cf_name(),
1683 key_hash,
1684 value_hash
1685 );
1686 }
1687 match (&mut self.batch, &db.column_family) {
1688 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1689 b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1690 }
1691 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1692 b.put_cf(name, k_buf, v_buf)
1693 }
1694 #[cfg(tidehunter)]
1695 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1696 b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1697 }
1698 _ => Err(TypedStoreError::RocksDBError(
1699 "typed store invariant violation".to_string(),
1700 ))?,
1701 }
1702 Ok(())
1703 })?;
1704 self.db_metrics
1705 .op_metrics
1706 .rocksdb_batch_put_bytes
1707 .with_label_values(&[&db.cf])
1708 .observe(total as f64);
1709 Ok(self)
1710 }
1711
1712 pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1713 &mut self,
1714 db: &DBMap<K, V>,
1715 new_vals: impl IntoIterator<Item = (J, U)>,
1716 ) -> Result<&mut Self, TypedStoreError> {
1717 if !Arc::ptr_eq(&db.db, &self.database) {
1718 return Err(TypedStoreError::CrossDBBatch);
1719 }
1720 new_vals
1721 .into_iter()
1722 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1723 let k_buf = be_fix_int_ser(k.borrow());
1724 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1725 match &mut self.batch {
1726 StorageWriteBatch::Rocks(b) => b.merge_cf(
1727 &rocks_cf_from_db(&self.database, db.cf_name())?,
1728 k_buf,
1729 v_buf,
1730 ),
1731 _ => unimplemented!("merge operator is only implemented for RocksDB"),
1732 }
1733 Ok(())
1734 })?;
1735 Ok(self)
1736 }
1737}
1738
1739impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1740where
1741 K: Serialize + DeserializeOwned,
1742 V: Serialize + DeserializeOwned,
1743{
1744 type Error = TypedStoreError;
1745
1746 #[instrument(level = "trace", skip_all, err)]
1747 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1748 let key_buf = be_fix_int_ser(key);
1749 self.db
1750 .contains(&self.column_family, &key_buf, &self.opts.readopts())
1751 }
1752
1753 #[instrument(level = "trace", skip_all, err)]
1754 fn multi_contains_keys<J>(
1755 &self,
1756 keys: impl IntoIterator<Item = J>,
1757 ) -> Result<Vec<bool>, Self::Error>
1758 where
1759 J: Borrow<K>,
1760 {
1761 let _timer = self
1762 .db_metrics
1763 .op_metrics
1764 .rocksdb_multiget_latency_seconds
1765 .with_label_values(&[&self.cf])
1766 .start_timer();
1767 let perf_ctx = if self.multiget_sample_interval.sample() {
1768 Some(RocksDBPerfContext)
1769 } else {
1770 None
1771 };
1772 let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
1773 let result = self
1774 .db
1775 .multi_contains(&self.column_family, keys_bytes, &self.opts.readopts());
1776 if perf_ctx.is_some() {
1777 self.db_metrics
1778 .read_perf_ctx_metrics
1779 .report_metrics(&self.cf);
1780 }
1781 result
1782 }
1783
1784 #[instrument(level = "trace", skip_all, err)]
1785 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1786 let _timer = self
1787 .db_metrics
1788 .op_metrics
1789 .rocksdb_get_latency_seconds
1790 .with_label_values(&[&self.cf])
1791 .start_timer();
1792 let perf_ctx = if self.get_sample_interval.sample() {
1793 Some(RocksDBPerfContext)
1794 } else {
1795 None
1796 };
1797 let key_buf = be_fix_int_ser(key);
1798 let res = self
1799 .db
1800 .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1801 self.db_metrics
1802 .op_metrics
1803 .rocksdb_get_bytes
1804 .with_label_values(&[&self.cf])
1805 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1806 if perf_ctx.is_some() {
1807 self.db_metrics
1808 .read_perf_ctx_metrics
1809 .report_metrics(&self.cf);
1810 }
1811 match res {
1812 Some(data) => {
1813 let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1814 if value.is_err() {
1815 let key_hash = default_hash(&key_buf);
1816 let value_hash = default_hash(&data);
1817 debug_fatal!(
1818 "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1819 self.cf_name(),
1820 key_hash,
1821 value_hash,
1822 value.as_ref().err().unwrap()
1823 );
1824 }
1825 Ok(Some(value?))
1826 }
1827 None => Ok(None),
1828 }
1829 }
1830
1831 #[instrument(level = "trace", skip_all, err)]
1832 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1833 let timer = self
1834 .db_metrics
1835 .op_metrics
1836 .rocksdb_put_latency_seconds
1837 .with_label_values(&[&self.cf])
1838 .start_timer();
1839 let perf_ctx = if self.write_sample_interval.sample() {
1840 Some(RocksDBPerfContext)
1841 } else {
1842 None
1843 };
1844 let key_buf = be_fix_int_ser(key);
1845 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1846 self.db_metrics
1847 .op_metrics
1848 .rocksdb_put_bytes
1849 .with_label_values(&[&self.cf])
1850 .observe((key_buf.len() + value_buf.len()) as f64);
1851 if perf_ctx.is_some() {
1852 self.db_metrics
1853 .write_perf_ctx_metrics
1854 .report_metrics(&self.cf);
1855 }
1856 self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1857
1858 let elapsed = timer.stop_and_record();
1859 if elapsed > 1.0 {
1860 warn!(?elapsed, cf = ?self.cf, "very slow insert");
1861 self.db_metrics
1862 .op_metrics
1863 .rocksdb_very_slow_puts_count
1864 .with_label_values(&[&self.cf])
1865 .inc();
1866 self.db_metrics
1867 .op_metrics
1868 .rocksdb_very_slow_puts_duration_ms
1869 .with_label_values(&[&self.cf])
1870 .inc_by((elapsed * 1000.0) as u64);
1871 }
1872
1873 Ok(())
1874 }
1875
1876 #[instrument(level = "trace", skip_all, err)]
1877 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1878 let _timer = self
1879 .db_metrics
1880 .op_metrics
1881 .rocksdb_delete_latency_seconds
1882 .with_label_values(&[&self.cf])
1883 .start_timer();
1884 let perf_ctx = if self.write_sample_interval.sample() {
1885 Some(RocksDBPerfContext)
1886 } else {
1887 None
1888 };
1889 let key_buf = be_fix_int_ser(key);
1890 self.db.delete_cf(&self.column_family, key_buf)?;
1891 self.db_metrics
1892 .op_metrics
1893 .rocksdb_deletes
1894 .with_label_values(&[&self.cf])
1895 .inc();
1896 if perf_ctx.is_some() {
1897 self.db_metrics
1898 .write_perf_ctx_metrics
1899 .report_metrics(&self.cf);
1900 }
1901 Ok(())
1902 }
1903
1904 #[instrument(level = "trace", skip_all, err)]
1913 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1914 let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1915 let last_key = self
1916 .reversed_safe_iter_with_bounds(None, None)?
1917 .next()
1918 .transpose()?
1919 .map(|(k, _v)| k);
1920 if let Some((first_key, last_key)) = first_key.zip(last_key) {
1921 let mut batch = self.batch();
1922 batch.schedule_delete_range(self, &first_key, &last_key)?;
1923 batch.write()?;
1924 }
1925 Ok(())
1926 }
1927
1928 fn is_empty(&self) -> bool {
1929 self.safe_iter().next().is_none()
1930 }
1931
1932 fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1933 match &self.db.storage {
1934 Storage::Rocks(db) => {
1935 let db_iter = db
1936 .underlying
1937 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1938 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1939 Box::new(SafeIter::new(
1940 self.cf.clone(),
1941 db_iter,
1942 _timer,
1943 _perf_ctx,
1944 bytes_scanned,
1945 keys_scanned,
1946 Some(self.db_metrics.clone()),
1947 ))
1948 }
1949 Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1950 #[cfg(tidehunter)]
1951 Storage::TideHunter(db) => match &self.column_family {
1952 ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1953 db.iterator(*ks),
1954 prefix,
1955 self.start_iter_timer(),
1956 )),
1957 _ => unreachable!("storage backend invariant violation"),
1958 },
1959 }
1960 }
1961
1962 fn safe_iter_with_bounds(
1963 &'a self,
1964 lower_bound: Option<K>,
1965 upper_bound: Option<K>,
1966 ) -> DbIterator<'a, (K, V)> {
1967 let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1968 match &self.db.storage {
1969 Storage::Rocks(db) => {
1970 let readopts =
1971 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1972 let db_iter = db
1973 .underlying
1974 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1975 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1976 Box::new(SafeIter::new(
1977 self.cf.clone(),
1978 db_iter,
1979 _timer,
1980 _perf_ctx,
1981 bytes_scanned,
1982 keys_scanned,
1983 Some(self.db_metrics.clone()),
1984 ))
1985 }
1986 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1987 #[cfg(tidehunter)]
1988 Storage::TideHunter(db) => match &self.column_family {
1989 ColumnFamily::TideHunter((ks, prefix)) => {
1990 let mut iter = db.iterator(*ks);
1991 apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1992 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1993 }
1994 _ => unreachable!("storage backend invariant violation"),
1995 },
1996 }
1997 }
1998
1999 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
2000 let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
2001 match &self.db.storage {
2002 Storage::Rocks(db) => {
2003 let readopts =
2004 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
2005 let db_iter = db
2006 .underlying
2007 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
2008 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
2009 Box::new(SafeIter::new(
2010 self.cf.clone(),
2011 db_iter,
2012 _timer,
2013 _perf_ctx,
2014 bytes_scanned,
2015 keys_scanned,
2016 Some(self.db_metrics.clone()),
2017 ))
2018 }
2019 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
2020 #[cfg(tidehunter)]
2021 Storage::TideHunter(db) => match &self.column_family {
2022 ColumnFamily::TideHunter((ks, prefix)) => {
2023 let mut iter = db.iterator(*ks);
2024 apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
2025 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
2026 }
2027 _ => unreachable!("storage backend invariant violation"),
2028 },
2029 }
2030 }
2031
2032 #[instrument(level = "trace", skip_all, err)]
2034 fn multi_get<J>(
2035 &self,
2036 keys: impl IntoIterator<Item = J>,
2037 ) -> Result<Vec<Option<V>>, TypedStoreError>
2038 where
2039 J: Borrow<K>,
2040 {
2041 let results = self.multi_get_pinned(keys)?;
2042 let values_parsed: Result<Vec<_>, TypedStoreError> = results
2043 .into_iter()
2044 .map(|value_byte| match value_byte {
2045 Some(data) => Ok(Some(
2046 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
2047 )),
2048 None => Ok(None),
2049 })
2050 .collect();
2051
2052 values_parsed
2053 }
2054
2055 #[instrument(level = "trace", skip_all, err)]
2057 fn multi_insert<J, U>(
2058 &self,
2059 key_val_pairs: impl IntoIterator<Item = (J, U)>,
2060 ) -> Result<(), Self::Error>
2061 where
2062 J: Borrow<K>,
2063 U: Borrow<V>,
2064 {
2065 let mut batch = self.batch();
2066 batch.insert_batch(self, key_val_pairs)?;
2067 batch.write()
2068 }
2069
2070 #[instrument(level = "trace", skip_all, err)]
2072 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
2073 where
2074 J: Borrow<K>,
2075 {
2076 let mut batch = self.batch();
2077 batch.delete_batch(self, keys)?;
2078 batch.write()
2079 }
2080
2081 #[instrument(level = "trace", skip_all, err)]
2083 fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
2084 if let Storage::Rocks(rocks) = &self.db.storage {
2085 rocks
2086 .underlying
2087 .try_catch_up_with_primary()
2088 .map_err(typed_store_err_from_rocks_err)?;
2089 }
2090 Ok(())
2091 }
2092}
2093
2094#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
2096pub fn open_cf_opts<P: AsRef<Path>>(
2097 path: P,
2098 db_options: Option<rocksdb::Options>,
2099 metric_conf: MetricConf,
2100 opt_cfs: &[(&str, rocksdb::Options)],
2101) -> Result<Arc<Database>, TypedStoreError> {
2102 let path = path.as_ref();
2103 ensure_database_type(path, StorageType::Rocks)
2104 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2105 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
2114 nondeterministic!({
2115 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2116 options.create_if_missing(true);
2117 options.create_missing_column_families(true);
2118 let rocksdb = {
2119 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
2120 &options,
2121 path,
2122 cfs.into_iter()
2123 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
2124 )
2125 .map_err(typed_store_err_from_rocks_err)?
2126 };
2127 Ok(Arc::new(Database::new(
2128 Storage::Rocks(RocksDB {
2129 underlying: rocksdb,
2130 }),
2131 metric_conf,
2132 None,
2133 )))
2134 })
2135}
2136
2137pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2139 primary_path: P,
2140 secondary_path: Option<P>,
2141 db_options: Option<rocksdb::Options>,
2142 metric_conf: MetricConf,
2143 opt_cfs: &[(&str, rocksdb::Options)],
2144) -> Result<Arc<Database>, TypedStoreError> {
2145 let primary_path = primary_path.as_ref();
2146 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2147 nondeterministic!({
2149 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2151
2152 fdlimit::raise_fd_limit();
2153 options.set_max_open_files(-1);
2155
2156 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2157 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2158 .ok()
2159 .unwrap_or_default();
2160
2161 let default_db_options = default_db_options();
2162 for cf_key in cfs.iter() {
2164 if !opt_cfs.contains_key(&cf_key[..]) {
2165 opt_cfs.insert(cf_key, default_db_options.options.clone());
2166 }
2167 }
2168
2169 let primary_path = primary_path.to_path_buf();
2170 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2171 let mut s = primary_path.clone();
2172 s.pop();
2173 s.push("SECONDARY");
2174 s.as_path().to_path_buf()
2175 });
2176
2177 ensure_database_type(&primary_path, StorageType::Rocks)
2178 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2179 ensure_database_type(&secondary_path, StorageType::Rocks)
2180 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2181
2182 let rocksdb = {
2183 options.create_if_missing(true);
2184 options.create_missing_column_families(true);
2185 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2186 &options,
2187 &primary_path,
2188 &secondary_path,
2189 opt_cfs
2190 .iter()
2191 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2192 )
2193 .map_err(typed_store_err_from_rocks_err)?;
2194 db.try_catch_up_with_primary()
2195 .map_err(typed_store_err_from_rocks_err)?;
2196 db
2197 };
2198 Ok(Arc::new(Database::new(
2199 Storage::Rocks(RocksDB {
2200 underlying: rocksdb,
2201 }),
2202 metric_conf,
2203 None,
2204 )))
2205 })
2206}
2207
2208pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2214 #[cfg(tidehunter)]
2215 if is_tidehunter_db(&path) {
2216 return safe_drop_tidehunter_db(path, timeout).await;
2217 }
2218 safe_drop_rocksdb(path, timeout).await
2219}
2220
2221async fn safe_drop_rocksdb(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2222 let mut backoff = backoff::ExponentialBackoff {
2223 max_elapsed_time: Some(timeout),
2224 ..Default::default()
2225 };
2226 loop {
2227 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2228 Ok(()) => return Ok(()),
2229 Err(err) => match backoff.next_backoff() {
2230 Some(duration) => tokio::time::sleep(duration).await,
2231 None => return Err(std::io::Error::other(err)),
2232 },
2233 }
2234 }
2235}
2236
/// Returns `true` if `path` contains a `shape.yaml` file, which this module
/// uses as the marker for a TideHunter database directory.
#[cfg(tidehunter)]
fn is_tidehunter_db(path: &Path) -> bool {
    let marker = path.join("shape.yaml");
    std::fs::metadata(marker).is_ok()
}
2243
/// Drops the TideHunter database at `path`, retrying with exponential backoff
/// for up to `timeout`.
///
/// Only `ErrorKind::AlreadyExists` is retried; the timeout warning treats that
/// error as "database still locked". Any other error is returned immediately.
#[cfg(tidehunter)]
async fn safe_drop_tidehunter_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        let Some(delay) = backoff.next_backoff() else {
            warn!(
                "Database at {:?} is still locked after timeout ({:?})",
                path, timeout
            );
            return Err(err);
        };
        tokio::time::sleep(delay).await;
    }
}
2269
2270fn populate_missing_cfs(
2271 input_cfs: &[(&str, rocksdb::Options)],
2272 path: &Path,
2273) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2274 let mut cfs = vec![];
2275 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2276 let existing_cfs =
2277 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2278 .ok()
2279 .unwrap_or_default();
2280
2281 for cf_name in existing_cfs {
2282 if !input_cf_index.contains(&cf_name[..]) {
2283 cfs.push((cf_name, rocksdb::Options::default()));
2284 }
2285 }
2286 cfs.extend(
2287 input_cfs
2288 .iter()
2289 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2290 );
2291 Ok(cfs)
2292}
2293
2294fn default_hash(value: &[u8]) -> Digest<32> {
2295 let mut hasher = fastcrypto::hash::Blake2b256::default();
2296 hasher.update(value);
2297 hasher.finalize()
2298}