1pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17 apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20 be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21 iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26 traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36 properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42 borrow::Borrow,
43 marker::PhantomData,
44 ops::RangeBounds,
45 path::{Path, PathBuf},
46 sync::{Arc, OnceLock},
47 time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
/// RocksDB property name used to query the total size of blob files.
///
/// Built with the checked, `const`-evaluable `CStr::from_bytes_with_nul`, so a malformed
/// literal (missing terminator or interior NUL byte) fails compilation instead of relying on
/// an unchecked `unsafe` conversion.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        Err(_) => panic!("property name must be NUL-terminated with no interior NUL bytes"),
    };
60
/// Process-wide, set-once flag backing [`write_sync_enabled`].
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns the process-wide write-sync flag.
///
/// If the flag has not been set yet, it is initialised from the `SUI_DB_SYNC_TO_DISK`
/// environment variable: `"1"` or `"true"` yield `true`; any other value, or an unset or
/// unreadable variable, yields `false`. The result is cached for all later calls.
fn write_sync_enabled() -> bool {
    let from_env = || match std::env::var("SUI_DB_SYNC_TO_DISK") {
        Ok(value) => matches!(value.as_str(), "1" | "true"),
        Err(_) => false,
    };
    *WRITE_SYNC_ENABLED.get_or_init(from_env)
}
67
68pub fn init_write_sync(enabled: Option<bool>) {
71 if let Some(value) = enabled {
72 let _ = WRITE_SYNC_ENABLED.set(value);
73 }
74}
75
76#[cfg(test)]
77mod tests;
78
/// Wrapper around a multi-threaded RocksDB handle.
///
/// The `Drop` impl for this type cancels RocksDB background work.
#[derive(Debug)]
pub struct RocksDB {
    /// The wrapped `rocksdb` database handle.
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
83
84impl Drop for RocksDB {
85 fn drop(&mut self) {
86 self.underlying.cancel_all_background_work(true);
87 }
88}
89
/// Identifies a column family in one of the supported storage backends.
#[derive(Clone)]
pub enum ColumnFamily {
    /// RocksDB column family, referenced by name.
    Rocks(String),
    /// In-memory column family, referenced by name.
    InMemory(String),
    /// TideHunter key space plus an optional key prefix (only with `cfg(tidehunter)`).
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}
97
98impl std::fmt::Debug for ColumnFamily {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
102 ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
103 #[cfg(tidehunter)]
104 ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
105 }
106 }
107}
108
109impl ColumnFamily {
110 fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111 match &self {
112 ColumnFamily::Rocks(name) => rocks_db
113 .underlying
114 .cf_handle(name)
115 .expect("Map-keying column family should have been checked at DB creation"),
116 _ => unreachable!("invariant is checked by the caller"),
117 }
118 }
119}
120
/// Storage backend held by a [`Database`].
pub enum Storage {
    /// RocksDB-backed storage.
    Rocks(RocksDB),
    /// In-memory storage.
    InMemory(InMemoryDB),
    /// TideHunter-backed storage (only with `cfg(tidehunter)`).
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}
127
128impl std::fmt::Debug for Storage {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132 Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133 #[cfg(tidehunter)]
134 Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135 }
136 }
137}
138
/// A storage backend together with its metrics configuration.
///
/// Creating a `Database` via `Database::new` increments the active-database metric for
/// `metric_conf.db_name`; dropping it decrements the metric again.
#[derive(Debug)]
pub struct Database {
    /// The backend that actually stores the data.
    storage: Storage,
    /// Database name and sampling intervals used for metrics.
    metric_conf: MetricConf,
    /// If set, this id is removed from the metrics registry service on drop.
    registry_id: Option<RegistryID>,
}
145
146impl Drop for Database {
147 fn drop(&mut self) {
148 let metrics = DBMetrics::get();
149 metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
150 if let Some(registry_id) = self.registry_id {
151 metrics.registry_serivce.remove(registry_id);
152 }
153 }
154}
155
/// Value bytes returned by a point lookup, in whichever form the backend produces them.
///
/// Dereferences to `[u8]` regardless of the variant.
enum GetResult<'a> {
    /// Pinned slice returned by RocksDB.
    Rocks(DBPinnableSlice<'a>),
    /// Owned bytes returned by the in-memory backend.
    InMemory(Vec<u8>),
    /// Bytes returned by TideHunter (only with `cfg(tidehunter)`).
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}
162
163impl Deref for GetResult<'_> {
164 type Target = [u8];
165 fn deref(&self) -> &[u8] {
166 match self {
167 GetResult::Rocks(d) => d.deref(),
168 GetResult::InMemory(d) => d.deref(),
169 #[cfg(tidehunter)]
170 GetResult::TideHunter(d) => d.deref(),
171 }
172 }
173}
174
175impl Database {
176 pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178 Self {
179 storage,
180 metric_conf,
181 registry_id,
182 }
183 }
184
185 pub fn flush(&self) -> Result<(), TypedStoreError> {
187 match &self.storage {
188 Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
189 TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
190 }),
191 Storage::InMemory(_) => {
192 Ok(())
194 }
195 #[cfg(tidehunter)]
196 Storage::TideHunter(_) => {
197 Ok(())
199 }
200 }
201 }
202
203 fn get<K: AsRef<[u8]>>(
204 &self,
205 cf: &ColumnFamily,
206 key: K,
207 readopts: &ReadOptions,
208 ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
209 match (&self.storage, cf) {
210 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
211 .underlying
212 .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
213 .map_err(typed_store_err_from_rocks_err)?
214 .map(GetResult::Rocks)),
215 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
216 Ok(db.get(cf_name, key).map(GetResult::InMemory))
217 }
218 #[cfg(tidehunter)]
219 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
220 .get(*ks, &transform_th_key(key.as_ref(), prefix))
221 .map_err(typed_store_error_from_th_error)?
222 .map(GetResult::TideHunter)),
223
224 _ => Err(TypedStoreError::RocksDBError(
225 "typed store invariant violation".to_string(),
226 )),
227 }
228 }
229
230 fn multi_get<I, K>(
231 &self,
232 cf: &ColumnFamily,
233 keys: I,
234 readopts: &ReadOptions,
235 ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
236 where
237 I: IntoIterator<Item = K>,
238 K: AsRef<[u8]>,
239 {
240 match (&self.storage, cf) {
241 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
242 let keys_vec: Vec<K> = keys.into_iter().collect();
243 let res = db.underlying.batched_multi_get_cf_opt(
244 &cf.rocks_cf(db),
245 keys_vec.iter(),
246 false,
247 readopts,
248 );
249 res.into_iter()
250 .map(|r| {
251 r.map_err(typed_store_err_from_rocks_err)
252 .map(|item| item.map(GetResult::Rocks))
253 })
254 .collect()
255 }
256 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
257 .multi_get(cf_name, keys)
258 .into_iter()
259 .map(|r| Ok(r.map(GetResult::InMemory)))
260 .collect(),
261 #[cfg(tidehunter)]
262 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => keys
263 .into_iter()
264 .map(|k| {
265 db.get(*ks, &transform_th_key(k.as_ref(), prefix))
266 .map_err(typed_store_error_from_th_error)
267 .map(|item| item.map(|bytes| GetResult::TideHunter(bytes.into_owned())))
268 })
269 .collect(),
270 _ => unreachable!("typed store invariant violation"),
271 }
272 }
273
274 pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
275 match &self.storage {
276 Storage::Rocks(db) => db.underlying.drop_cf(name),
277 Storage::InMemory(db) => {
278 db.drop_cf(name);
279 Ok(())
280 }
281 #[cfg(tidehunter)]
282 Storage::TideHunter(_) => {
283 unimplemented!("TideHunter: deletion of column family on a fly not implemented")
284 }
285 }
286 }
287
288 pub fn delete_file_in_range<K: AsRef<[u8]>>(
289 &self,
290 cf: &impl AsColumnFamilyRef,
291 from: K,
292 to: K,
293 ) -> Result<(), rocksdb::Error> {
294 match &self.storage {
295 Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
296 _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
297 }
298 }
299
300 fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
301 fail_point!("delete-cf-before");
302 let ret = match (&self.storage, cf) {
303 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
304 .underlying
305 .delete_cf(&cf.rocks_cf(db), key)
306 .map_err(typed_store_err_from_rocks_err),
307 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
308 db.delete(cf_name, key.as_ref());
309 Ok(())
310 }
311 #[cfg(tidehunter)]
312 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
313 .remove(*ks, transform_th_key(key.as_ref(), prefix))
314 .map_err(typed_store_error_from_th_error),
315 _ => Err(TypedStoreError::RocksDBError(
316 "typed store invariant violation".to_string(),
317 )),
318 };
319 fail_point!("delete-cf-after");
320 #[allow(clippy::let_and_return)]
321 ret
322 }
323
324 pub fn path_for_pruning(&self) -> &Path {
325 match &self.storage {
326 Storage::Rocks(rocks) => rocks.underlying.path(),
327 _ => unimplemented!("method is only supported for rocksdb backend"),
328 }
329 }
330
331 fn put_cf(
332 &self,
333 cf: &ColumnFamily,
334 key: Vec<u8>,
335 value: Vec<u8>,
336 ) -> Result<(), TypedStoreError> {
337 fail_point!("put-cf-before");
338 let ret = match (&self.storage, cf) {
339 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
340 .underlying
341 .put_cf(&cf.rocks_cf(db), key, value)
342 .map_err(typed_store_err_from_rocks_err),
343 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
344 db.put(cf_name, key, value);
345 Ok(())
346 }
347 #[cfg(tidehunter)]
348 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
349 .insert(*ks, transform_th_key(&key, prefix), value)
350 .map_err(typed_store_error_from_th_error),
351 _ => Err(TypedStoreError::RocksDBError(
352 "typed store invariant violation".to_string(),
353 )),
354 };
355 fail_point!("put-cf-after");
356 #[allow(clippy::let_and_return)]
357 ret
358 }
359
360 pub fn key_may_exist_cf<K: AsRef<[u8]>>(
361 &self,
362 cf_name: &str,
363 key: K,
364 readopts: &ReadOptions,
365 ) -> bool {
366 match &self.storage {
367 Storage::Rocks(rocks) => {
370 rocks
371 .underlying
372 .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
373 }
374 _ => true,
375 }
376 }
377
378 pub(crate) fn write_opt_internal(
379 &self,
380 batch: StorageWriteBatch,
381 write_options: &rocksdb::WriteOptions,
382 ) -> Result<(), TypedStoreError> {
383 fail_point!("batch-write-before");
384 let ret = match (&self.storage, batch) {
385 (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
386 .underlying
387 .write_opt(batch, write_options)
388 .map_err(typed_store_err_from_rocks_err),
389 (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
390 db.write(batch);
392 Ok(())
393 }
394 #[cfg(tidehunter)]
395 (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
396 batch.commit().map_err(typed_store_error_from_th_error)
398 }
399 _ => Err(TypedStoreError::RocksDBError(
400 "using invalid batch type for the database".to_string(),
401 )),
402 };
403 fail_point!("batch-write-after");
404 #[allow(clippy::let_and_return)]
405 ret
406 }
407
408 #[cfg(tidehunter)]
409 pub fn start_relocation(&self) -> anyhow::Result<()> {
410 if let Storage::TideHunter(db) = &self.storage {
411 db.start_relocation()?;
412 }
413 Ok(())
414 }
415
416 #[cfg(tidehunter)]
417 pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
418 if let Storage::TideHunter(db) = &self.storage {
419 db.force_rebuild_control_region()
420 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
421 }
422 Ok(())
423 }
424
425 #[cfg(tidehunter)]
432 pub fn wait_for_tidehunter_background_threads(self: Arc<Self>) {
433 let strong = Arc::strong_count(&self);
434 if strong != 1 {
435 println!(
436 "WARNING: wait_for_tidehunter_background_threads called with Arc<Database> strong_count={} (expected 1); other clones will keep the inner tidehunter Db alive and the wait may panic on timeout",
437 strong,
438 );
439 }
440 let Storage::TideHunter(th_arc) = &self.storage else {
441 return;
442 };
443 let th_arc = th_arc.clone();
444 drop(self);
445 th_arc.wait_for_background_threads_to_finish();
446 }
447
448 #[cfg(tidehunter)]
449 pub fn drop_cells_in_range(
450 &self,
451 ks: KeySpace,
452 from_inclusive: &[u8],
453 to_inclusive: &[u8],
454 ) -> anyhow::Result<()> {
455 if let Storage::TideHunter(db) = &self.storage {
456 db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
457 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
458 } else {
459 panic!("drop_cells_in_range called on non-TideHunter storage");
460 }
461 Ok(())
462 }
463
464 pub fn compact_range_cf<K: AsRef<[u8]>>(
465 &self,
466 cf_name: &str,
467 start: Option<K>,
468 end: Option<K>,
469 ) {
470 if let Storage::Rocks(rocksdb) = &self.storage {
471 rocksdb
472 .underlying
473 .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
474 }
475 }
476
477 pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
478 if let Storage::Rocks(rocks) = &self.storage {
480 let checkpoint =
481 Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
482 checkpoint
483 .create_checkpoint(path)
484 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
485 }
486 Ok(())
487 }
488
489 pub fn get_sampling_interval(&self) -> SamplingInterval {
490 self.metric_conf.read_sample_interval.new_from_self()
491 }
492
493 pub fn multiget_sampling_interval(&self) -> SamplingInterval {
494 self.metric_conf.read_sample_interval.new_from_self()
495 }
496
497 pub fn write_sampling_interval(&self) -> SamplingInterval {
498 self.metric_conf.write_sample_interval.new_from_self()
499 }
500
501 pub fn iter_sampling_interval(&self) -> SamplingInterval {
502 self.metric_conf.iter_sample_interval.new_from_self()
503 }
504
505 fn db_name(&self) -> String {
506 let name = &self.metric_conf.db_name;
507 if name.is_empty() {
508 "default".to_string()
509 } else {
510 name.clone()
511 }
512 }
513
514 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
515 match &self.storage {
516 Storage::Rocks(rocks) => rocks.underlying.live_files(),
517 _ => Ok(vec![]),
518 }
519 }
520}
521
522fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
523 rocks_db
524 .underlying
525 .cf_handle(cf_name)
526 .expect("Map-keying column family should have been checked at DB creation")
527}
528
529fn rocks_cf_from_db<'a>(
530 db: &'a Database,
531 cf_name: &str,
532) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
533 match &db.storage {
534 Storage::Rocks(rocksdb) => Ok(rocksdb
535 .underlying
536 .cf_handle(cf_name)
537 .expect("Map-keying column family should have been checked at DB creation")),
538 _ => Err(TypedStoreError::RocksDBError(
539 "using invalid batch type for the database".to_string(),
540 )),
541 }
542}
543
/// Metrics-related configuration of a `Database`.
#[derive(Debug, Default)]
pub struct MetricConf {
    /// Database name used as the metrics label; `Database::db_name` substitutes
    /// `"default"` when this is empty.
    pub db_name: String,
    /// Sampling interval for reads (also used for multi-gets).
    pub read_sample_interval: SamplingInterval,
    /// Sampling interval for writes.
    pub write_sample_interval: SamplingInterval,
    /// Sampling interval for iterators.
    pub iter_sample_interval: SamplingInterval,
    /// Flag set by `with_th_batch_compression`.
    /// NOTE(review): presumably enables TideHunter batch compression; the consumer is not
    /// visible in this part of the file — confirm.
    pub enable_th_batch_compression: bool,
}
554
555impl MetricConf {
556 pub fn new(db_name: &str) -> Self {
557 if db_name.is_empty() {
558 error!("A meaningful db name should be used for metrics reporting.")
559 }
560 Self {
561 db_name: db_name.to_string(),
562 read_sample_interval: SamplingInterval::default(),
563 write_sample_interval: SamplingInterval::default(),
564 iter_sample_interval: SamplingInterval::default(),
565 enable_th_batch_compression: false,
566 }
567 }
568
569 pub fn with_sampling(mut self, read_interval: SamplingInterval) -> Self {
570 self.read_sample_interval = read_interval;
571 self
572 }
573
574 pub fn with_th_batch_compression(mut self) -> Self {
575 self.enable_th_batch_compression = true;
576 self
577 }
578}
/// Period, in seconds, of the per-column-family RocksDB metrics reporting task.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Value reported for a metric whose RocksDB property lookup failed.
const METRICS_ERROR: i64 = -1;
581
/// Typed view of one column family of a shared [`Database`].
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    /// Shared handle to the underlying database.
    pub db: Arc<Database>,
    // Ties `K` and `V` to the map without storing values of either type.
    _phantom: PhantomData<fn(K) -> V>,
    /// Backend-specific column family identifier passed to `Database` operations.
    column_family: ColumnFamily,
    /// Column family name; also used as the label value for per-column-family metrics.
    cf: String,
    /// Read/write options supplied when the map was created.
    pub opts: ReadWriteOptions,
    /// Metrics sink shared by all operations of this map.
    db_metrics: Arc<DBMetrics>,
    /// Sampling interval for single-key reads.
    get_sample_interval: SamplingInterval,
    /// Sampling interval for multi-get operations.
    multiget_sample_interval: SamplingInterval,
    /// Sampling interval for writes.
    write_sample_interval: SamplingInterval,
    /// Sampling interval for iterators.
    iter_sample_interval: SamplingInterval,
    /// Sender half of the channel watched by the background metrics task; once every clone
    /// of this `Arc` is dropped, the task's receiver resolves and the task exits.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
598
// SAFETY: NOTE(review): the justification for this manual `Send` impl is not visible in this
// part of the file — confirm that every field of `DBMap` can be moved across threads.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
600
601impl<K, V> DBMap<K, V> {
602 pub(crate) fn new(
603 db: Arc<Database>,
604 opts: &ReadWriteOptions,
605 opt_cf: &str,
606 column_family: ColumnFamily,
607 is_deprecated: bool,
608 ) -> Self {
609 let db_cloned = Arc::downgrade(&db.clone());
610 let db_metrics = DBMetrics::get();
611 let db_metrics_cloned = db_metrics.clone();
612 let cf = opt_cf.to_string();
613
614 let (sender, mut recv) = tokio::sync::oneshot::channel();
615 if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
616 tokio::task::spawn(async move {
617 let mut interval =
618 tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
619 loop {
620 tokio::select! {
621 _ = interval.tick() => {
622 if let Some(db) = db_cloned.upgrade() {
623 let cf = cf.clone();
624 let db_metrics = db_metrics.clone();
625 if let Err(e) = tokio::task::spawn_blocking(move || {
626 Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
627 }).await {
628 error!("Failed to log metrics with error: {}", e);
629 }
630 }
631 }
632 _ = &mut recv => break,
633 }
634 }
635 debug!("Returning the cf metric logging task for DBMap: {}", &cf);
636 });
637 }
638 DBMap {
639 db: db.clone(),
640 opts: opts.clone(),
641 _phantom: PhantomData,
642 column_family,
643 cf: opt_cf.to_string(),
644 db_metrics: db_metrics_cloned,
645 _metrics_task_cancel_handle: Arc::new(sender),
646 get_sample_interval: db.get_sampling_interval(),
647 multiget_sample_interval: db.multiget_sampling_interval(),
648 write_sample_interval: db.write_sampling_interval(),
649 iter_sample_interval: db.iter_sampling_interval(),
650 }
651 }
652
653 #[instrument(level = "debug", skip(db), err)]
656 pub fn reopen(
657 db: &Arc<Database>,
658 opt_cf: Option<&str>,
659 rw_options: &ReadWriteOptions,
660 is_deprecated: bool,
661 ) -> Result<Self, TypedStoreError> {
662 let cf_key = opt_cf
663 .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
664 .to_owned();
665 Ok(DBMap::new(
666 db.clone(),
667 rw_options,
668 &cf_key,
669 ColumnFamily::Rocks(cf_key.to_string()),
670 is_deprecated,
671 ))
672 }
673
674 #[cfg(tidehunter)]
675 pub fn reopen_th(
676 db: Arc<Database>,
677 cf_name: &str,
678 ks: KeySpace,
679 prefix: Option<Vec<u8>>,
680 ) -> Self {
681 DBMap::new(
682 db,
683 &ReadWriteOptions::default(),
684 cf_name,
685 ColumnFamily::TideHunter((ks, prefix.clone())),
686 false,
687 )
688 }
689
690 pub fn cf_name(&self) -> &str {
691 &self.cf
692 }
693
694 pub fn batch(&self) -> DBBatch {
695 let batch = match &self.db.storage {
696 Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
697 Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
698 #[cfg(tidehunter)]
699 Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
700 };
701 DBBatch::new(
702 &self.db,
703 batch,
704 &self.db_metrics,
705 &self.write_sample_interval,
706 )
707 }
708
709 pub fn flush(&self) -> Result<(), TypedStoreError> {
710 self.db.flush()
711 }
712
713 pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
714 let from_buf = be_fix_int_ser(start);
715 let to_buf = be_fix_int_ser(end);
716 self.db
717 .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
718 Ok(())
719 }
720
721 pub fn compact_range_raw(
722 &self,
723 cf_name: &str,
724 start: Vec<u8>,
725 end: Vec<u8>,
726 ) -> Result<(), TypedStoreError> {
727 self.db.compact_range_cf(cf_name, Some(start), Some(end));
728 Ok(())
729 }
730
731 #[cfg(tidehunter)]
732 pub fn drop_cells_in_range<J: Serialize>(
733 &self,
734 from_inclusive: &J,
735 to_inclusive: &J,
736 ) -> Result<(), TypedStoreError>
737 where
738 K: Serialize,
739 {
740 let from_buf = be_fix_int_ser(from_inclusive);
741 let to_buf = be_fix_int_ser(to_inclusive);
742 if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
743 self.db
744 .drop_cells_in_range(*ks, &from_buf, &to_buf)
745 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
746 }
747 Ok(())
748 }
749
750 #[cfg(tidehunter)]
751 pub fn drop_cells_in_range_raw(
752 &self,
753 from_inclusive: &[u8],
754 to_inclusive: &[u8],
755 ) -> Result<(), TypedStoreError> {
756 if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
757 self.db
758 .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
759 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
760 }
761 Ok(())
762 }
763
764 fn multi_get_pinned<J>(
766 &self,
767 keys: impl IntoIterator<Item = J>,
768 ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
769 where
770 J: Borrow<K>,
771 K: Serialize,
772 {
773 let _timer = self
774 .db_metrics
775 .op_metrics
776 .rocksdb_multiget_latency_seconds
777 .with_label_values(&[&self.cf])
778 .start_timer();
779 let perf_ctx = if self.multiget_sample_interval.sample() {
780 Some(RocksDBPerfContext)
781 } else {
782 None
783 };
784 let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
785 let results: Result<Vec<_>, TypedStoreError> = self
786 .db
787 .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
788 .into_iter()
789 .collect();
790 let entries = results?;
791 let entry_size = entries
792 .iter()
793 .flatten()
794 .map(|entry| entry.len())
795 .sum::<usize>();
796 self.db_metrics
797 .op_metrics
798 .rocksdb_multiget_bytes
799 .with_label_values(&[&self.cf])
800 .observe(entry_size as f64);
801 if perf_ctx.is_some() {
802 self.db_metrics
803 .read_perf_ctx_metrics
804 .report_metrics(&self.cf);
805 }
806 Ok(entries)
807 }
808
809 fn get_rocksdb_int_property(
810 rocksdb: &RocksDB,
811 cf: &impl AsColumnFamilyRef,
812 property_name: &std::ffi::CStr,
813 ) -> Result<i64, TypedStoreError> {
814 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
815 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
816 Ok(None) => Ok(0),
817 Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
818 }
819 }
820
821 fn report_rocksdb_metrics(
822 database: &Arc<Database>,
823 cf_name: &str,
824 db_metrics: &Arc<DBMetrics>,
825 ) {
826 let Storage::Rocks(rocksdb) = &database.storage else {
827 return;
828 };
829
830 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
831 tracing::warn!(
832 "unable to report metrics for cf {cf_name:?} in db {:?}",
833 database.db_name()
834 );
835 return;
836 };
837
838 db_metrics
839 .cf_metrics
840 .rocksdb_total_sst_files_size
841 .with_label_values(&[cf_name])
842 .set(
843 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
844 .unwrap_or(METRICS_ERROR),
845 );
846 db_metrics
847 .cf_metrics
848 .rocksdb_total_blob_files_size
849 .with_label_values(&[cf_name])
850 .set(
851 Self::get_rocksdb_int_property(
852 rocksdb,
853 &cf,
854 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
855 )
856 .unwrap_or(METRICS_ERROR),
857 );
858 let total_num_files: i64 = (0..=6)
861 .map(|level| {
862 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
863 .unwrap_or(METRICS_ERROR)
864 })
865 .sum();
866 db_metrics
867 .cf_metrics
868 .rocksdb_total_num_files
869 .with_label_values(&[cf_name])
870 .set(total_num_files);
871 db_metrics
872 .cf_metrics
873 .rocksdb_num_level0_files
874 .with_label_values(&[cf_name])
875 .set(
876 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
877 .unwrap_or(METRICS_ERROR),
878 );
879 db_metrics
880 .cf_metrics
881 .rocksdb_current_size_active_mem_tables
882 .with_label_values(&[cf_name])
883 .set(
884 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
885 .unwrap_or(METRICS_ERROR),
886 );
887 db_metrics
888 .cf_metrics
889 .rocksdb_size_all_mem_tables
890 .with_label_values(&[cf_name])
891 .set(
892 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
893 .unwrap_or(METRICS_ERROR),
894 );
895 db_metrics
896 .cf_metrics
897 .rocksdb_num_snapshots
898 .with_label_values(&[cf_name])
899 .set(
900 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
901 .unwrap_or(METRICS_ERROR),
902 );
903 db_metrics
904 .cf_metrics
905 .rocksdb_oldest_snapshot_time
906 .with_label_values(&[cf_name])
907 .set(
908 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
909 .unwrap_or(METRICS_ERROR),
910 );
911 db_metrics
912 .cf_metrics
913 .rocksdb_actual_delayed_write_rate
914 .with_label_values(&[cf_name])
915 .set(
916 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
917 .unwrap_or(METRICS_ERROR),
918 );
919 db_metrics
920 .cf_metrics
921 .rocksdb_is_write_stopped
922 .with_label_values(&[cf_name])
923 .set(
924 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
925 .unwrap_or(METRICS_ERROR),
926 );
927 db_metrics
928 .cf_metrics
929 .rocksdb_block_cache_capacity
930 .with_label_values(&[cf_name])
931 .set(
932 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
933 .unwrap_or(METRICS_ERROR),
934 );
935 db_metrics
936 .cf_metrics
937 .rocksdb_block_cache_usage
938 .with_label_values(&[cf_name])
939 .set(
940 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
941 .unwrap_or(METRICS_ERROR),
942 );
943 db_metrics
944 .cf_metrics
945 .rocksdb_block_cache_pinned_usage
946 .with_label_values(&[cf_name])
947 .set(
948 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
949 .unwrap_or(METRICS_ERROR),
950 );
951 db_metrics
952 .cf_metrics
953 .rocksdb_estimate_table_readers_mem
954 .with_label_values(&[cf_name])
955 .set(
956 Self::get_rocksdb_int_property(
957 rocksdb,
958 &cf,
959 properties::ESTIMATE_TABLE_READERS_MEM,
960 )
961 .unwrap_or(METRICS_ERROR),
962 );
963 db_metrics
964 .cf_metrics
965 .rocksdb_estimated_num_keys
966 .with_label_values(&[cf_name])
967 .set(
968 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
969 .unwrap_or(METRICS_ERROR),
970 );
971 db_metrics
972 .cf_metrics
973 .rocksdb_num_immutable_mem_tables
974 .with_label_values(&[cf_name])
975 .set(
976 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
977 .unwrap_or(METRICS_ERROR),
978 );
979 db_metrics
980 .cf_metrics
981 .rocksdb_mem_table_flush_pending
982 .with_label_values(&[cf_name])
983 .set(
984 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
985 .unwrap_or(METRICS_ERROR),
986 );
987 db_metrics
988 .cf_metrics
989 .rocksdb_compaction_pending
990 .with_label_values(&[cf_name])
991 .set(
992 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
993 .unwrap_or(METRICS_ERROR),
994 );
995 db_metrics
996 .cf_metrics
997 .rocksdb_estimate_pending_compaction_bytes
998 .with_label_values(&[cf_name])
999 .set(
1000 Self::get_rocksdb_int_property(
1001 rocksdb,
1002 &cf,
1003 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
1004 )
1005 .unwrap_or(METRICS_ERROR),
1006 );
1007 db_metrics
1008 .cf_metrics
1009 .rocksdb_num_running_compactions
1010 .with_label_values(&[cf_name])
1011 .set(
1012 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
1013 .unwrap_or(METRICS_ERROR),
1014 );
1015 db_metrics
1016 .cf_metrics
1017 .rocksdb_num_running_flushes
1018 .with_label_values(&[cf_name])
1019 .set(
1020 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
1021 .unwrap_or(METRICS_ERROR),
1022 );
1023 db_metrics
1024 .cf_metrics
1025 .rocksdb_estimate_oldest_key_time
1026 .with_label_values(&[cf_name])
1027 .set(
1028 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1029 .unwrap_or(METRICS_ERROR),
1030 );
1031 db_metrics
1032 .cf_metrics
1033 .rocksdb_background_errors
1034 .with_label_values(&[cf_name])
1035 .set(
1036 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1037 .unwrap_or(METRICS_ERROR),
1038 );
1039 db_metrics
1040 .cf_metrics
1041 .rocksdb_base_level
1042 .with_label_values(&[cf_name])
1043 .set(
1044 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1045 .unwrap_or(METRICS_ERROR),
1046 );
1047 }
1048
1049 pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1050 self.db.checkpoint(path)
1051 }
1052
1053 pub fn table_summary(&self) -> eyre::Result<TableSummary>
1054 where
1055 K: Serialize + DeserializeOwned,
1056 V: Serialize + DeserializeOwned,
1057 {
1058 let mut num_keys = 0;
1059 let mut key_bytes_total = 0;
1060 let mut value_bytes_total = 0;
1061 let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1062 let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1063 for item in self.safe_iter() {
1064 let (key, value) = item?;
1065 num_keys += 1;
1066 let key_len = be_fix_int_ser(key.borrow()).len();
1067 let value_len = bcs::to_bytes(value.borrow())?.len();
1068 key_bytes_total += key_len;
1069 value_bytes_total += value_len;
1070 key_hist.record(key_len as u64)?;
1071 value_hist.record(value_len as u64)?;
1072 }
1073 Ok(TableSummary {
1074 num_keys,
1075 key_bytes_total,
1076 value_bytes_total,
1077 key_hist,
1078 value_hist,
1079 })
1080 }
1081
1082 fn start_iter_timer(&self) -> HistogramTimer {
1083 self.db_metrics
1084 .op_metrics
1085 .rocksdb_iter_latency_seconds
1086 .with_label_values(&[&self.cf])
1087 .start_timer()
1088 }
1089
1090 fn create_iter_context(
1092 &self,
1093 ) -> (
1094 Option<HistogramTimer>,
1095 Option<Histogram>,
1096 Option<Histogram>,
1097 Option<RocksDBPerfContext>,
1098 ) {
1099 let timer = self.start_iter_timer();
1100 let bytes_scanned = self
1101 .db_metrics
1102 .op_metrics
1103 .rocksdb_iter_bytes
1104 .with_label_values(&[&self.cf]);
1105 let keys_scanned = self
1106 .db_metrics
1107 .op_metrics
1108 .rocksdb_iter_keys
1109 .with_label_values(&[&self.cf]);
1110 let perf_ctx = if self.iter_sample_interval.sample() {
1111 Some(RocksDBPerfContext)
1112 } else {
1113 None
1114 };
1115 (
1116 Some(timer),
1117 Some(bytes_scanned),
1118 Some(keys_scanned),
1119 perf_ctx,
1120 )
1121 }
1122
1123 #[allow(clippy::complexity)]
1126 pub fn reversed_safe_iter_with_bounds(
1127 &self,
1128 lower_bound: Option<K>,
1129 upper_bound: Option<K>,
1130 ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1131 where
1132 K: Serialize + DeserializeOwned,
1133 V: Serialize + DeserializeOwned,
1134 {
1135 let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1136 lower_bound
1137 .as_ref()
1138 .map(Bound::Included)
1139 .unwrap_or(Bound::Unbounded),
1140 upper_bound
1141 .as_ref()
1142 .map(Bound::Included)
1143 .unwrap_or(Bound::Unbounded),
1144 ));
1145 match &self.db.storage {
1146 Storage::Rocks(db) => {
1147 let readopts = rocks_util::apply_range_bounds(
1148 self.opts.readopts(),
1149 it_lower_bound,
1150 it_upper_bound,
1151 );
1152 let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1153 let db_iter = db
1154 .underlying
1155 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1156 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1157 let iter = SafeIter::new(
1158 self.cf.clone(),
1159 db_iter,
1160 _timer,
1161 _perf_ctx,
1162 bytes_scanned,
1163 keys_scanned,
1164 Some(self.db_metrics.clone()),
1165 );
1166 Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1167 }
1168 Storage::InMemory(db) => {
1169 Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1170 }
1171 #[cfg(tidehunter)]
1172 Storage::TideHunter(db) => match &self.column_family {
1173 ColumnFamily::TideHunter((ks, prefix)) => {
1174 let mut iter = db.iterator(*ks);
1175 apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound, prefix);
1176 iter.reverse();
1177 Ok(Box::new(transform_th_iterator(
1178 iter,
1179 prefix,
1180 self.start_iter_timer(),
1181 )))
1182 }
1183 _ => unreachable!("storage backend invariant violation"),
1184 },
1185 }
1186 }
1187}
1188
/// A write batch for one of the supported storage backends.
///
/// Each variant wraps the batch type of its backend; the `TideHunter` variant
/// only exists when the crate is compiled with `cfg(tidehunter)`.
pub enum StorageWriteBatch {
    /// Batch of operations destined for RocksDB.
    Rocks(rocksdb::WriteBatch),
    /// Batch of operations destined for the in-memory store.
    InMemory(InMemoryBatch),
    /// Batch of operations destined for TideHunter.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1195
/// Describes where one staged operation lives inside `StagedBatch::data`.
///
/// An entry is laid out as column-family name bytes, then the serialized key,
/// then (for puts only) the serialized value.
struct EntryHeader {
    /// Index into the data buffer at which this entry (its column-family name) starts.
    offset: usize,
    /// Length in bytes of the column-family name.
    cf_name_len: usize,
    /// Length in bytes of the serialized key that follows the column-family name.
    key_len: usize,
    /// `true` for a put (value bytes follow the key), `false` for a delete.
    is_put: bool,
}
1206
/// A backend-independent buffer of staged puts and deletes.
///
/// Operations are serialized back-to-back into `data`, with one `EntryHeader`
/// per operation in `entries`; `DBBatch::concat` later replays them into a
/// concrete write batch.
#[derive(Default)]
pub struct StagedBatch {
    /// Concatenated bytes of all staged entries.
    data: Vec<u8>,
    /// One header per staged entry, in insertion order.
    entries: Vec<EntryHeader>,
}
1217
1218impl StagedBatch {
1219 pub fn new() -> Self {
1220 Self {
1221 data: Vec::with_capacity(1024),
1222 entries: Vec::with_capacity(16),
1223 }
1224 }
1225
1226 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1227 &mut self,
1228 db: &DBMap<K, V>,
1229 new_vals: impl IntoIterator<Item = (J, U)>,
1230 ) -> Result<&mut Self, TypedStoreError> {
1231 let cf_name = db.cf_name();
1232 new_vals
1233 .into_iter()
1234 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1235 let offset = self.data.len();
1236 self.data.extend_from_slice(cf_name.as_bytes());
1237 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1238 bcs::serialize_into(&mut self.data, v.borrow())
1239 .map_err(typed_store_err_from_bcs_err)?;
1240 self.entries.push(EntryHeader {
1241 offset,
1242 cf_name_len: cf_name.len(),
1243 key_len,
1244 is_put: true,
1245 });
1246 Ok(())
1247 })?;
1248 Ok(self)
1249 }
1250
1251 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1252 &mut self,
1253 db: &DBMap<K, V>,
1254 purged_vals: impl IntoIterator<Item = J>,
1255 ) -> Result<(), TypedStoreError> {
1256 let cf_name = db.cf_name();
1257 purged_vals
1258 .into_iter()
1259 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1260 let offset = self.data.len();
1261 self.data.extend_from_slice(cf_name.as_bytes());
1262 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1263 self.entries.push(EntryHeader {
1264 offset,
1265 cf_name_len: cf_name.len(),
1266 key_len,
1267 is_put: false,
1268 });
1269 Ok(())
1270 })?;
1271 Ok(())
1272 }
1273
1274 pub fn size_in_bytes(&self) -> usize {
1275 self.data.len()
1276 }
1277}
1278
/// A set of write operations that is committed to a single database in one go.
pub struct DBBatch {
    /// The database this batch will be written to; tables from any other
    /// database are rejected with `TypedStoreError::CrossDBBatch`.
    database: Arc<Database>,
    /// The backend-specific batch accumulating the operations.
    batch: StorageWriteBatch,
    /// Metrics used to record batch sizes and commit latency.
    db_metrics: Arc<DBMetrics>,
    /// Decides whether a given write also reports perf-context metrics.
    write_sample_interval: SamplingInterval,
}
1336
1337impl DBBatch {
1338 pub fn new(
1342 dbref: &Arc<Database>,
1343 batch: StorageWriteBatch,
1344 db_metrics: &Arc<DBMetrics>,
1345 write_sample_interval: &SamplingInterval,
1346 ) -> Self {
1347 DBBatch {
1348 database: dbref.clone(),
1349 batch,
1350 db_metrics: db_metrics.clone(),
1351 write_sample_interval: write_sample_interval.clone(),
1352 }
1353 }
1354
1355 #[instrument(level = "trace", skip_all, err)]
1357 pub fn write(self) -> Result<(), TypedStoreError> {
1358 let mut write_options = rocksdb::WriteOptions::default();
1359
1360 if write_sync_enabled() {
1361 write_options.set_sync(true);
1362 }
1363
1364 self.write_opt(write_options)
1365 }
1366
1367 #[instrument(level = "trace", skip_all, err)]
1369 pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1370 let db_name = self.database.db_name();
1371 let timer = self
1372 .db_metrics
1373 .op_metrics
1374 .rocksdb_batch_commit_latency_seconds
1375 .with_label_values(&[&db_name])
1376 .start_timer();
1377 let batch_size = self.size_in_bytes();
1378
1379 let perf_ctx = if self.write_sample_interval.sample() {
1380 Some(RocksDBPerfContext)
1381 } else {
1382 None
1383 };
1384
1385 self.database
1386 .write_opt_internal(self.batch, &write_options)?;
1387
1388 self.db_metrics
1389 .op_metrics
1390 .rocksdb_batch_commit_bytes
1391 .with_label_values(&[&db_name])
1392 .observe(batch_size as f64);
1393
1394 if perf_ctx.is_some() {
1395 self.db_metrics
1396 .write_perf_ctx_metrics
1397 .report_metrics(&db_name);
1398 }
1399 let elapsed = timer.stop_and_record();
1400 if elapsed > 1.0 {
1401 warn!(?elapsed, ?db_name, "very slow batch write");
1402 self.db_metrics
1403 .op_metrics
1404 .rocksdb_very_slow_batch_writes_count
1405 .with_label_values(&[&db_name])
1406 .inc();
1407 self.db_metrics
1408 .op_metrics
1409 .rocksdb_very_slow_batch_writes_duration_ms
1410 .with_label_values(&[&db_name])
1411 .inc_by((elapsed * 1000.0) as u64);
1412 }
1413 Ok(())
1414 }
1415
1416 pub fn size_in_bytes(&self) -> usize {
1417 match self.batch {
1418 StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1419 StorageWriteBatch::InMemory(_) => 0,
1420 #[cfg(tidehunter)]
1422 StorageWriteBatch::TideHunter(_) => 0,
1423 }
1424 }
1425
1426 pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
1428 for raw_batch in raw_batches {
1429 let data = &raw_batch.data;
1430 for (i, hdr) in raw_batch.entries.iter().enumerate() {
1431 let end = raw_batch
1432 .entries
1433 .get(i + 1)
1434 .map_or(data.len(), |next| next.offset);
1435 let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
1436 let key_start = hdr.offset + hdr.cf_name_len;
1437 let key = &data[key_start..key_start + hdr.key_len];
1438 let cf_name = std::str::from_utf8(cf_bytes)
1440 .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;
1441
1442 if hdr.is_put {
1443 let value = &data[key_start + hdr.key_len..end];
1444 match &mut self.batch {
1445 StorageWriteBatch::Rocks(b) => {
1446 b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
1447 }
1448 StorageWriteBatch::InMemory(b) => {
1449 b.put_cf(cf_name, key, value);
1450 }
1451 #[cfg(tidehunter)]
1452 _ => {
1453 return Err(TypedStoreError::RocksDBError(
1454 "concat not supported for TideHunter".to_string(),
1455 ));
1456 }
1457 }
1458 } else {
1459 match &mut self.batch {
1460 StorageWriteBatch::Rocks(b) => {
1461 b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
1462 }
1463 StorageWriteBatch::InMemory(b) => {
1464 b.delete_cf(cf_name, key);
1465 }
1466 #[cfg(tidehunter)]
1467 _ => {
1468 return Err(TypedStoreError::RocksDBError(
1469 "concat not supported for TideHunter".to_string(),
1470 ));
1471 }
1472 }
1473 }
1474 }
1475 }
1476 Ok(self)
1477 }
1478
1479 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1480 &mut self,
1481 db: &DBMap<K, V>,
1482 purged_vals: impl IntoIterator<Item = J>,
1483 ) -> Result<(), TypedStoreError> {
1484 if !Arc::ptr_eq(&db.db, &self.database) {
1485 return Err(TypedStoreError::CrossDBBatch);
1486 }
1487
1488 purged_vals
1489 .into_iter()
1490 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1491 let k_buf = be_fix_int_ser(k.borrow());
1492 match (&mut self.batch, &db.column_family) {
1493 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1494 b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1495 }
1496 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1497 b.delete_cf(name, k_buf)
1498 }
1499 #[cfg(tidehunter)]
1500 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1501 b.delete(*ks, transform_th_key(&k_buf, prefix))
1502 }
1503 _ => Err(TypedStoreError::RocksDBError(
1504 "typed store invariant violation".to_string(),
1505 ))?,
1506 }
1507 Ok(())
1508 })?;
1509 Ok(())
1510 }
1511
1512 pub fn schedule_delete_range<K: Serialize, V>(
1522 &mut self,
1523 db: &DBMap<K, V>,
1524 from: &K,
1525 to: &K,
1526 ) -> Result<(), TypedStoreError> {
1527 if !Arc::ptr_eq(&db.db, &self.database) {
1528 return Err(TypedStoreError::CrossDBBatch);
1529 }
1530
1531 let from_buf = be_fix_int_ser(from);
1532 let to_buf = be_fix_int_ser(to);
1533
1534 if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1535 b.delete_range_cf(
1536 &rocks_cf_from_db(&self.database, db.cf_name())?,
1537 from_buf,
1538 to_buf,
1539 );
1540 }
1541 Ok(())
1542 }
1543
1544 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1546 &mut self,
1547 db: &DBMap<K, V>,
1548 new_vals: impl IntoIterator<Item = (J, U)>,
1549 ) -> Result<&mut Self, TypedStoreError> {
1550 if !Arc::ptr_eq(&db.db, &self.database) {
1551 return Err(TypedStoreError::CrossDBBatch);
1552 }
1553 let mut total = 0usize;
1554 new_vals
1555 .into_iter()
1556 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1557 let k_buf = be_fix_int_ser(k.borrow());
1558 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1559 total += k_buf.len() + v_buf.len();
1560 if db.opts.log_value_hash {
1561 let key_hash = default_hash(&k_buf);
1562 let value_hash = default_hash(&v_buf);
1563 debug!(
1564 "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1565 db.cf_name(),
1566 key_hash,
1567 value_hash
1568 );
1569 }
1570 match (&mut self.batch, &db.column_family) {
1571 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1572 b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1573 }
1574 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1575 b.put_cf(name, k_buf, v_buf)
1576 }
1577 #[cfg(tidehunter)]
1578 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1579 b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1580 }
1581 _ => Err(TypedStoreError::RocksDBError(
1582 "typed store invariant violation".to_string(),
1583 ))?,
1584 }
1585 Ok(())
1586 })?;
1587 self.db_metrics
1588 .op_metrics
1589 .rocksdb_batch_put_bytes
1590 .with_label_values(&[&db.cf])
1591 .observe(total as f64);
1592 Ok(self)
1593 }
1594
1595 pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1596 &mut self,
1597 db: &DBMap<K, V>,
1598 new_vals: impl IntoIterator<Item = (J, U)>,
1599 ) -> Result<&mut Self, TypedStoreError> {
1600 if !Arc::ptr_eq(&db.db, &self.database) {
1601 return Err(TypedStoreError::CrossDBBatch);
1602 }
1603 new_vals
1604 .into_iter()
1605 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1606 let k_buf = be_fix_int_ser(k.borrow());
1607 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1608 match &mut self.batch {
1609 StorageWriteBatch::Rocks(b) => b.merge_cf(
1610 &rocks_cf_from_db(&self.database, db.cf_name())?,
1611 k_buf,
1612 v_buf,
1613 ),
1614 _ => unimplemented!("merge operator is only implemented for RocksDB"),
1615 }
1616 Ok(())
1617 })?;
1618 Ok(self)
1619 }
1620}
1621
1622impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1623where
1624 K: Serialize + DeserializeOwned,
1625 V: Serialize + DeserializeOwned,
1626{
1627 type Error = TypedStoreError;
1628
1629 #[instrument(level = "trace", skip_all, err)]
1630 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1631 let key_buf = be_fix_int_ser(key);
1632 let readopts = self.opts.readopts();
1633 Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1634 && self
1635 .db
1636 .get(&self.column_family, &key_buf, &readopts)?
1637 .is_some())
1638 }
1639
1640 #[instrument(level = "trace", skip_all, err)]
1641 fn multi_contains_keys<J>(
1642 &self,
1643 keys: impl IntoIterator<Item = J>,
1644 ) -> Result<Vec<bool>, Self::Error>
1645 where
1646 J: Borrow<K>,
1647 {
1648 let values = self.multi_get_pinned(keys)?;
1649 Ok(values.into_iter().map(|v| v.is_some()).collect())
1650 }
1651
1652 #[instrument(level = "trace", skip_all, err)]
1653 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1654 let _timer = self
1655 .db_metrics
1656 .op_metrics
1657 .rocksdb_get_latency_seconds
1658 .with_label_values(&[&self.cf])
1659 .start_timer();
1660 let perf_ctx = if self.get_sample_interval.sample() {
1661 Some(RocksDBPerfContext)
1662 } else {
1663 None
1664 };
1665 let key_buf = be_fix_int_ser(key);
1666 let res = self
1667 .db
1668 .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1669 self.db_metrics
1670 .op_metrics
1671 .rocksdb_get_bytes
1672 .with_label_values(&[&self.cf])
1673 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1674 if perf_ctx.is_some() {
1675 self.db_metrics
1676 .read_perf_ctx_metrics
1677 .report_metrics(&self.cf);
1678 }
1679 match res {
1680 Some(data) => {
1681 let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1682 if value.is_err() {
1683 let key_hash = default_hash(&key_buf);
1684 let value_hash = default_hash(&data);
1685 debug_fatal!(
1686 "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1687 self.cf_name(),
1688 key_hash,
1689 value_hash,
1690 value.as_ref().err().unwrap()
1691 );
1692 }
1693 Ok(Some(value?))
1694 }
1695 None => Ok(None),
1696 }
1697 }
1698
1699 #[instrument(level = "trace", skip_all, err)]
1700 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1701 let timer = self
1702 .db_metrics
1703 .op_metrics
1704 .rocksdb_put_latency_seconds
1705 .with_label_values(&[&self.cf])
1706 .start_timer();
1707 let perf_ctx = if self.write_sample_interval.sample() {
1708 Some(RocksDBPerfContext)
1709 } else {
1710 None
1711 };
1712 let key_buf = be_fix_int_ser(key);
1713 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1714 self.db_metrics
1715 .op_metrics
1716 .rocksdb_put_bytes
1717 .with_label_values(&[&self.cf])
1718 .observe((key_buf.len() + value_buf.len()) as f64);
1719 if perf_ctx.is_some() {
1720 self.db_metrics
1721 .write_perf_ctx_metrics
1722 .report_metrics(&self.cf);
1723 }
1724 self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1725
1726 let elapsed = timer.stop_and_record();
1727 if elapsed > 1.0 {
1728 warn!(?elapsed, cf = ?self.cf, "very slow insert");
1729 self.db_metrics
1730 .op_metrics
1731 .rocksdb_very_slow_puts_count
1732 .with_label_values(&[&self.cf])
1733 .inc();
1734 self.db_metrics
1735 .op_metrics
1736 .rocksdb_very_slow_puts_duration_ms
1737 .with_label_values(&[&self.cf])
1738 .inc_by((elapsed * 1000.0) as u64);
1739 }
1740
1741 Ok(())
1742 }
1743
1744 #[instrument(level = "trace", skip_all, err)]
1745 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1746 let _timer = self
1747 .db_metrics
1748 .op_metrics
1749 .rocksdb_delete_latency_seconds
1750 .with_label_values(&[&self.cf])
1751 .start_timer();
1752 let perf_ctx = if self.write_sample_interval.sample() {
1753 Some(RocksDBPerfContext)
1754 } else {
1755 None
1756 };
1757 let key_buf = be_fix_int_ser(key);
1758 self.db.delete_cf(&self.column_family, key_buf)?;
1759 self.db_metrics
1760 .op_metrics
1761 .rocksdb_deletes
1762 .with_label_values(&[&self.cf])
1763 .inc();
1764 if perf_ctx.is_some() {
1765 self.db_metrics
1766 .write_perf_ctx_metrics
1767 .report_metrics(&self.cf);
1768 }
1769 Ok(())
1770 }
1771
1772 #[instrument(level = "trace", skip_all, err)]
1781 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1782 let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1783 let last_key = self
1784 .reversed_safe_iter_with_bounds(None, None)?
1785 .next()
1786 .transpose()?
1787 .map(|(k, _v)| k);
1788 if let Some((first_key, last_key)) = first_key.zip(last_key) {
1789 let mut batch = self.batch();
1790 batch.schedule_delete_range(self, &first_key, &last_key)?;
1791 batch.write()?;
1792 }
1793 Ok(())
1794 }
1795
1796 fn is_empty(&self) -> bool {
1797 self.safe_iter().next().is_none()
1798 }
1799
1800 fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1801 match &self.db.storage {
1802 Storage::Rocks(db) => {
1803 let db_iter = db
1804 .underlying
1805 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1806 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1807 Box::new(SafeIter::new(
1808 self.cf.clone(),
1809 db_iter,
1810 _timer,
1811 _perf_ctx,
1812 bytes_scanned,
1813 keys_scanned,
1814 Some(self.db_metrics.clone()),
1815 ))
1816 }
1817 Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1818 #[cfg(tidehunter)]
1819 Storage::TideHunter(db) => match &self.column_family {
1820 ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1821 db.iterator(*ks),
1822 prefix,
1823 self.start_iter_timer(),
1824 )),
1825 _ => unreachable!("storage backend invariant violation"),
1826 },
1827 }
1828 }
1829
1830 fn safe_iter_with_bounds(
1831 &'a self,
1832 lower_bound: Option<K>,
1833 upper_bound: Option<K>,
1834 ) -> DbIterator<'a, (K, V)> {
1835 let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1836 match &self.db.storage {
1837 Storage::Rocks(db) => {
1838 let readopts =
1839 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1840 let db_iter = db
1841 .underlying
1842 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1843 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1844 Box::new(SafeIter::new(
1845 self.cf.clone(),
1846 db_iter,
1847 _timer,
1848 _perf_ctx,
1849 bytes_scanned,
1850 keys_scanned,
1851 Some(self.db_metrics.clone()),
1852 ))
1853 }
1854 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1855 #[cfg(tidehunter)]
1856 Storage::TideHunter(db) => match &self.column_family {
1857 ColumnFamily::TideHunter((ks, prefix)) => {
1858 let mut iter = db.iterator(*ks);
1859 apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1860 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1861 }
1862 _ => unreachable!("storage backend invariant violation"),
1863 },
1864 }
1865 }
1866
1867 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1868 let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1869 match &self.db.storage {
1870 Storage::Rocks(db) => {
1871 let readopts =
1872 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1873 let db_iter = db
1874 .underlying
1875 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1876 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1877 Box::new(SafeIter::new(
1878 self.cf.clone(),
1879 db_iter,
1880 _timer,
1881 _perf_ctx,
1882 bytes_scanned,
1883 keys_scanned,
1884 Some(self.db_metrics.clone()),
1885 ))
1886 }
1887 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1888 #[cfg(tidehunter)]
1889 Storage::TideHunter(db) => match &self.column_family {
1890 ColumnFamily::TideHunter((ks, prefix)) => {
1891 let mut iter = db.iterator(*ks);
1892 apply_range_bounds(&mut iter, lower_bound, upper_bound, prefix);
1893 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1894 }
1895 _ => unreachable!("storage backend invariant violation"),
1896 },
1897 }
1898 }
1899
1900 #[instrument(level = "trace", skip_all, err)]
1902 fn multi_get<J>(
1903 &self,
1904 keys: impl IntoIterator<Item = J>,
1905 ) -> Result<Vec<Option<V>>, TypedStoreError>
1906 where
1907 J: Borrow<K>,
1908 {
1909 let results = self.multi_get_pinned(keys)?;
1910 let values_parsed: Result<Vec<_>, TypedStoreError> = results
1911 .into_iter()
1912 .map(|value_byte| match value_byte {
1913 Some(data) => Ok(Some(
1914 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1915 )),
1916 None => Ok(None),
1917 })
1918 .collect();
1919
1920 values_parsed
1921 }
1922
1923 #[instrument(level = "trace", skip_all, err)]
1925 fn multi_insert<J, U>(
1926 &self,
1927 key_val_pairs: impl IntoIterator<Item = (J, U)>,
1928 ) -> Result<(), Self::Error>
1929 where
1930 J: Borrow<K>,
1931 U: Borrow<V>,
1932 {
1933 let mut batch = self.batch();
1934 batch.insert_batch(self, key_val_pairs)?;
1935 batch.write()
1936 }
1937
1938 #[instrument(level = "trace", skip_all, err)]
1940 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1941 where
1942 J: Borrow<K>,
1943 {
1944 let mut batch = self.batch();
1945 batch.delete_batch(self, keys)?;
1946 batch.write()
1947 }
1948
1949 #[instrument(level = "trace", skip_all, err)]
1951 fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1952 if let Storage::Rocks(rocks) = &self.db.storage {
1953 rocks
1954 .underlying
1955 .try_catch_up_with_primary()
1956 .map_err(typed_store_err_from_rocks_err)?;
1957 }
1958 Ok(())
1959 }
1960}
1961
1962#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1964pub fn open_cf_opts<P: AsRef<Path>>(
1965 path: P,
1966 db_options: Option<rocksdb::Options>,
1967 metric_conf: MetricConf,
1968 opt_cfs: &[(&str, rocksdb::Options)],
1969) -> Result<Arc<Database>, TypedStoreError> {
1970 let path = path.as_ref();
1971 ensure_database_type(path, StorageType::Rocks)
1972 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1973 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1982 nondeterministic!({
1983 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1984 options.create_if_missing(true);
1985 options.create_missing_column_families(true);
1986 let rocksdb = {
1987 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1988 &options,
1989 path,
1990 cfs.into_iter()
1991 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1992 )
1993 .map_err(typed_store_err_from_rocks_err)?
1994 };
1995 Ok(Arc::new(Database::new(
1996 Storage::Rocks(RocksDB {
1997 underlying: rocksdb,
1998 }),
1999 metric_conf,
2000 None,
2001 )))
2002 })
2003}
2004
2005pub fn open_cf_opts_secondary<P: AsRef<Path>>(
2007 primary_path: P,
2008 secondary_path: Option<P>,
2009 db_options: Option<rocksdb::Options>,
2010 metric_conf: MetricConf,
2011 opt_cfs: &[(&str, rocksdb::Options)],
2012) -> Result<Arc<Database>, TypedStoreError> {
2013 let primary_path = primary_path.as_ref();
2014 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
2015 nondeterministic!({
2017 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
2019
2020 fdlimit::raise_fd_limit();
2021 options.set_max_open_files(-1);
2023
2024 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
2025 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
2026 .ok()
2027 .unwrap_or_default();
2028
2029 let default_db_options = default_db_options();
2030 for cf_key in cfs.iter() {
2032 if !opt_cfs.contains_key(&cf_key[..]) {
2033 opt_cfs.insert(cf_key, default_db_options.options.clone());
2034 }
2035 }
2036
2037 let primary_path = primary_path.to_path_buf();
2038 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
2039 let mut s = primary_path.clone();
2040 s.pop();
2041 s.push("SECONDARY");
2042 s.as_path().to_path_buf()
2043 });
2044
2045 ensure_database_type(&primary_path, StorageType::Rocks)
2046 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2047 ensure_database_type(&secondary_path, StorageType::Rocks)
2048 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
2049
2050 let rocksdb = {
2051 options.create_if_missing(true);
2052 options.create_missing_column_families(true);
2053 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
2054 &options,
2055 &primary_path,
2056 &secondary_path,
2057 opt_cfs
2058 .iter()
2059 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
2060 )
2061 .map_err(typed_store_err_from_rocks_err)?;
2062 db.try_catch_up_with_primary()
2063 .map_err(typed_store_err_from_rocks_err)?;
2064 db
2065 };
2066 Ok(Arc::new(Database::new(
2067 Storage::Rocks(RocksDB {
2068 underlying: rocksdb,
2069 }),
2070 metric_conf,
2071 None,
2072 )))
2073 })
2074}
2075
/// Removes the database stored at `path`, retrying until `timeout` elapses.
///
/// When compiled with `cfg(tidehunter)` and the directory is recognized as a
/// TideHunter database, the TideHunter drop routine is used; otherwise the
/// directory is destroyed as a RocksDB database.
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    // Only TideHunter builds need to tell the two on-disk formats apart.
    #[cfg(tidehunter)]
    if is_tidehunter_db(&path) {
        return safe_drop_tidehunter_db(path, timeout).await;
    }
    safe_drop_rocksdb(path, timeout).await
}
2088
2089async fn safe_drop_rocksdb(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
2090 let mut backoff = backoff::ExponentialBackoff {
2091 max_elapsed_time: Some(timeout),
2092 ..Default::default()
2093 };
2094 loop {
2095 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2096 Ok(()) => return Ok(()),
2097 Err(err) => match backoff.next_backoff() {
2098 Some(duration) => tokio::time::sleep(duration).await,
2099 None => return Err(std::io::Error::other(err)),
2100 },
2101 }
2102 }
2103}
2104
/// Returns `true` when the directory at `path` contains a `shape.yaml` file,
/// which is how a TideHunter database is recognized here.
#[cfg(tidehunter)]
fn is_tidehunter_db(path: &Path) -> bool {
    let shape_file = path.join("shape.yaml");
    shape_file.exists()
}
2111
/// Drops the TideHunter database at `path`, retrying with exponential backoff.
///
/// Only `AlreadyExists` errors are retried; any other error is returned
/// immediately. Once the backoff is exhausted (bounded by `timeout`), a
/// warning is logged and the last error is returned.
#[cfg(tidehunter)]
async fn safe_drop_tidehunter_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        let Some(delay) = backoff.next_backoff() else {
            warn!(
                "Database at {:?} is still locked after timeout ({:?})",
                path, timeout
            );
            return Err(err);
        };
        tokio::time::sleep(delay).await;
    }
}
2137
2138fn populate_missing_cfs(
2139 input_cfs: &[(&str, rocksdb::Options)],
2140 path: &Path,
2141) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2142 let mut cfs = vec![];
2143 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2144 let existing_cfs =
2145 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2146 .ok()
2147 .unwrap_or_default();
2148
2149 for cf_name in existing_cfs {
2150 if !input_cf_index.contains(&cf_name[..]) {
2151 cfs.push((cf_name, rocksdb::Options::default()));
2152 }
2153 }
2154 cfs.extend(
2155 input_cfs
2156 .iter()
2157 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2158 );
2159 Ok(cfs)
2160}
2161
2162fn default_hash(value: &[u8]) -> Digest<32> {
2163 let mut hasher = fastcrypto::hash::Blake2b256::default();
2164 hasher.update(value);
2165 hasher.finalize()
2166}