1pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17 apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20 be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21 iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26 traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36 properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42 borrow::Borrow,
43 marker::PhantomData,
44 ops::RangeBounds,
45 path::{Path, PathBuf},
46 sync::{Arc, OnceLock},
47 time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
/// RocksDB property name (as a C string) for the total size of blob files.
/// Defined locally because the `rocksdb` crate does not export a typed
/// constant for this property.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        // Unreachable: the literal above contains exactly one NUL, at the end.
        Err(_) => panic!("invalid RocksDB property name literal"),
    };
60
/// Process-wide cached flag: should RocksDB writes request sync-to-disk?
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns whether write-sync is enabled, lazily initializing the flag from
/// the `SUI_DB_SYNC_TO_DISK` environment variable ("1" or "true" enable it).
fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED.get_or_init(|| {
        matches!(
            std::env::var("SUI_DB_SYNC_TO_DISK").as_deref(),
            Ok("1") | Ok("true")
        )
    })
}

/// Explicitly seeds the write-sync flag. `None` leaves the flag to be
/// resolved lazily from the environment; if the flag was already set, the
/// call is a no-op (the `set` error is deliberately ignored).
pub fn init_write_sync(enabled: Option<bool>) {
    let Some(value) = enabled else { return };
    let _ = WRITE_SYNC_ENABLED.set(value);
}
75
76#[cfg(test)]
77mod tests;
78
/// Thin owner of the underlying multi-threaded RocksDB handle.
#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        // Stop compactions/flushes before the handle is destroyed; `true`
        // waits for in-flight background work to drain.
        self.underlying.cancel_all_background_work(true);
    }
}
89
/// Identifies one logical column family in whichever backend a map uses.
#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    /// Human-readable backend + name rendering for logs and diagnostics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Rocks(name) => write!(f, "RocksDB cf: {name}"),
            Self::InMemory(name) => write!(f, "InMemory cf: {name}"),
            #[cfg(tidehunter)]
            Self::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}
108
impl ColumnFamily {
    /// Resolves this column family to a live RocksDB handle.
    ///
    /// Callers must only invoke this for the `Rocks` variant; other variants
    /// are a programming error (enforced via `unreachable!`).
    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
        match &self {
            ColumnFamily::Rocks(name) => rocks_db
                .underlying
                .cf_handle(name)
                .expect("Map-keying column family should have been checked at DB creation"),
            _ => unreachable!("invariant is checked by the caller"),
        }
    }
}
120
121pub enum Storage {
122 Rocks(RocksDB),
123 InMemory(InMemoryDB),
124 #[cfg(tidehunter)]
125 TideHunter(Arc<TideHunterDb>),
126}
127
128impl std::fmt::Debug for Storage {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132 Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133 #[cfg(tidehunter)]
134 Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135 }
136 }
137}
138
/// A storage backend plus the metrics bookkeeping attached to it.
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    // Metric configuration; `db_name` doubles as the metrics label.
    metric_conf: MetricConf,
    // Handle used to unregister this DB's metrics registry on drop, if any.
    registry_id: Option<RegistryID>,
}

impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        if let Some(registry_id) = self.registry_id {
            // NOTE(review): field is spelled "registry_serivce" upstream;
            // kept as-is because it is declared outside this file.
            metrics.registry_serivce.remove(registry_id);
        }
    }
}
155
156enum GetResult<'a> {
157 Rocks(DBPinnableSlice<'a>),
158 InMemory(Vec<u8>),
159 #[cfg(tidehunter)]
160 TideHunter(tidehunter::minibytes::Bytes),
161}
162
163impl Deref for GetResult<'_> {
164 type Target = [u8];
165 fn deref(&self) -> &[u8] {
166 match self {
167 GetResult::Rocks(d) => d.deref(),
168 GetResult::InMemory(d) => d.deref(),
169 #[cfg(tidehunter)]
170 GetResult::TideHunter(d) => d.deref(),
171 }
172 }
173}
174
175impl Database {
176 pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
177 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
178 Self {
179 storage,
180 metric_conf,
181 registry_id,
182 }
183 }
184
    /// Flushes memtables to disk for the RocksDB backend; the in-memory and
    /// TideHunter backends have nothing to flush and report success.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                // Nothing to do: data only lives in memory.
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                // TideHunter manages its own persistence; no-op here.
                Ok(())
            }
        }
    }
202
    /// Point lookup of `key` in column family `cf`.
    ///
    /// The storage backend and the column-family variant must match; a
    /// mismatch is a typed-store invariant violation surfaced as an error.
    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            // Pinned read avoids copying the value out of the block cache.
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),

            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }
229
    /// Batched point lookups; returns one result per input key, in order.
    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                let keys_vec: Vec<K> = keys.into_iter().collect();
                // `false`: input keys are not assumed to be sorted.
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                // No native multi-get in TideHunter: issue point gets per key.
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }
274
    /// Drops the named column family.
    ///
    /// Not supported (panics) for the TideHunter backend, whose key spaces
    /// are fixed at open time.
    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
            }
        }
    }
288
289 pub fn delete_file_in_range<K: AsRef<[u8]>>(
290 &self,
291 cf: &impl AsColumnFamilyRef,
292 from: K,
293 to: K,
294 ) -> Result<(), rocksdb::Error> {
295 match &self.storage {
296 Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
297 _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
298 }
299 }
300
    /// Deletes `key` from `cf`, bracketed by fail points for crash testing.
    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        // Bound (rather than returned directly) so the "after" fail point
        // also fires on the error path.
        #[allow(clippy::let_and_return)]
        ret
    }
324
325 pub fn path_for_pruning(&self) -> &Path {
326 match &self.storage {
327 Storage::Rocks(rocks) => rocks.underlying.path(),
328 _ => unimplemented!("method is only supported for rocksdb backend"),
329 }
330 }
331
    /// Writes one `key`/`value` pair to `cf`, bracketed by fail points for
    /// crash testing. Key and value must already be serialized.
    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        // Bound (rather than returned directly) so the "after" fail point
        // also fires on the error path.
        #[allow(clippy::let_and_return)]
        ret
    }
360
    /// Cheap, possibly-false-positive existence probe.
    ///
    /// Returns `false` only if the key definitely does not exist (RocksDB
    /// bloom-filter check); non-RocksDB backends conservatively answer `true`.
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }
378
    /// Commits a prepared write batch using explicit RocksDB write options.
    ///
    /// The batch type must match the storage backend; a mismatch is reported
    /// as an error rather than a panic.
    pub(crate) fn write_opt_internal(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
                // `write_options` do not apply: the TideHunter batch commits itself.
                batch.commit().map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        // Bound (rather than returned directly) so the "after" fail point
        // also fires on the error path.
        #[allow(clippy::let_and_return)]
        ret
    }
408
    /// Kicks off TideHunter data relocation; no-op for other backends.
    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }
416
    /// Forces a rebuild of TideHunter's control region; no-op for other
    /// backends. Backend errors are wrapped via their `Debug` rendering.
    #[cfg(tidehunter)]
    pub fn force_rebuild_control_region(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.force_rebuild_control_region()
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        }
        Ok(())
    }
425
    /// Drops all TideHunter cells whose keys fall in the inclusive range.
    ///
    /// NOTE(review): unlike `start_relocation`, this panics on non-TideHunter
    /// storage instead of no-opping — confirm the asymmetry is intended.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }
441
442 pub fn compact_range_cf<K: AsRef<[u8]>>(
443 &self,
444 cf_name: &str,
445 start: Option<K>,
446 end: Option<K>,
447 ) {
448 if let Storage::Rocks(rocksdb) = &self.storage {
449 rocksdb
450 .underlying
451 .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
452 }
453 }
454
    /// Creates an on-disk checkpoint (hard-linked snapshot) at `path`.
    /// Only the RocksDB backend supports this; other backends succeed as a
    /// no-op.
    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
466
    /// Fresh sampling interval for point reads, derived from this DB's config.
    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for multi-gets (shares the read interval).
    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for writes.
    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for iterators.
    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }
482
483 fn db_name(&self) -> String {
484 let name = &self.metric_conf.db_name;
485 if name.is_empty() {
486 "default".to_string()
487 } else {
488 name.clone()
489 }
490 }
491
492 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
493 match &self.storage {
494 Storage::Rocks(rocks) => rocks.underlying.live_files(),
495 _ => Ok(vec![]),
496 }
497 }
498}
499
/// Looks up a bound column-family handle by name, panicking if absent:
/// every column family is registered when the database is opened, so a
/// missing handle indicates a bug in DB construction.
fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
    rocks_db
        .underlying
        .cf_handle(cf_name)
        .expect("Map-keying column family should have been checked at DB creation")
}
506
507fn rocks_cf_from_db<'a>(
508 db: &'a Database,
509 cf_name: &str,
510) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
511 match &db.storage {
512 Storage::Rocks(rocksdb) => Ok(rocksdb
513 .underlying
514 .cf_handle(cf_name)
515 .expect("Map-keying column family should have been checked at DB creation")),
516 _ => Err(TypedStoreError::RocksDBError(
517 "using invalid batch type for the database".to_string(),
518 )),
519 }
520}
521
/// Per-database metrics configuration: the reporting label plus sampling
/// intervals for each operation class.
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
529
530impl MetricConf {
531 pub fn new(db_name: &str) -> Self {
532 if db_name.is_empty() {
533 error!("A meaningful db name should be used for metrics reporting.")
534 }
535 Self {
536 db_name: db_name.to_string(),
537 read_sample_interval: SamplingInterval::default(),
538 write_sample_interval: SamplingInterval::default(),
539 iter_sample_interval: SamplingInterval::default(),
540 }
541 }
542
543 pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
544 Self {
545 db_name: self.db_name,
546 read_sample_interval: read_interval,
547 write_sample_interval: SamplingInterval::default(),
548 iter_sample_interval: SamplingInterval::default(),
549 }
550 }
551}
/// How often the background task reports per-cf RocksDB metrics.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Sentinel gauge value recorded when a property read fails.
const METRICS_ERROR: i64 = -1;
554
/// A typed key-value view over one column family of a shared [`Database`].
///
/// Keys are big-endian fixed-int serialized; values are BCS serialized.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    // `fn(K) -> V` marker: ties the type parameters to the map without
    // affecting auto traits the way owning K/V would.
    _phantom: PhantomData<fn(K) -> V>,
    // Backend-specific column family descriptor.
    column_family: ColumnFamily,
    // Column family name: metrics label and RocksDB cf-handle lookup key.
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping the last clone drops the sender, which stops the background
    // cf-metrics reporting task spawned in `new`.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
571
// SAFETY: all owned fields (Arc handles, String, sampling intervals) are
// `Send`, and `PhantomData<fn(K) -> V>` never stores a K or V.
// TODO(review): confirm this manual impl is still required — with the
// `fn(K) -> V` phantom the derive should already be `Send` for these bounds.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
573
574impl<K, V> DBMap<K, V> {
    /// Builds a `DBMap` over column family `opt_cf` and, for RocksDB-backed
    /// non-deprecated tables, spawns a background task that reports per-cf
    /// metrics every `CF_METRICS_REPORT_PERIOD_SECS` seconds until the map
    /// (and thus the cancel handle) is dropped.
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        // Weak handle: the metrics task must not keep the database alive.
        let db_cloned = Arc::downgrade(&db.clone());
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        // Dropping `sender` (stored in the map) signals the task to exit.
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                // Property reads may block; keep them off the
                                // async runtime threads.
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }
625
626 #[instrument(level = "debug", skip(db), err)]
629 pub fn reopen(
630 db: &Arc<Database>,
631 opt_cf: Option<&str>,
632 rw_options: &ReadWriteOptions,
633 is_deprecated: bool,
634 ) -> Result<Self, TypedStoreError> {
635 let cf_key = opt_cf
636 .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
637 .to_owned();
638 Ok(DBMap::new(
639 db.clone(),
640 rw_options,
641 &cf_key,
642 ColumnFamily::Rocks(cf_key.to_string()),
643 is_deprecated,
644 ))
645 }
646
647 #[cfg(tidehunter)]
648 pub fn reopen_th(
649 db: Arc<Database>,
650 cf_name: &str,
651 ks: KeySpace,
652 prefix: Option<Vec<u8>>,
653 ) -> Self {
654 DBMap::new(
655 db,
656 &ReadWriteOptions::default(),
657 cf_name,
658 ColumnFamily::TideHunter((ks, prefix.clone())),
659 false,
660 )
661 }
662
663 pub fn cf_name(&self) -> &str {
664 &self.cf
665 }
666
    /// Starts a new write batch of the kind matching this map's backend.
    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }
681
    /// Flushes the whole backing database (see [`Database::flush`]).
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }
685
686 pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
687 let from_buf = be_fix_int_ser(start);
688 let to_buf = be_fix_int_ser(end);
689 self.db
690 .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
691 Ok(())
692 }
693
694 pub fn compact_range_raw(
695 &self,
696 cf_name: &str,
697 start: Vec<u8>,
698 end: Vec<u8>,
699 ) -> Result<(), TypedStoreError> {
700 self.db.compact_range_cf(cf_name, Some(start), Some(end));
701 Ok(())
702 }
703
    /// Drops TideHunter cells between the serialized inclusive bounds.
    /// Silently no-ops when this map is not TideHunter-backed.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
722
    /// Raw-bytes variant of [`Self::drop_cells_in_range`]: bounds are taken
    /// as already-serialized keys. No-op for non-TideHunter backends.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range_raw(
        &self,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> Result<(), TypedStoreError> {
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, from_inclusive, to_inclusive)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }
736
    /// Serializes `keys` and performs an order-preserving batched read,
    /// recording multi-get latency/byte metrics and (when sampled) a RocksDB
    /// perf context.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        // Timer observes on drop, covering the whole function body.
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        // Collect into Result so the first per-key error aborts the batch.
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }
781
782 fn get_rocksdb_int_property(
783 rocksdb: &RocksDB,
784 cf: &impl AsColumnFamilyRef,
785 property_name: &std::ffi::CStr,
786 ) -> Result<i64, TypedStoreError> {
787 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
788 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
789 Ok(None) => Ok(0),
790 Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
791 }
792 }
793
794 fn report_rocksdb_metrics(
795 database: &Arc<Database>,
796 cf_name: &str,
797 db_metrics: &Arc<DBMetrics>,
798 ) {
799 let Storage::Rocks(rocksdb) = &database.storage else {
800 return;
801 };
802
803 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
804 tracing::warn!(
805 "unable to report metrics for cf {cf_name:?} in db {:?}",
806 database.db_name()
807 );
808 return;
809 };
810
811 db_metrics
812 .cf_metrics
813 .rocksdb_total_sst_files_size
814 .with_label_values(&[cf_name])
815 .set(
816 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
817 .unwrap_or(METRICS_ERROR),
818 );
819 db_metrics
820 .cf_metrics
821 .rocksdb_total_blob_files_size
822 .with_label_values(&[cf_name])
823 .set(
824 Self::get_rocksdb_int_property(
825 rocksdb,
826 &cf,
827 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
828 )
829 .unwrap_or(METRICS_ERROR),
830 );
831 let total_num_files: i64 = (0..=6)
834 .map(|level| {
835 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
836 .unwrap_or(METRICS_ERROR)
837 })
838 .sum();
839 db_metrics
840 .cf_metrics
841 .rocksdb_total_num_files
842 .with_label_values(&[cf_name])
843 .set(total_num_files);
844 db_metrics
845 .cf_metrics
846 .rocksdb_num_level0_files
847 .with_label_values(&[cf_name])
848 .set(
849 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
850 .unwrap_or(METRICS_ERROR),
851 );
852 db_metrics
853 .cf_metrics
854 .rocksdb_current_size_active_mem_tables
855 .with_label_values(&[cf_name])
856 .set(
857 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
858 .unwrap_or(METRICS_ERROR),
859 );
860 db_metrics
861 .cf_metrics
862 .rocksdb_size_all_mem_tables
863 .with_label_values(&[cf_name])
864 .set(
865 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
866 .unwrap_or(METRICS_ERROR),
867 );
868 db_metrics
869 .cf_metrics
870 .rocksdb_num_snapshots
871 .with_label_values(&[cf_name])
872 .set(
873 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
874 .unwrap_or(METRICS_ERROR),
875 );
876 db_metrics
877 .cf_metrics
878 .rocksdb_oldest_snapshot_time
879 .with_label_values(&[cf_name])
880 .set(
881 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
882 .unwrap_or(METRICS_ERROR),
883 );
884 db_metrics
885 .cf_metrics
886 .rocksdb_actual_delayed_write_rate
887 .with_label_values(&[cf_name])
888 .set(
889 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
890 .unwrap_or(METRICS_ERROR),
891 );
892 db_metrics
893 .cf_metrics
894 .rocksdb_is_write_stopped
895 .with_label_values(&[cf_name])
896 .set(
897 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
898 .unwrap_or(METRICS_ERROR),
899 );
900 db_metrics
901 .cf_metrics
902 .rocksdb_block_cache_capacity
903 .with_label_values(&[cf_name])
904 .set(
905 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
906 .unwrap_or(METRICS_ERROR),
907 );
908 db_metrics
909 .cf_metrics
910 .rocksdb_block_cache_usage
911 .with_label_values(&[cf_name])
912 .set(
913 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
914 .unwrap_or(METRICS_ERROR),
915 );
916 db_metrics
917 .cf_metrics
918 .rocksdb_block_cache_pinned_usage
919 .with_label_values(&[cf_name])
920 .set(
921 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
922 .unwrap_or(METRICS_ERROR),
923 );
924 db_metrics
925 .cf_metrics
926 .rocksdb_estimate_table_readers_mem
927 .with_label_values(&[cf_name])
928 .set(
929 Self::get_rocksdb_int_property(
930 rocksdb,
931 &cf,
932 properties::ESTIMATE_TABLE_READERS_MEM,
933 )
934 .unwrap_or(METRICS_ERROR),
935 );
936 db_metrics
937 .cf_metrics
938 .rocksdb_estimated_num_keys
939 .with_label_values(&[cf_name])
940 .set(
941 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
942 .unwrap_or(METRICS_ERROR),
943 );
944 db_metrics
945 .cf_metrics
946 .rocksdb_num_immutable_mem_tables
947 .with_label_values(&[cf_name])
948 .set(
949 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
950 .unwrap_or(METRICS_ERROR),
951 );
952 db_metrics
953 .cf_metrics
954 .rocksdb_mem_table_flush_pending
955 .with_label_values(&[cf_name])
956 .set(
957 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
958 .unwrap_or(METRICS_ERROR),
959 );
960 db_metrics
961 .cf_metrics
962 .rocksdb_compaction_pending
963 .with_label_values(&[cf_name])
964 .set(
965 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
966 .unwrap_or(METRICS_ERROR),
967 );
968 db_metrics
969 .cf_metrics
970 .rocksdb_estimate_pending_compaction_bytes
971 .with_label_values(&[cf_name])
972 .set(
973 Self::get_rocksdb_int_property(
974 rocksdb,
975 &cf,
976 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
977 )
978 .unwrap_or(METRICS_ERROR),
979 );
980 db_metrics
981 .cf_metrics
982 .rocksdb_num_running_compactions
983 .with_label_values(&[cf_name])
984 .set(
985 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
986 .unwrap_or(METRICS_ERROR),
987 );
988 db_metrics
989 .cf_metrics
990 .rocksdb_num_running_flushes
991 .with_label_values(&[cf_name])
992 .set(
993 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
994 .unwrap_or(METRICS_ERROR),
995 );
996 db_metrics
997 .cf_metrics
998 .rocksdb_estimate_oldest_key_time
999 .with_label_values(&[cf_name])
1000 .set(
1001 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
1002 .unwrap_or(METRICS_ERROR),
1003 );
1004 db_metrics
1005 .cf_metrics
1006 .rocksdb_background_errors
1007 .with_label_values(&[cf_name])
1008 .set(
1009 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
1010 .unwrap_or(METRICS_ERROR),
1011 );
1012 db_metrics
1013 .cf_metrics
1014 .rocksdb_base_level
1015 .with_label_values(&[cf_name])
1016 .set(
1017 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
1018 .unwrap_or(METRICS_ERROR),
1019 );
1020 }
1021
    /// Creates a checkpoint of the whole backing database at `path`
    /// (see [`Database::checkpoint`]).
    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }
1025
    /// Scans the entire table and returns per-entry size statistics.
    ///
    /// O(n) full scan: intended for tooling/diagnostics, not hot paths.
    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        // Histograms track sizes up to 100_000 with 2 significant digits.
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            // Re-serialize to measure stored sizes, since the iterator
            // yields deserialized values.
            let key_len = be_fix_int_ser(key.borrow()).len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }
1054
    /// Starts (and returns) the iterator-latency timer for this column
    /// family; the observation is recorded when the timer is dropped.
    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }
1062
1063 fn create_iter_context(
1065 &self,
1066 ) -> (
1067 Option<HistogramTimer>,
1068 Option<Histogram>,
1069 Option<Histogram>,
1070 Option<RocksDBPerfContext>,
1071 ) {
1072 let timer = self.start_iter_timer();
1073 let bytes_scanned = self
1074 .db_metrics
1075 .op_metrics
1076 .rocksdb_iter_bytes
1077 .with_label_values(&[&self.cf]);
1078 let keys_scanned = self
1079 .db_metrics
1080 .op_metrics
1081 .rocksdb_iter_keys
1082 .with_label_values(&[&self.cf]);
1083 let perf_ctx = if self.iter_sample_interval.sample() {
1084 Some(RocksDBPerfContext)
1085 } else {
1086 None
1087 };
1088 (
1089 Some(timer),
1090 Some(bytes_scanned),
1091 Some(keys_scanned),
1092 perf_ctx,
1093 )
1094 }
1095
    /// Returns a reverse iterator over `[lower_bound, upper_bound]`; each
    /// bound is inclusive when present and unbounded otherwise.
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                // Serialized inclusive upper bound: the reverse wrapper seeks
                // here first.
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                // Final `true` requests reverse iteration order.
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
1160}
1161
/// Backend-specific write batch wrapped by `DBBatch`; the variant in use
/// always matches the `Storage` backend of the owning database.
pub enum StorageWriteBatch {
    /// Batch destined for a RocksDB-backed database.
    Rocks(rocksdb::WriteBatch),
    /// Batch destined for the in-memory backend.
    InMemory(InMemoryBatch),
    /// Batch destined for a TideHunter-backed database.
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1168
/// Describes one staged operation packed into `StagedBatch::data`: the entry's
/// bytes start at `offset` with the column-family name, followed by the
/// serialized key and — for puts — the serialized value, which runs up to the
/// next entry's offset (or the end of the buffer).
struct EntryHeader {
    // Byte offset of this entry's first byte within `StagedBatch::data`.
    offset: usize,
    // Length in bytes of the column-family name at the start of the entry.
    cf_name_len: usize,
    // Length in bytes of the serialized key that follows the CF name.
    key_len: usize,
    // `true` for a put (value bytes follow the key); `false` for a delete.
    is_put: bool,
}
1179
/// A staging area for batched writes that is not yet bound to any database:
/// operations are packed into one contiguous buffer and later replayed into a
/// concrete `DBBatch` via `DBBatch::concat`.
#[derive(Default)]
pub struct StagedBatch {
    // Packed entry payloads: cf-name | key | value (puts only), back to back.
    data: Vec<u8>,
    // One header per staged operation, in insertion order.
    entries: Vec<EntryHeader>,
}
1190
1191impl StagedBatch {
1192 pub fn new() -> Self {
1193 Self {
1194 data: Vec::with_capacity(1024),
1195 entries: Vec::with_capacity(16),
1196 }
1197 }
1198
1199 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1200 &mut self,
1201 db: &DBMap<K, V>,
1202 new_vals: impl IntoIterator<Item = (J, U)>,
1203 ) -> Result<&mut Self, TypedStoreError> {
1204 let cf_name = db.cf_name();
1205 new_vals
1206 .into_iter()
1207 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1208 let offset = self.data.len();
1209 self.data.extend_from_slice(cf_name.as_bytes());
1210 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1211 bcs::serialize_into(&mut self.data, v.borrow())
1212 .map_err(typed_store_err_from_bcs_err)?;
1213 self.entries.push(EntryHeader {
1214 offset,
1215 cf_name_len: cf_name.len(),
1216 key_len,
1217 is_put: true,
1218 });
1219 Ok(())
1220 })?;
1221 Ok(self)
1222 }
1223
1224 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1225 &mut self,
1226 db: &DBMap<K, V>,
1227 purged_vals: impl IntoIterator<Item = J>,
1228 ) -> Result<(), TypedStoreError> {
1229 let cf_name = db.cf_name();
1230 purged_vals
1231 .into_iter()
1232 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1233 let offset = self.data.len();
1234 self.data.extend_from_slice(cf_name.as_bytes());
1235 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1236 self.entries.push(EntryHeader {
1237 offset,
1238 cf_name_len: cf_name.len(),
1239 key_len,
1240 is_put: false,
1241 });
1242 Ok(())
1243 })?;
1244 Ok(())
1245 }
1246
1247 pub fn size_in_bytes(&self) -> usize {
1248 self.data.len()
1249 }
1250}
1251
/// A write batch bound to a single `Database`; accumulates puts, deletes, and
/// range deletes across that database's column families and commits them in
/// one write. Use against a different database is rejected with `CrossDBBatch`.
pub struct DBBatch {
    // The database this batch belongs to; checked by ptr-equality on use.
    database: Arc<Database>,
    // Backend-specific accumulated operations.
    batch: StorageWriteBatch,
    // Metrics sink for commit latency/size observations.
    db_metrics: Arc<DBMetrics>,
    // Sampling policy deciding when to collect a RocksDB perf context.
    write_sample_interval: SamplingInterval,
}
1309
impl DBBatch {
    /// Wraps a backend-specific write batch bound to `dbref`. Normally
    /// obtained through the owning map's `batch()` rather than called directly.
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consumes and commits the batch with default write options; fsync is
    /// requested when the process-wide `write_sync_enabled()` flag is set.
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let mut write_options = rocksdb::WriteOptions::default();

        if write_sync_enabled() {
            write_options.set_sync(true);
        }

        self.write_opt(write_options)
    }

    /// Consumes and commits the batch with explicit `write_options`, recording
    /// commit latency/size metrics and flagging commits slower than 1s.
    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.size_in_bytes();

        // Perf context is collected only on sampled writes to bound overhead.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };

        self.database
            .write_opt_internal(self.batch, &write_options)?;

        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    /// Size of the pending batch payload in bytes. Only the RocksDB backend
    /// reports a real size; other backends return 0.
    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    /// Replays the packed entries of each `StagedBatch` into this batch.
    /// Per `EntryHeader`, an entry is cf-name bytes, then key bytes, then —
    /// for puts only — value bytes running to the next entry's offset (or the
    /// end of the buffer). Not supported for the TideHunter backend.
    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                // A put's value extends to the next entry's start (or buffer end).
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }

    /// Queues a delete for every key in `purged_vals` against `db`'s column
    /// family. Fails with `CrossDBBatch` if `db` belongs to another database.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    // Batch backend and CF handle must come from the same storage kind.
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

    /// Queues deletion of the key range from `from` to `to` following RocksDB
    /// `delete_range` semantics: `to` is *excluded* from the deletion.
    /// Silently a no-op for non-RocksDB backends, which have no range delete.
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    /// Queues a put for every `(key, value)` pair (values BCS-serialized) and
    /// records the total bytes staged. Fails with `CrossDBBatch` if `db`
    /// belongs to another database.
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                // Optional debug aid: log hashes of key/value, never contents.
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    // Batch backend and CF handle must come from the same storage kind.
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    /// Queues a merge-operator application for every `(key, value)` pair.
    /// Only implemented for the RocksDB backend; panics on other backends.
    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}
1594
impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;

    /// Checks key presence. `key_may_exist_cf` is a fast filter check that may
    /// return false positives, so a hit is confirmed with an actual `get`.
    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key);
        let readopts = self.opts.readopts();
        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
            && self
                .db
                .get(&self.column_family, &key_buf, &readopts)?
                .is_some())
    }

    /// Batched `contains_key`, answered via a pinned multi-get.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    /// Fetches and BCS-deserializes the value for `key`, recording latency and
    /// size metrics. A deserialization failure is reported via `debug_fatal`
    /// with key/value hashes only — contents are never logged.
    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Perf context is collected only on sampled reads to bound overhead.
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let res = self
            .db
            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => {
                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
                if value.is_err() {
                    let key_hash = default_hash(&key_buf);
                    let value_hash = default_hash(&data);
                    debug_fatal!(
                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
                        self.cf_name(),
                        key_hash,
                        value_hash,
                        value.as_ref().err().unwrap()
                    );
                }
                Ok(Some(value?))
            }
            None => Ok(None),
        }
    }

    /// Writes one key/value pair (value BCS-encoded), recording put-size and
    /// latency metrics and flagging writes slower than 1s.
    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Perf context is collected only on sampled writes to bound overhead.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.db.put_cf(&self.column_family, key_buf, value_buf)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    /// Deletes one key and bumps the delete counter.
    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Perf context is collected only on sampled deletes to bound overhead.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        self.db.delete_cf(&self.column_family, key_buf)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    /// Clears the table by scheduling a range delete spanning the current
    /// first and last keys.
    /// NOTE(review): RocksDB's `delete_range` excludes its end key, so the
    /// last entry may survive this call — confirm whether that is intended.
    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
        let last_key = self
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    /// True when a forward scan yields no entries.
    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    /// Full-table forward iterator over deserialized `(K, V)` pairs.
    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
        match &self.db.storage {
            Storage::Rocks(db) => {
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
                    db.iterator(*ks),
                    prefix,
                    self.start_iter_timer(),
                )),
                // A TideHunter storage must always pair with a TideHunter CF handle.
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    /// Forward iterator restricted to the optional lower/upper key bounds
    /// (converted to raw byte-key bounds by `iterator_bounds`).
    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                // A TideHunter storage must always pair with a TideHunter CF handle.
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    /// Forward iterator over an arbitrary `RangeBounds` of keys.
    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                // A TideHunter storage must always pair with a TideHunter CF handle.
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    /// Batched get; values are BCS-deserialized from pinned slices, preserving
    /// input order (missing keys yield `None`).
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

    /// Batched insert implemented as a single atomic write batch.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    /// Batched remove implemented as a single atomic write batch.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    /// For a RocksDB secondary instance, replays the primary's recent updates;
    /// no-op for other backends.
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        if let Storage::Rocks(rocks) = &self.db.storage {
            rocks
                .underlying
                .try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
        }
        Ok(())
    }
}
1934
1935#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1937pub fn open_cf_opts<P: AsRef<Path>>(
1938 path: P,
1939 db_options: Option<rocksdb::Options>,
1940 metric_conf: MetricConf,
1941 opt_cfs: &[(&str, rocksdb::Options)],
1942) -> Result<Arc<Database>, TypedStoreError> {
1943 let path = path.as_ref();
1944 ensure_database_type(path, StorageType::Rocks)
1945 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1946 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1955 nondeterministic!({
1956 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1957 options.create_if_missing(true);
1958 options.create_missing_column_families(true);
1959 let rocksdb = {
1960 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1961 &options,
1962 path,
1963 cfs.into_iter()
1964 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1965 )
1966 .map_err(typed_store_err_from_rocks_err)?
1967 };
1968 Ok(Arc::new(Database::new(
1969 Storage::Rocks(RocksDB {
1970 underlying: rocksdb,
1971 }),
1972 metric_conf,
1973 None,
1974 )))
1975 })
1976}
1977
/// Opens a RocksDB database in secondary (read-only follower) mode. When
/// `secondary_path` is `None`, a sibling "SECONDARY" directory next to the
/// primary is used. The secondary catches up with the primary's WAL before
/// being returned.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        // Raise the process fd limit and let RocksDB open files without bound
        // (-1 means unlimited per the RocksDB options documentation).
        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        // Any CF present in the primary but not requested gets default options
        // so the secondary can open the primary's full CF set.
        let default_db_options = default_db_options();
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            // Default: replace the primary's final path component with "SECONDARY".
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        ensure_database_type(&primary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        ensure_database_type(&secondary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            // Replay the primary's WAL so reads start from a fresh state.
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}
2048
2049#[cfg(not(tidehunter))]
2051pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2052 let mut backoff = backoff::ExponentialBackoff {
2053 max_elapsed_time: Some(timeout),
2054 ..Default::default()
2055 };
2056 loop {
2057 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2058 Ok(()) => return Ok(()),
2059 Err(err) => match backoff.next_backoff() {
2060 Some(duration) => tokio::time::sleep(duration).await,
2061 None => return Err(err),
2062 },
2063 }
2064 }
2065}
2066
/// Destroys the TideHunter database at `path`, retrying with exponential
/// backoff for up to `timeout` while the database is still locked
/// (`AlreadyExists`); any other error is returned immediately.
#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), std::io::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        let err = match TideHunterDb::drop_db(&path) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        // Only a lock-style failure is retryable; anything else is fatal.
        if err.kind() != std::io::ErrorKind::AlreadyExists {
            return Err(err);
        }
        match backoff.next_backoff() {
            Some(delay) => tokio::time::sleep(delay).await,
            None => {
                warn!(
                    "Database at {:?} is still locked after timeout ({:?})",
                    path, timeout
                );
                return Err(err);
            }
        }
    }
}
2092
2093fn populate_missing_cfs(
2094 input_cfs: &[(&str, rocksdb::Options)],
2095 path: &Path,
2096) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2097 let mut cfs = vec![];
2098 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2099 let existing_cfs =
2100 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2101 .ok()
2102 .unwrap_or_default();
2103
2104 for cf_name in existing_cfs {
2105 if !input_cf_index.contains(&cf_name[..]) {
2106 cfs.push((cf_name, rocksdb::Options::default()));
2107 }
2108 }
2109 cfs.extend(
2110 input_cfs
2111 .iter()
2112 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2113 );
2114 Ok(cfs)
2115}
2116
/// Computes the Blake2b-256 digest of `value`; used to log hashes of keys and
/// values for debugging without exposing their contents.
fn default_hash(value: &[u8]) -> Digest<32> {
    let mut hasher = fastcrypto::hash::Blake2b256::default();
    hasher.update(value);
    hasher.finalize()
}