pub mod errors;
mod options;
mod rocks_util;
pub(crate) mod safe_iter;

use crate::memstore::{InMemoryBatch, InMemoryDB};
use crate::rocks::errors::typed_store_err_from_bcs_err;
use crate::rocks::errors::typed_store_err_from_rocks_err;
pub use crate::rocks::options::{
    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
};
use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
#[cfg(tidehunter)]
use crate::tidehunter_util::{
    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
};
use crate::util::{
    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
    iterator_bounds_with_range,
};
use crate::{DbIterator, StorageType, TypedStoreError};
use crate::{
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    traits::{Map, TableSummary},
};
use backoff::backoff::Backoff;
use fastcrypto::hash::{Digest, HashFunction};
use mysten_common::debug_fatal;
use mysten_metrics::RegistryID;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::properties::num_files_at_level;
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
    properties,
};
use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
use serde::{Serialize, de::DeserializeOwned};
use std::ops::{Bound, Deref};
use std::{
    borrow::Borrow,
    marker::PhantomData,
    ops::RangeBounds,
    path::{Path, PathBuf},
    sync::{Arc, OnceLock},
    time::Duration,
};
use std::{collections::HashSet, ffi::CStr};
use sui_macros::{fail_point, nondeterministic};
#[cfg(tidehunter)]
use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
use tokio::sync::oneshot;
use tracing::{debug, error, instrument, warn};

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED
        .get_or_init(|| std::env::var("SUI_DB_SYNC_TO_DISK").is_ok_and(|v| v == "1" || v == "true"))
}

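/// Overrides the `SUI_DB_SYNC_TO_DISK` environment probe used by `write_sync_enabled`.
/// The flag is backed by a `OnceLock`, so only the first initialization wins; call this
/// before the first batch write. A minimal sketch (the values are illustrative):
///
/// ```ignore
/// // Force synchronous WAL writes regardless of SUI_DB_SYNC_TO_DISK.
/// init_write_sync(Some(true));
/// // Ignored: the value was already set above.
/// init_write_sync(Some(false));
/// ```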
pub fn init_write_sync(enabled: Option<bool>) {
    if let Some(value) = enabled {
        let _ = WRITE_SYNC_ENABLED.set(value);
    }
}

#[cfg(test)]
mod tests;

#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        self.underlying.cancel_all_background_work(true);
    }
}

#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
            #[cfg(tidehunter)]
            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}

impl ColumnFamily {
    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
        match &self {
            ColumnFamily::Rocks(name) => rocks_db
                .underlying
                .cf_handle(name)
                .expect("Map-keying column family should have been checked at DB creation"),
            _ => unreachable!("invariant is checked by the caller"),
        }
    }
}

pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}

impl std::fmt::Debug for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
        }
    }
}

#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
    registry_id: Option<RegistryID>,
}

impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        if let Some(registry_id) = self.registry_id {
            metrics.registry_serivce.remove(registry_id);
        }
    }
}

enum GetResult<'a> {
    Rocks(DBPinnableSlice<'a>),
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}

impl Deref for GetResult<'_> {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        match self {
            GetResult::Rocks(d) => d.deref(),
            GetResult::InMemory(d) => d.deref(),
            #[cfg(tidehunter)]
            GetResult::TideHunter(d) => d.deref(),
        }
    }
}

impl Database {
    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
            registry_id,
        }
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => Ok(()),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => Ok(()),
        }
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }

    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                let keys_vec: Vec<K> = keys.into_iter().collect();
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: dropping a column family on the fly is not implemented")
            }
        }
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for the rocksdb backend"),
        }
    }

    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for the rocksdb backend"),
        }
    }

    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

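    /// Cheap existence pre-check backed by RocksDB's `key_may_exist`, which can return
    /// false positives (e.g. when only a bloom filter is consulted) but never false
    /// negatives, so a positive answer must still be confirmed with a real `get`.
    /// Non-RocksDB backends conservatively return `true`.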
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }

    pub(crate) fn write_opt_internal(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
                batch.commit().map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.force_rebuild_control_region()
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        }
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
        let strong = Arc::strong_count(&self);
        if strong != 1 {
            warn!(
                "wait_for_tidehunter_background_threads called with Arc<Database> strong_count={} (expected 1); other clones will keep the inner tidehunter Db alive and the wait may panic on timeout",
                strong,
            );
        }
        let Storage::TideHunter(th_arc) = &self.storage else {
            return;
        };
        let th_arc = th_arc.clone();
        drop(self);
        th_arc.wait_for_background_threads_to_finish();
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            "default".to_string()
        } else {
            name.clone()
        }
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
}

fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
    rocks_db
        .underlying
        .cf_handle(cf_name)
        .expect("Map-keying column family should have been checked at DB creation")
}

fn rocks_cf_from_db<'a>(
    db: &'a Database,
    cf_name: &str,
) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
    match &db.storage {
        Storage::Rocks(rocksdb) => Ok(rocksdb
            .underlying
            .cf_handle(cf_name)
            .expect("Map-keying column family should have been checked at DB creation")),
        _ => Err(TypedStoreError::RocksDBError(
            "using invalid batch type for the database".to_string(),
        )),
    }
}

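/// Per-database metrics configuration: the name used for metric labels plus sampling
/// intervals for read/multiget/write/iterator perf contexts. A minimal sketch; note
/// that `with_sampling` overrides only the read interval and resets the write and
/// iterator intervals to their defaults:
///
/// ```ignore
/// let conf = MetricConf::new("consensus_db").with_sampling(SamplingInterval::default());
/// ```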
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    _phantom: PhantomData<fn(K) -> V>,
    column_family: ColumnFamily,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        let db_weak = Arc::downgrade(&db);
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_weak.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<Database>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();
        Ok(DBMap::new(
            db.clone(),
            rw_options,
            &cf_key,
            ColumnFamily::Rocks(cf_key.to_string()),
            is_deprecated,
        ))
    }

    #[cfg(tidehunter)]
    pub fn reopen_th(
        db: Arc<Database>,
        cf_name: &str,
        ks: KeySpace,
        prefix: Option<Vec<u8>>,
    ) -> Self {
        DBMap::new(
            db,
            &ReadWriteOptions::default(),
            cf_name,
            ColumnFamily::TideHunter((ks, prefix)),
            false,
        )
    }

    pub fn cf_name(&self) -> &str {
        &self.cf
    }

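    /// Starts a write batch bound to this map's backing database. Mutations staged on
    /// the returned [`DBBatch`] become visible atomically when [`DBBatch::write`] is
    /// called. A minimal sketch (keys and values are illustrative):
    ///
    /// ```ignore
    /// let mut batch = table.batch();
    /// batch.insert_batch(&table, [(key_a, value_a), (key_b, value_b)])?;
    /// batch.delete_batch(&table, [old_key])?;
    /// batch.write()?; // atomic commit
    /// ```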
    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start);
        let to_buf = be_fix_int_ser(end);
        self.db
            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range_raw(
        &self,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> Result<(), TypedStoreError> {
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn get_rocksdb_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
        }
    }

    fn report_rocksdb_metrics(
        database: &Arc<Database>,
        cf_name: &str,
        db_metrics: &Arc<DBMetrics>,
    ) {
        let Storage::Rocks(rocksdb) = &database.storage else {
            return;
        };

        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                database.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
                )
                .unwrap_or(METRICS_ERROR),
            );
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_TABLE_READERS_MEM,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            let key_len = be_fix_int_ser(key.borrow()).len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self.start_iter_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

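    /// Returns an iterator yielding entries in descending key order; both bounds are
    /// inclusive, as they are mapped to `Bound::Included` below. A minimal sketch
    /// (the key type is illustrative):
    ///
    /// ```ignore
    /// // Visit entries from the largest key down to 100, inclusive.
    /// for item in table.reversed_safe_iter_with_bounds(Some(100u64), None)? {
    ///     let (key, value) = item?;
    ///     // ...
    /// }
    /// ```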
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound, prefix);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
}

pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}

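/// Describes one operation staged in [`StagedBatch::data`]: the entry's byte offset,
/// the lengths of the column-family name and serialized key that follow it, and
/// whether it is a put (whose value bytes extend to the next entry's offset) or a
/// delete.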
struct EntryHeader {
    offset: usize,
    cf_name_len: usize,
    key_len: usize,
    is_put: bool,
}

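/// Encodes puts and deletes into a single flat buffer (`cf_name | key | value?` per
/// entry, indexed by [`EntryHeader`]s) so batch contents can be prepared without
/// holding a [`DBBatch`], then folded into one via [`DBBatch::concat`]. A minimal
/// sketch (tables and keys are illustrative):
///
/// ```ignore
/// let mut staged = StagedBatch::new();
/// staged.insert_batch(&table, [(k1, v1)])?;
/// staged.delete_batch(&table, [k2])?;
///
/// let mut batch = table.batch();
/// batch.concat(vec![staged])?;
/// batch.write()?;
/// ```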
#[derive(Default)]
pub struct StagedBatch {
    data: Vec<u8>,
    entries: Vec<EntryHeader>,
}

impl StagedBatch {
    pub fn new() -> Self {
        Self {
            data: Vec::with_capacity(1024),
            entries: Vec::with_capacity(16),
        }
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        let cf_name = db.cf_name();
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let offset = self.data.len();
                self.data.extend_from_slice(cf_name.as_bytes());
                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
                bcs::serialize_into(&mut self.data, v.borrow())
                    .map_err(typed_store_err_from_bcs_err)?;
                self.entries.push(EntryHeader {
                    offset,
                    cf_name_len: cf_name.len(),
                    key_len,
                    is_put: true,
                });
                Ok(())
            })?;
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        let cf_name = db.cf_name();
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let offset = self.data.len();
                self.data.extend_from_slice(cf_name.as_bytes());
                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
                self.entries.push(EntryHeader {
                    offset,
                    cf_name_len: cf_name.len(),
                    key_len,
                    is_put: false,
                });
                Ok(())
            })?;
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.data.len()
    }
}

pub struct DBBatch {
    database: Arc<Database>,
    batch: StorageWriteBatch,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let mut write_options = rocksdb::WriteOptions::default();

        if write_sync_enabled() {
            write_options.set_sync(true);
        }

        self.write_opt(write_options)
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };

        self.database
            .write_opt_internal(self.batch, &write_options)?;

        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

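    /// Stages a RocksDB range deletion. Per RocksDB's `delete_range` semantics the
    /// range is half-open, `[from, to)`, so the `to` key itself is not deleted. On
    /// non-RocksDB backends this is currently a silent no-op.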
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key);
        let readopts = self.opts.readopts();
        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
            && self
                .db
                .get(&self.column_family, &key_buf, &readopts)?
                .is_some())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let res = self
            .db
            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => {
                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
                if value.is_err() {
                    let key_hash = default_hash(&key_buf);
                    let value_hash = default_hash(&data);
                    debug_fatal!(
                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
                        self.cf_name(),
                        key_hash,
                        value_hash,
                        value.as_ref().err().unwrap()
                    );
                }
                Ok(Some(value?))
            }
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.db.put_cf(&self.column_family, key_buf, value_buf)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        self.db.delete_cf(&self.column_family, key_buf)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
        let last_key = self
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
        match &self.db.storage {
            Storage::Rocks(db) => {
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
                    db.iterator(*ks),
                    prefix,
                    self.start_iter_timer(),
                )),
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        if let Storage::Rocks(rocks) = &self.db.storage {
            rocks
                .underlying
                .try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
        }
        Ok(())
    }
}

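/// Opens (creating it if missing) a RocksDB database at `path` with the given column
/// families; individual tables are then attached with [`DBMap::reopen`]. A minimal
/// end-to-end sketch (path, cf name, and key/value types are illustrative):
///
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/mydb",
///     None,
///     MetricConf::new("my_db"),
///     &[("scores", rocksdb::Options::default())],
/// )?;
/// let scores: DBMap<String, u64> =
///     DBMap::reopen(&db, Some("scores"), &ReadWriteOptions::default(), false)?;
/// scores.insert(&"alice".to_string(), &42)?;
/// assert_eq!(scores.get(&"alice".to_string())?, Some(42));
/// ```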
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let path = path.as_ref();
    ensure_database_type(path, StorageType::Rocks)
        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}

pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        ensure_database_type(&primary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        ensure_database_type(&secondary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}

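/// Destroys the database at `path`, retrying with exponential backoff until `timeout`
/// elapses, since destruction fails while another handle or process still holds the
/// DB open. A minimal sketch:
///
/// ```ignore
/// safe_drop_db(PathBuf::from("/tmp/mydb"), Duration::from_secs(30)).await?;
/// ```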
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    #[cfg(tidehunter)]
    if is_tidehunter_db(&path) {
        return safe_drop_tidehunter_db(path, timeout).await;
    }
    safe_drop_rocksdb(path, timeout).await
}

async fn safe_drop_rocksdb(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
            Ok(()) => return Ok(()),
            Err(err) => match backoff.next_backoff() {
                Some(duration) => tokio::time::sleep(duration).await,
                None => return Err(std::io::Error::other(err)),
            },
        }
    }
}

#[cfg(tidehunter)]
fn is_tidehunter_db(path: &Path) -> bool {
    path.join("shape.yaml").exists()
}

#[cfg(tidehunter)]
async fn safe_drop_tidehunter_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
                match backoff.next_backoff() {
                    Some(duration) => tokio::time::sleep(duration).await,
                    None => {
                        warn!(
                            "Database at {:?} is still locked after timeout ({:?})",
                            path, timeout
                        );
                        return Err(err);
                    }
                }
            }
            Err(err) => return Err(err),
        }
    }
}

fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

fn default_hash(value: &[u8]) -> Digest<32> {
    let mut hasher = fastcrypto::hash::Blake2b256::default();
    hasher.update(value);
    hasher.finalize()
}