1pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17 apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20 be_fix_int_ser, ensure_database_type, iterator_bounds, iterator_bounds_with_range,
21};
22use crate::{DbIterator, StorageType, TypedStoreError};
23use crate::{
24 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
25 traits::{Map, TableSummary},
26};
27use backoff::backoff::Backoff;
28use fastcrypto::hash::{Digest, HashFunction};
29use mysten_common::debug_fatal;
30use mysten_metrics::RegistryID;
31use prometheus::{Histogram, HistogramTimer};
32use rocksdb::properties::num_files_at_level;
33use rocksdb::{
34 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
35 properties,
36};
37use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
38use serde::{Serialize, de::DeserializeOwned};
39use std::ops::{Bound, Deref};
40use std::{
41 borrow::Borrow,
42 marker::PhantomData,
43 ops::RangeBounds,
44 path::{Path, PathBuf},
45 sync::{Arc, OnceLock},
46 time::Duration,
47};
48use std::{collections::HashSet, ffi::CStr};
49use sui_macros::{fail_point, nondeterministic};
50#[cfg(tidehunter)]
51use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
52use tokio::sync::oneshot;
53use tracing::{debug, error, instrument, warn};
54
55const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
58 unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
59
60static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();
61
62fn write_sync_enabled() -> bool {
63 *WRITE_SYNC_ENABLED
64 .get_or_init(|| std::env::var("SUI_DB_SYNC_TO_DISK").is_ok_and(|v| v == "1" || v == "true"))
65}
66
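/// Explicitly enables or disables syncing batch writes to disk.
///
/// The setting is stored in a `OnceLock`, so this only takes effect if it runs
/// before the first write consults the flag; after that, the value (whether set
/// here or derived from the `SUI_DB_SYNC_TO_DISK` environment variable) is
/// fixed for the lifetime of the process.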
67pub fn init_write_sync(enabled: Option<bool>) {
70 if let Some(value) = enabled {
71 let _ = WRITE_SYNC_ENABLED.set(value);
72 }
73}
74
75#[cfg(test)]
76mod tests;
77
78#[derive(Debug)]
79pub struct RocksDB {
80 pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
81}
82
83impl Drop for RocksDB {
84 fn drop(&mut self) {
85 self.underlying.cancel_all_background_work(true);
86 }
87}
88
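/// A handle to a logical column family in one of the supported backends: a
/// named RocksDB column family, a named in-memory table, or (under the
/// `tidehunter` cfg) a TideHunter key space with an optional key prefix.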
89#[derive(Clone)]
90pub enum ColumnFamily {
91 Rocks(String),
92 InMemory(String),
93 #[cfg(tidehunter)]
94 TideHunter((KeySpace, Option<Vec<u8>>)),
95}
96
97impl std::fmt::Debug for ColumnFamily {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 match self {
100 ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
101 ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
102 #[cfg(tidehunter)]
103 ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
104 }
105 }
106}
107
108impl ColumnFamily {
109 fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
110 match &self {
111 ColumnFamily::Rocks(name) => rocks_db
112 .underlying
113 .cf_handle(name)
114 .expect("Map-keying column family should have been checked at DB creation"),
115 _ => unreachable!("invariant is checked by the caller"),
116 }
117 }
118}
119
120pub enum Storage {
121 Rocks(RocksDB),
122 InMemory(InMemoryDB),
123 #[cfg(tidehunter)]
124 TideHunter(Arc<TideHunterDb>),
125}
126
127impl std::fmt::Debug for Storage {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 match self {
130 Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
131 Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
132 #[cfg(tidehunter)]
133 Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
134 }
135 }
136}
137
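/// An open storage backend together with its metric configuration. Dropping
/// the `Database` decrements the active-database gauge and removes the
/// attached metrics registry, if one was provided.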
138#[derive(Debug)]
139pub struct Database {
140 storage: Storage,
141 metric_conf: MetricConf,
142 registry_id: Option<RegistryID>,
143}
144
145impl Drop for Database {
146 fn drop(&mut self) {
147 let metrics = DBMetrics::get();
148 metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
149 if let Some(registry_id) = self.registry_id {
            metrics.registry_service.remove(registry_id);
151 }
152 }
153}
154
155enum GetResult<'a> {
156 Rocks(DBPinnableSlice<'a>),
157 InMemory(Vec<u8>),
158 #[cfg(tidehunter)]
159 TideHunter(tidehunter::minibytes::Bytes),
160}
161
162impl Deref for GetResult<'_> {
163 type Target = [u8];
164 fn deref(&self) -> &[u8] {
165 match self {
166 GetResult::Rocks(d) => d.deref(),
167 GetResult::InMemory(d) => d.deref(),
168 #[cfg(tidehunter)]
169 GetResult::TideHunter(d) => d.deref(),
170 }
171 }
172}
173
174impl Database {
175 pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
176 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
177 Self {
178 storage,
179 metric_conf,
180 registry_id,
181 }
182 }
183
184 pub fn flush(&self) -> Result<(), TypedStoreError> {
186 match &self.storage {
187 Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
188 TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
189 }),
190 Storage::InMemory(_) => {
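                // Nothing to flush for the in-memory backend.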
191 Ok(())
193 }
194 #[cfg(tidehunter)]
195 Storage::TideHunter(_) => {
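                // No explicit flush is performed for the TideHunter backend.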
196 Ok(())
198 }
199 }
200 }
201
202 fn get<K: AsRef<[u8]>>(
203 &self,
204 cf: &ColumnFamily,
205 key: K,
206 readopts: &ReadOptions,
207 ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
208 match (&self.storage, cf) {
209 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
210 .underlying
211 .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
212 .map_err(typed_store_err_from_rocks_err)?
213 .map(GetResult::Rocks)),
214 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
215 Ok(db.get(cf_name, key).map(GetResult::InMemory))
216 }
217 #[cfg(tidehunter)]
218 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
219 .get(*ks, &transform_th_key(key.as_ref(), prefix))
220 .map_err(typed_store_error_from_th_error)?
221 .map(GetResult::TideHunter)),
222
223 _ => Err(TypedStoreError::RocksDBError(
224 "typed store invariant violation".to_string(),
225 )),
226 }
227 }
228
229 fn multi_get<I, K>(
230 &self,
231 cf: &ColumnFamily,
232 keys: I,
233 readopts: &ReadOptions,
234 ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
235 where
236 I: IntoIterator<Item = K>,
237 K: AsRef<[u8]>,
238 {
239 match (&self.storage, cf) {
240 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
241 let keys_vec: Vec<K> = keys.into_iter().collect();
242 let res = db.underlying.batched_multi_get_cf_opt(
243 &cf.rocks_cf(db),
244 keys_vec.iter(),
245 false,
246 readopts,
247 );
248 res.into_iter()
249 .map(|r| {
250 r.map_err(typed_store_err_from_rocks_err)
251 .map(|item| item.map(GetResult::Rocks))
252 })
253 .collect()
254 }
255 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
256 .multi_get(cf_name, keys)
257 .into_iter()
258 .map(|r| Ok(r.map(GetResult::InMemory)))
259 .collect(),
260 #[cfg(tidehunter)]
261 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
262 let res = keys.into_iter().map(|k| {
263 db.get(*ks, &transform_th_key(k.as_ref(), prefix))
264 .map_err(typed_store_error_from_th_error)
265 });
266 res.into_iter()
267 .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
268 .collect()
269 }
270 _ => unreachable!("typed store invariant violation"),
271 }
272 }
273
274 pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
275 match &self.storage {
276 Storage::Rocks(db) => db.underlying.drop_cf(name),
277 Storage::InMemory(db) => {
278 db.drop_cf(name);
279 Ok(())
280 }
281 #[cfg(tidehunter)]
282 Storage::TideHunter(_) => {
                unimplemented!("TideHunter: dropping a column family on the fly is not implemented")
284 }
285 }
286 }
287
288 pub fn delete_file_in_range<K: AsRef<[u8]>>(
289 &self,
290 cf: &impl AsColumnFamilyRef,
291 from: K,
292 to: K,
293 ) -> Result<(), rocksdb::Error> {
294 match &self.storage {
295 Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for the RocksDB backend"),
297 }
298 }
299
300 fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
301 fail_point!("delete-cf-before");
302 let ret = match (&self.storage, cf) {
303 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
304 .underlying
305 .delete_cf(&cf.rocks_cf(db), key)
306 .map_err(typed_store_err_from_rocks_err),
307 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
308 db.delete(cf_name, key.as_ref());
309 Ok(())
310 }
311 #[cfg(tidehunter)]
312 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
313 .remove(*ks, transform_th_key(key.as_ref(), prefix))
314 .map_err(typed_store_error_from_th_error),
315 _ => Err(TypedStoreError::RocksDBError(
316 "typed store invariant violation".to_string(),
317 )),
318 };
319 fail_point!("delete-cf-after");
320 #[allow(clippy::let_and_return)]
321 ret
322 }
323
324 pub fn path_for_pruning(&self) -> &Path {
325 match &self.storage {
326 Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("path_for_pruning is only supported for the RocksDB backend"),
328 }
329 }
330
331 fn put_cf(
332 &self,
333 cf: &ColumnFamily,
334 key: Vec<u8>,
335 value: Vec<u8>,
336 ) -> Result<(), TypedStoreError> {
337 fail_point!("put-cf-before");
338 let ret = match (&self.storage, cf) {
339 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
340 .underlying
341 .put_cf(&cf.rocks_cf(db), key, value)
342 .map_err(typed_store_err_from_rocks_err),
343 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
344 db.put(cf_name, key, value);
345 Ok(())
346 }
347 #[cfg(tidehunter)]
348 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
349 .insert(*ks, transform_th_key(&key, prefix), value)
350 .map_err(typed_store_error_from_th_error),
351 _ => Err(TypedStoreError::RocksDBError(
352 "typed store invariant violation".to_string(),
353 )),
354 };
355 fail_point!("put-cf-after");
356 #[allow(clippy::let_and_return)]
357 ret
358 }
359
360 pub fn key_may_exist_cf<K: AsRef<[u8]>>(
361 &self,
362 cf_name: &str,
363 key: K,
364 readopts: &ReadOptions,
365 ) -> bool {
366 match &self.storage {
367 Storage::Rocks(rocks) => {
370 rocks
371 .underlying
372 .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
373 }
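            // Only RocksDB provides a cheap existence hint; other backends
            // conservatively report that the key may exist.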
374 _ => true,
375 }
376 }
377
378 pub(crate) fn write_opt_internal(
379 &self,
380 batch: StorageWriteBatch,
381 write_options: &rocksdb::WriteOptions,
382 ) -> Result<(), TypedStoreError> {
383 fail_point!("batch-write-before");
384 let ret = match (&self.storage, batch) {
385 (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
386 .underlying
387 .write_opt(batch, write_options)
388 .map_err(typed_store_err_from_rocks_err),
389 (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
390 db.write(batch);
392 Ok(())
393 }
394 #[cfg(tidehunter)]
395 (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
396 db.write_batch(batch)
398 .map_err(typed_store_error_from_th_error)
399 }
400 _ => Err(TypedStoreError::RocksDBError(
401 "using invalid batch type for the database".to_string(),
402 )),
403 };
404 fail_point!("batch-write-after");
405 #[allow(clippy::let_and_return)]
406 ret
407 }
408
409 #[cfg(tidehunter)]
410 pub fn start_relocation(&self) -> anyhow::Result<()> {
411 if let Storage::TideHunter(db) = &self.storage {
412 db.start_relocation()?;
413 }
414 Ok(())
415 }
416
417 #[cfg(tidehunter)]
418 pub fn drop_cells_in_range(
419 &self,
420 ks: KeySpace,
421 from_inclusive: &[u8],
422 to_inclusive: &[u8],
423 ) -> anyhow::Result<()> {
424 if let Storage::TideHunter(db) = &self.storage {
425 db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
426 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
427 } else {
428 panic!("drop_cells_in_range called on non-TideHunter storage");
429 }
430 Ok(())
431 }
432
433 pub fn compact_range_cf<K: AsRef<[u8]>>(
434 &self,
435 cf_name: &str,
436 start: Option<K>,
437 end: Option<K>,
438 ) {
439 if let Storage::Rocks(rocksdb) = &self.storage {
440 rocksdb
441 .underlying
442 .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
443 }
444 }
445
446 pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
447 if let Storage::Rocks(rocks) = &self.storage {
449 let checkpoint =
450 Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
451 checkpoint
452 .create_checkpoint(path)
453 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
454 }
455 Ok(())
456 }
457
458 pub fn get_sampling_interval(&self) -> SamplingInterval {
459 self.metric_conf.read_sample_interval.new_from_self()
460 }
461
462 pub fn multiget_sampling_interval(&self) -> SamplingInterval {
463 self.metric_conf.read_sample_interval.new_from_self()
464 }
465
466 pub fn write_sampling_interval(&self) -> SamplingInterval {
467 self.metric_conf.write_sample_interval.new_from_self()
468 }
469
470 pub fn iter_sampling_interval(&self) -> SamplingInterval {
471 self.metric_conf.iter_sample_interval.new_from_self()
472 }
473
474 fn db_name(&self) -> String {
475 let name = &self.metric_conf.db_name;
476 if name.is_empty() {
477 "default".to_string()
478 } else {
479 name.clone()
480 }
481 }
482
483 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
484 match &self.storage {
485 Storage::Rocks(rocks) => rocks.underlying.live_files(),
486 _ => Ok(vec![]),
487 }
488 }
489}
490
491fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
492 rocks_db
493 .underlying
494 .cf_handle(cf_name)
495 .expect("Map-keying column family should have been checked at DB creation")
496}
497
498fn rocks_cf_from_db<'a>(
499 db: &'a Database,
500 cf_name: &str,
501) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
502 match &db.storage {
503 Storage::Rocks(rocksdb) => Ok(rocksdb
504 .underlying
505 .cf_handle(cf_name)
506 .expect("Map-keying column family should have been checked at DB creation")),
        _ => Err(TypedStoreError::RocksDBError(
            "cannot get a RocksDB column family handle from a non-RocksDB backend".to_string(),
        )),
510 }
511}
512
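/// Per-database metrics configuration: the database name used as a metric
/// label plus the sampling intervals for read, write, and iterator
/// perf-context reporting.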
513#[derive(Debug, Default)]
514pub struct MetricConf {
515 pub db_name: String,
516 pub read_sample_interval: SamplingInterval,
517 pub write_sample_interval: SamplingInterval,
518 pub iter_sample_interval: SamplingInterval,
519}
520
521impl MetricConf {
522 pub fn new(db_name: &str) -> Self {
523 if db_name.is_empty() {
524 error!("A meaningful db name should be used for metrics reporting.")
525 }
526 Self {
527 db_name: db_name.to_string(),
528 read_sample_interval: SamplingInterval::default(),
529 write_sample_interval: SamplingInterval::default(),
530 iter_sample_interval: SamplingInterval::default(),
531 }
532 }
533
534 pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
535 Self {
536 db_name: self.db_name,
537 read_sample_interval: read_interval,
538 write_sample_interval: SamplingInterval::default(),
539 iter_sample_interval: SamplingInterval::default(),
540 }
541 }
542}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
544const METRICS_ERROR: i64 = -1;
545
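/// A typed map view over a single column family, with keys serialized via
/// `be_fix_int_ser` and values via BCS. For RocksDB-backed maps, a background
/// task reports per-column-family metrics every `CF_METRICS_REPORT_PERIOD_SECS`
/// seconds for as long as the map is alive.
///
/// Minimal usage sketch (illustrative only; `Key` and `Value` stand in for any
/// `Serialize + DeserializeOwned` types, and a Tokio runtime is assumed to be
/// running for the metrics task):
///
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/example_db",
///     None,
///     MetricConf::new("example_db"),
///     &[("table", rocksdb::Options::default())],
/// )?;
/// let map: DBMap<Key, Value> =
///     DBMap::reopen(&db, Some("table"), &ReadWriteOptions::default(), false)?;
/// map.insert(&key, &value)?;
/// assert_eq!(map.get(&key)?, Some(value));
/// ```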
546#[derive(Clone, Debug)]
548pub struct DBMap<K, V> {
549 pub db: Arc<Database>,
550 _phantom: PhantomData<fn(K) -> V>,
551 column_family: ColumnFamily,
552 cf: String,
554 pub opts: ReadWriteOptions,
555 db_metrics: Arc<DBMetrics>,
556 get_sample_interval: SamplingInterval,
557 multiget_sample_interval: SamplingInterval,
558 write_sample_interval: SamplingInterval,
559 iter_sample_interval: SamplingInterval,
560 _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
561}
562
563unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
564
565impl<K, V> DBMap<K, V> {
566 pub(crate) fn new(
567 db: Arc<Database>,
568 opts: &ReadWriteOptions,
569 opt_cf: &str,
570 column_family: ColumnFamily,
571 is_deprecated: bool,
572 ) -> Self {
        let db_cloned = Arc::downgrade(&db);
574 let db_metrics = DBMetrics::get();
575 let db_metrics_cloned = db_metrics.clone();
576 let cf = opt_cf.to_string();
577
578 let (sender, mut recv) = tokio::sync::oneshot::channel();
579 if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
580 tokio::task::spawn(async move {
581 let mut interval =
582 tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
583 loop {
584 tokio::select! {
585 _ = interval.tick() => {
586 if let Some(db) = db_cloned.upgrade() {
587 let cf = cf.clone();
588 let db_metrics = db_metrics.clone();
589 if let Err(e) = tokio::task::spawn_blocking(move || {
590 Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
591 }).await {
592 error!("Failed to log metrics with error: {}", e);
593 }
594 }
595 }
596 _ = &mut recv => break,
597 }
598 }
                debug!("Exiting the cf metric logging task for DBMap: {}", &cf);
600 });
601 }
602 DBMap {
603 db: db.clone(),
604 opts: opts.clone(),
605 _phantom: PhantomData,
606 column_family,
607 cf: opt_cf.to_string(),
608 db_metrics: db_metrics_cloned,
609 _metrics_task_cancel_handle: Arc::new(sender),
610 get_sample_interval: db.get_sampling_interval(),
611 multiget_sample_interval: db.multiget_sampling_interval(),
612 write_sample_interval: db.write_sampling_interval(),
613 iter_sample_interval: db.iter_sampling_interval(),
614 }
615 }
616
617 #[instrument(level = "debug", skip(db), err)]
620 pub fn reopen(
621 db: &Arc<Database>,
622 opt_cf: Option<&str>,
623 rw_options: &ReadWriteOptions,
624 is_deprecated: bool,
625 ) -> Result<Self, TypedStoreError> {
626 let cf_key = opt_cf
627 .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
628 .to_owned();
629 Ok(DBMap::new(
630 db.clone(),
631 rw_options,
632 &cf_key,
633 ColumnFamily::Rocks(cf_key.to_string()),
634 is_deprecated,
635 ))
636 }
637
638 #[cfg(tidehunter)]
639 pub fn reopen_th(
640 db: Arc<Database>,
641 cf_name: &str,
642 ks: KeySpace,
643 prefix: Option<Vec<u8>>,
644 ) -> Self {
645 DBMap::new(
646 db,
647 &ReadWriteOptions::default(),
648 cf_name,
649 ColumnFamily::TideHunter((ks, prefix.clone())),
650 false,
651 )
652 }
653
654 pub fn cf_name(&self) -> &str {
655 &self.cf
656 }
657
658 pub fn batch(&self) -> DBBatch {
659 let batch = match &self.db.storage {
660 Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
661 Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
662 #[cfg(tidehunter)]
663 Storage::TideHunter(_) => {
664 StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
665 }
666 };
667 DBBatch::new(
668 &self.db,
669 batch,
670 &self.db_metrics,
671 &self.write_sample_interval,
672 )
673 }
674
675 pub fn flush(&self) -> Result<(), TypedStoreError> {
676 self.db.flush()
677 }
678
679 pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
680 let from_buf = be_fix_int_ser(start);
681 let to_buf = be_fix_int_ser(end);
682 self.db
683 .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
684 Ok(())
685 }
686
687 pub fn compact_range_raw(
688 &self,
689 cf_name: &str,
690 start: Vec<u8>,
691 end: Vec<u8>,
692 ) -> Result<(), TypedStoreError> {
693 self.db.compact_range_cf(cf_name, Some(start), Some(end));
694 Ok(())
695 }
696
697 #[cfg(tidehunter)]
698 pub fn drop_cells_in_range<J: Serialize>(
699 &self,
700 from_inclusive: &J,
701 to_inclusive: &J,
702 ) -> Result<(), TypedStoreError>
703 where
704 K: Serialize,
705 {
706 let from_buf = be_fix_int_ser(from_inclusive);
707 let to_buf = be_fix_int_ser(to_inclusive);
708 if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
709 self.db
710 .drop_cells_in_range(*ks, &from_buf, &to_buf)
711 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
712 }
713 Ok(())
714 }
715
716 fn multi_get_pinned<J>(
718 &self,
719 keys: impl IntoIterator<Item = J>,
720 ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
721 where
722 J: Borrow<K>,
723 K: Serialize,
724 {
725 let _timer = self
726 .db_metrics
727 .op_metrics
728 .rocksdb_multiget_latency_seconds
729 .with_label_values(&[&self.cf])
730 .start_timer();
731 let perf_ctx = if self.multiget_sample_interval.sample() {
732 Some(RocksDBPerfContext)
733 } else {
734 None
735 };
736 let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
737 let results: Result<Vec<_>, TypedStoreError> = self
738 .db
739 .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
740 .into_iter()
741 .collect();
742 let entries = results?;
743 let entry_size = entries
744 .iter()
745 .flatten()
746 .map(|entry| entry.len())
747 .sum::<usize>();
748 self.db_metrics
749 .op_metrics
750 .rocksdb_multiget_bytes
751 .with_label_values(&[&self.cf])
752 .observe(entry_size as f64);
753 if perf_ctx.is_some() {
754 self.db_metrics
755 .read_perf_ctx_metrics
756 .report_metrics(&self.cf);
757 }
758 Ok(entries)
759 }
760
761 fn get_rocksdb_int_property(
762 rocksdb: &RocksDB,
763 cf: &impl AsColumnFamilyRef,
764 property_name: &std::ffi::CStr,
765 ) -> Result<i64, TypedStoreError> {
766 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
767 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
768 Ok(None) => Ok(0),
769 Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
770 }
771 }
772
773 fn report_rocksdb_metrics(
774 database: &Arc<Database>,
775 cf_name: &str,
776 db_metrics: &Arc<DBMetrics>,
777 ) {
778 let Storage::Rocks(rocksdb) = &database.storage else {
779 return;
780 };
781
782 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
783 tracing::warn!(
784 "unable to report metrics for cf {cf_name:?} in db {:?}",
785 database.db_name()
786 );
787 return;
788 };
789
790 db_metrics
791 .cf_metrics
792 .rocksdb_total_sst_files_size
793 .with_label_values(&[cf_name])
794 .set(
795 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
796 .unwrap_or(METRICS_ERROR),
797 );
798 db_metrics
799 .cf_metrics
800 .rocksdb_total_blob_files_size
801 .with_label_values(&[cf_name])
802 .set(
803 Self::get_rocksdb_int_property(
804 rocksdb,
805 &cf,
806 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
807 )
808 .unwrap_or(METRICS_ERROR),
809 );
810 let total_num_files: i64 = (0..=6)
813 .map(|level| {
814 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
815 .unwrap_or(METRICS_ERROR)
816 })
817 .sum();
818 db_metrics
819 .cf_metrics
820 .rocksdb_total_num_files
821 .with_label_values(&[cf_name])
822 .set(total_num_files);
823 db_metrics
824 .cf_metrics
825 .rocksdb_num_level0_files
826 .with_label_values(&[cf_name])
827 .set(
828 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
829 .unwrap_or(METRICS_ERROR),
830 );
831 db_metrics
832 .cf_metrics
833 .rocksdb_current_size_active_mem_tables
834 .with_label_values(&[cf_name])
835 .set(
836 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
837 .unwrap_or(METRICS_ERROR),
838 );
839 db_metrics
840 .cf_metrics
841 .rocksdb_size_all_mem_tables
842 .with_label_values(&[cf_name])
843 .set(
844 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
845 .unwrap_or(METRICS_ERROR),
846 );
847 db_metrics
848 .cf_metrics
849 .rocksdb_num_snapshots
850 .with_label_values(&[cf_name])
851 .set(
852 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
853 .unwrap_or(METRICS_ERROR),
854 );
855 db_metrics
856 .cf_metrics
857 .rocksdb_oldest_snapshot_time
858 .with_label_values(&[cf_name])
859 .set(
860 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
861 .unwrap_or(METRICS_ERROR),
862 );
863 db_metrics
864 .cf_metrics
865 .rocksdb_actual_delayed_write_rate
866 .with_label_values(&[cf_name])
867 .set(
868 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
869 .unwrap_or(METRICS_ERROR),
870 );
871 db_metrics
872 .cf_metrics
873 .rocksdb_is_write_stopped
874 .with_label_values(&[cf_name])
875 .set(
876 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
877 .unwrap_or(METRICS_ERROR),
878 );
879 db_metrics
880 .cf_metrics
881 .rocksdb_block_cache_capacity
882 .with_label_values(&[cf_name])
883 .set(
884 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
885 .unwrap_or(METRICS_ERROR),
886 );
887 db_metrics
888 .cf_metrics
889 .rocksdb_block_cache_usage
890 .with_label_values(&[cf_name])
891 .set(
892 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
893 .unwrap_or(METRICS_ERROR),
894 );
895 db_metrics
896 .cf_metrics
897 .rocksdb_block_cache_pinned_usage
898 .with_label_values(&[cf_name])
899 .set(
900 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
901 .unwrap_or(METRICS_ERROR),
902 );
903 db_metrics
904 .cf_metrics
905 .rocksdb_estimate_table_readers_mem
906 .with_label_values(&[cf_name])
907 .set(
908 Self::get_rocksdb_int_property(
909 rocksdb,
910 &cf,
911 properties::ESTIMATE_TABLE_READERS_MEM,
912 )
913 .unwrap_or(METRICS_ERROR),
914 );
915 db_metrics
916 .cf_metrics
917 .rocksdb_estimated_num_keys
918 .with_label_values(&[cf_name])
919 .set(
920 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
921 .unwrap_or(METRICS_ERROR),
922 );
923 db_metrics
924 .cf_metrics
925 .rocksdb_num_immutable_mem_tables
926 .with_label_values(&[cf_name])
927 .set(
928 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
929 .unwrap_or(METRICS_ERROR),
930 );
931 db_metrics
932 .cf_metrics
933 .rocksdb_mem_table_flush_pending
934 .with_label_values(&[cf_name])
935 .set(
936 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
937 .unwrap_or(METRICS_ERROR),
938 );
939 db_metrics
940 .cf_metrics
941 .rocksdb_compaction_pending
942 .with_label_values(&[cf_name])
943 .set(
944 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
945 .unwrap_or(METRICS_ERROR),
946 );
947 db_metrics
948 .cf_metrics
949 .rocksdb_estimate_pending_compaction_bytes
950 .with_label_values(&[cf_name])
951 .set(
952 Self::get_rocksdb_int_property(
953 rocksdb,
954 &cf,
955 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
956 )
957 .unwrap_or(METRICS_ERROR),
958 );
959 db_metrics
960 .cf_metrics
961 .rocksdb_num_running_compactions
962 .with_label_values(&[cf_name])
963 .set(
964 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
965 .unwrap_or(METRICS_ERROR),
966 );
967 db_metrics
968 .cf_metrics
969 .rocksdb_num_running_flushes
970 .with_label_values(&[cf_name])
971 .set(
972 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
973 .unwrap_or(METRICS_ERROR),
974 );
975 db_metrics
976 .cf_metrics
977 .rocksdb_estimate_oldest_key_time
978 .with_label_values(&[cf_name])
979 .set(
980 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
981 .unwrap_or(METRICS_ERROR),
982 );
983 db_metrics
984 .cf_metrics
985 .rocksdb_background_errors
986 .with_label_values(&[cf_name])
987 .set(
988 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
989 .unwrap_or(METRICS_ERROR),
990 );
991 db_metrics
992 .cf_metrics
993 .rocksdb_base_level
994 .with_label_values(&[cf_name])
995 .set(
996 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
997 .unwrap_or(METRICS_ERROR),
998 );
999 }
1000
1001 pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
1002 self.db.checkpoint(path)
1003 }
1004
1005 pub fn table_summary(&self) -> eyre::Result<TableSummary>
1006 where
1007 K: Serialize + DeserializeOwned,
1008 V: Serialize + DeserializeOwned,
1009 {
1010 let mut num_keys = 0;
1011 let mut key_bytes_total = 0;
1012 let mut value_bytes_total = 0;
1013 let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1014 let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
1015 for item in self.safe_iter() {
1016 let (key, value) = item?;
1017 num_keys += 1;
1018 let key_len = be_fix_int_ser(key.borrow()).len();
1019 let value_len = bcs::to_bytes(value.borrow())?.len();
1020 key_bytes_total += key_len;
1021 value_bytes_total += value_len;
1022 key_hist.record(key_len as u64)?;
1023 value_hist.record(value_len as u64)?;
1024 }
1025 Ok(TableSummary {
1026 num_keys,
1027 key_bytes_total,
1028 value_bytes_total,
1029 key_hist,
1030 value_hist,
1031 })
1032 }
1033
1034 fn start_iter_timer(&self) -> HistogramTimer {
1035 self.db_metrics
1036 .op_metrics
1037 .rocksdb_iter_latency_seconds
1038 .with_label_values(&[&self.cf])
1039 .start_timer()
1040 }
1041
1042 fn create_iter_context(
1044 &self,
1045 ) -> (
1046 Option<HistogramTimer>,
1047 Option<Histogram>,
1048 Option<Histogram>,
1049 Option<RocksDBPerfContext>,
1050 ) {
1051 let timer = self.start_iter_timer();
1052 let bytes_scanned = self
1053 .db_metrics
1054 .op_metrics
1055 .rocksdb_iter_bytes
1056 .with_label_values(&[&self.cf]);
1057 let keys_scanned = self
1058 .db_metrics
1059 .op_metrics
1060 .rocksdb_iter_keys
1061 .with_label_values(&[&self.cf]);
1062 let perf_ctx = if self.iter_sample_interval.sample() {
1063 Some(RocksDBPerfContext)
1064 } else {
1065 None
1066 };
1067 (
1068 Some(timer),
1069 Some(bytes_scanned),
1070 Some(keys_scanned),
1071 perf_ctx,
1072 )
1073 }
1074
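    /// Iterates over `(key, value)` pairs in descending key order, bounded
    /// inclusively by `lower_bound` and `upper_bound` when they are provided.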
1075 #[allow(clippy::complexity)]
1078 pub fn reversed_safe_iter_with_bounds(
1079 &self,
1080 lower_bound: Option<K>,
1081 upper_bound: Option<K>,
1082 ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1083 where
1084 K: Serialize + DeserializeOwned,
1085 V: Serialize + DeserializeOwned,
1086 {
1087 let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1088 lower_bound
1089 .as_ref()
1090 .map(Bound::Included)
1091 .unwrap_or(Bound::Unbounded),
1092 upper_bound
1093 .as_ref()
1094 .map(Bound::Included)
1095 .unwrap_or(Bound::Unbounded),
1096 ));
1097 match &self.db.storage {
1098 Storage::Rocks(db) => {
1099 let readopts = rocks_util::apply_range_bounds(
1100 self.opts.readopts(),
1101 it_lower_bound,
1102 it_upper_bound,
1103 );
1104 let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1105 let db_iter = db
1106 .underlying
1107 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1108 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1109 let iter = SafeIter::new(
1110 self.cf.clone(),
1111 db_iter,
1112 _timer,
1113 _perf_ctx,
1114 bytes_scanned,
1115 keys_scanned,
1116 Some(self.db_metrics.clone()),
1117 );
1118 Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1119 }
1120 Storage::InMemory(db) => {
1121 Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1122 }
1123 #[cfg(tidehunter)]
1124 Storage::TideHunter(db) => match &self.column_family {
1125 ColumnFamily::TideHunter((ks, prefix)) => {
1126 let mut iter = db.iterator(*ks);
1127 apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
1128 iter.reverse();
1129 Ok(Box::new(transform_th_iterator(
1130 iter,
1131 prefix,
1132 self.start_iter_timer(),
1133 )))
1134 }
1135 _ => unreachable!("storage backend invariant violation"),
1136 },
1137 }
1138 }
1139}
1140
1141pub enum StorageWriteBatch {
1142 Rocks(rocksdb::WriteBatch),
1143 InMemory(InMemoryBatch),
1144 #[cfg(tidehunter)]
1145 TideHunter(tidehunter::batch::WriteBatch),
1146}
1147
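/// A set of writes that is committed in a single call against one `Database`.
///
/// Batches are created through [`DBMap::batch`] and may span multiple column
/// families, provided every `DBMap` handed to the mutation methods is backed
/// by the same database; otherwise the call fails with
/// `TypedStoreError::CrossDBBatch`.
///
/// Illustrative sketch (assumes `map_a` and `map_b` are `DBMap`s over the same
/// database):
///
/// ```ignore
/// let mut batch = map_a.batch();
/// batch.insert_batch(&map_a, [(new_key, new_value)])?;
/// batch.delete_batch(&map_b, [stale_key])?;
/// batch.write()?; // commits the whole batch in one write
/// ```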
1148pub struct DBBatch {
1200 database: Arc<Database>,
1201 batch: StorageWriteBatch,
1202 db_metrics: Arc<DBMetrics>,
1203 write_sample_interval: SamplingInterval,
1204}
1205
1206impl DBBatch {
1207 pub fn new(
1211 dbref: &Arc<Database>,
1212 batch: StorageWriteBatch,
1213 db_metrics: &Arc<DBMetrics>,
1214 write_sample_interval: &SamplingInterval,
1215 ) -> Self {
1216 DBBatch {
1217 database: dbref.clone(),
1218 batch,
1219 db_metrics: db_metrics.clone(),
1220 write_sample_interval: write_sample_interval.clone(),
1221 }
1222 }
1223
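    /// Consumes the batch and commits it. Writes are synced to disk when write
    /// sync has been enabled via `init_write_sync` or `SUI_DB_SYNC_TO_DISK`.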
1224 #[instrument(level = "trace", skip_all, err)]
1226 pub fn write(self) -> Result<(), TypedStoreError> {
1227 let mut write_options = rocksdb::WriteOptions::default();
1228
1229 if write_sync_enabled() {
1230 write_options.set_sync(true);
1231 }
1232
1233 self.write_opt(write_options)
1234 }
1235
1236 #[instrument(level = "trace", skip_all, err)]
1238 pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1239 let db_name = self.database.db_name();
1240 let timer = self
1241 .db_metrics
1242 .op_metrics
1243 .rocksdb_batch_commit_latency_seconds
1244 .with_label_values(&[&db_name])
1245 .start_timer();
1246 let batch_size = self.size_in_bytes();
1247
1248 let perf_ctx = if self.write_sample_interval.sample() {
1249 Some(RocksDBPerfContext)
1250 } else {
1251 None
1252 };
1253
1254 self.database
1255 .write_opt_internal(self.batch, &write_options)?;
1256
1257 self.db_metrics
1258 .op_metrics
1259 .rocksdb_batch_commit_bytes
1260 .with_label_values(&[&db_name])
1261 .observe(batch_size as f64);
1262
1263 if perf_ctx.is_some() {
1264 self.db_metrics
1265 .write_perf_ctx_metrics
1266 .report_metrics(&db_name);
1267 }
1268 let elapsed = timer.stop_and_record();
1269 if elapsed > 1.0 {
1270 warn!(?elapsed, ?db_name, "very slow batch write");
1271 self.db_metrics
1272 .op_metrics
1273 .rocksdb_very_slow_batch_writes_count
1274 .with_label_values(&[&db_name])
1275 .inc();
1276 self.db_metrics
1277 .op_metrics
1278 .rocksdb_very_slow_batch_writes_duration_ms
1279 .with_label_values(&[&db_name])
1280 .inc_by((elapsed * 1000.0) as u64);
1281 }
1282 Ok(())
1283 }
1284
1285 pub fn size_in_bytes(&self) -> usize {
1286 match self.batch {
1287 StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1288 StorageWriteBatch::InMemory(_) => 0,
1289 #[cfg(tidehunter)]
1291 StorageWriteBatch::TideHunter(_) => 0,
1292 }
1293 }
1294
1295 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1296 &mut self,
1297 db: &DBMap<K, V>,
1298 purged_vals: impl IntoIterator<Item = J>,
1299 ) -> Result<(), TypedStoreError> {
1300 if !Arc::ptr_eq(&db.db, &self.database) {
1301 return Err(TypedStoreError::CrossDBBatch);
1302 }
1303
1304 purged_vals
1305 .into_iter()
1306 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1307 let k_buf = be_fix_int_ser(k.borrow());
1308 match (&mut self.batch, &db.column_family) {
1309 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1310 b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1311 }
1312 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1313 b.delete_cf(name, k_buf)
1314 }
1315 #[cfg(tidehunter)]
1316 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1317 b.delete(*ks, transform_th_key(&k_buf, prefix))
1318 }
1319 _ => Err(TypedStoreError::RocksDBError(
1320 "typed store invariant violation".to_string(),
1321 ))?,
1322 }
1323 Ok(())
1324 })?;
1325 Ok(())
1326 }
1327
1328 pub fn schedule_delete_range<K: Serialize, V>(
1338 &mut self,
1339 db: &DBMap<K, V>,
1340 from: &K,
1341 to: &K,
1342 ) -> Result<(), TypedStoreError> {
1343 if !Arc::ptr_eq(&db.db, &self.database) {
1344 return Err(TypedStoreError::CrossDBBatch);
1345 }
1346
1347 let from_buf = be_fix_int_ser(from);
1348 let to_buf = be_fix_int_ser(to);
1349
1350 if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1351 b.delete_range_cf(
1352 &rocks_cf_from_db(&self.database, db.cf_name())?,
1353 from_buf,
1354 to_buf,
1355 );
1356 }
1357 Ok(())
1358 }
1359
1360 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1362 &mut self,
1363 db: &DBMap<K, V>,
1364 new_vals: impl IntoIterator<Item = (J, U)>,
1365 ) -> Result<&mut Self, TypedStoreError> {
1366 if !Arc::ptr_eq(&db.db, &self.database) {
1367 return Err(TypedStoreError::CrossDBBatch);
1368 }
1369 let mut total = 0usize;
1370 new_vals
1371 .into_iter()
1372 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1373 let k_buf = be_fix_int_ser(k.borrow());
1374 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1375 total += k_buf.len() + v_buf.len();
1376 if db.opts.log_value_hash {
1377 let key_hash = default_hash(&k_buf);
1378 let value_hash = default_hash(&v_buf);
1379 debug!(
1380 "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1381 db.cf_name(),
1382 key_hash,
1383 value_hash
1384 );
1385 }
1386 match (&mut self.batch, &db.column_family) {
1387 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1388 b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1389 }
1390 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1391 b.put_cf(name, k_buf, v_buf)
1392 }
1393 #[cfg(tidehunter)]
1394 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1395 b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1396 }
1397 _ => Err(TypedStoreError::RocksDBError(
1398 "typed store invariant violation".to_string(),
1399 ))?,
1400 }
1401 Ok(())
1402 })?;
1403 self.db_metrics
1404 .op_metrics
1405 .rocksdb_batch_put_bytes
1406 .with_label_values(&[&db.cf])
1407 .observe(total as f64);
1408 Ok(self)
1409 }
1410
1411 pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1412 &mut self,
1413 db: &DBMap<K, V>,
1414 new_vals: impl IntoIterator<Item = (J, U)>,
1415 ) -> Result<&mut Self, TypedStoreError> {
1416 if !Arc::ptr_eq(&db.db, &self.database) {
1417 return Err(TypedStoreError::CrossDBBatch);
1418 }
1419 new_vals
1420 .into_iter()
1421 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1422 let k_buf = be_fix_int_ser(k.borrow());
1423 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1424 match &mut self.batch {
1425 StorageWriteBatch::Rocks(b) => b.merge_cf(
1426 &rocks_cf_from_db(&self.database, db.cf_name())?,
1427 k_buf,
1428 v_buf,
1429 ),
1430 _ => unimplemented!("merge operator is only implemented for RocksDB"),
1431 }
1432 Ok(())
1433 })?;
1434 Ok(self)
1435 }
1436}
1437
1438impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1439where
1440 K: Serialize + DeserializeOwned,
1441 V: Serialize + DeserializeOwned,
1442{
1443 type Error = TypedStoreError;
1444
1445 #[instrument(level = "trace", skip_all, err)]
1446 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1447 let key_buf = be_fix_int_ser(key);
1448 let readopts = self.opts.readopts();
1449 Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1450 && self
1451 .db
1452 .get(&self.column_family, &key_buf, &readopts)?
1453 .is_some())
1454 }
1455
1456 #[instrument(level = "trace", skip_all, err)]
1457 fn multi_contains_keys<J>(
1458 &self,
1459 keys: impl IntoIterator<Item = J>,
1460 ) -> Result<Vec<bool>, Self::Error>
1461 where
1462 J: Borrow<K>,
1463 {
1464 let values = self.multi_get_pinned(keys)?;
1465 Ok(values.into_iter().map(|v| v.is_some()).collect())
1466 }
1467
1468 #[instrument(level = "trace", skip_all, err)]
1469 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1470 let _timer = self
1471 .db_metrics
1472 .op_metrics
1473 .rocksdb_get_latency_seconds
1474 .with_label_values(&[&self.cf])
1475 .start_timer();
1476 let perf_ctx = if self.get_sample_interval.sample() {
1477 Some(RocksDBPerfContext)
1478 } else {
1479 None
1480 };
1481 let key_buf = be_fix_int_ser(key);
1482 let res = self
1483 .db
1484 .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1485 self.db_metrics
1486 .op_metrics
1487 .rocksdb_get_bytes
1488 .with_label_values(&[&self.cf])
1489 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1490 if perf_ctx.is_some() {
1491 self.db_metrics
1492 .read_perf_ctx_metrics
1493 .report_metrics(&self.cf);
1494 }
1495 match res {
1496 Some(data) => {
1497 let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1498 if value.is_err() {
1499 let key_hash = default_hash(&key_buf);
1500 let value_hash = default_hash(&data);
1501 debug_fatal!(
1502 "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1503 self.cf_name(),
1504 key_hash,
1505 value_hash,
1506 value.as_ref().err().unwrap()
1507 );
1508 }
1509 Ok(Some(value?))
1510 }
1511 None => Ok(None),
1512 }
1513 }
1514
1515 #[instrument(level = "trace", skip_all, err)]
1516 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1517 let timer = self
1518 .db_metrics
1519 .op_metrics
1520 .rocksdb_put_latency_seconds
1521 .with_label_values(&[&self.cf])
1522 .start_timer();
1523 let perf_ctx = if self.write_sample_interval.sample() {
1524 Some(RocksDBPerfContext)
1525 } else {
1526 None
1527 };
1528 let key_buf = be_fix_int_ser(key);
1529 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1530 self.db_metrics
1531 .op_metrics
1532 .rocksdb_put_bytes
1533 .with_label_values(&[&self.cf])
1534 .observe((key_buf.len() + value_buf.len()) as f64);
1535 if perf_ctx.is_some() {
1536 self.db_metrics
1537 .write_perf_ctx_metrics
1538 .report_metrics(&self.cf);
1539 }
1540 self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1541
1542 let elapsed = timer.stop_and_record();
1543 if elapsed > 1.0 {
1544 warn!(?elapsed, cf = ?self.cf, "very slow insert");
1545 self.db_metrics
1546 .op_metrics
1547 .rocksdb_very_slow_puts_count
1548 .with_label_values(&[&self.cf])
1549 .inc();
1550 self.db_metrics
1551 .op_metrics
1552 .rocksdb_very_slow_puts_duration_ms
1553 .with_label_values(&[&self.cf])
1554 .inc_by((elapsed * 1000.0) as u64);
1555 }
1556
1557 Ok(())
1558 }
1559
1560 #[instrument(level = "trace", skip_all, err)]
1561 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1562 let _timer = self
1563 .db_metrics
1564 .op_metrics
1565 .rocksdb_delete_latency_seconds
1566 .with_label_values(&[&self.cf])
1567 .start_timer();
1568 let perf_ctx = if self.write_sample_interval.sample() {
1569 Some(RocksDBPerfContext)
1570 } else {
1571 None
1572 };
1573 let key_buf = be_fix_int_ser(key);
1574 self.db.delete_cf(&self.column_family, key_buf)?;
1575 self.db_metrics
1576 .op_metrics
1577 .rocksdb_deletes
1578 .with_label_values(&[&self.cf])
1579 .inc();
1580 if perf_ctx.is_some() {
1581 self.db_metrics
1582 .write_perf_ctx_metrics
1583 .report_metrics(&self.cf);
1584 }
1585 Ok(())
1586 }
1587
1588 #[instrument(level = "trace", skip_all, err)]
1597 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1598 let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1599 let last_key = self
1600 .reversed_safe_iter_with_bounds(None, None)?
1601 .next()
1602 .transpose()?
1603 .map(|(k, _v)| k);
1604 if let Some((first_key, last_key)) = first_key.zip(last_key) {
1605 let mut batch = self.batch();
1606 batch.schedule_delete_range(self, &first_key, &last_key)?;
1607 batch.write()?;
1608 }
1609 Ok(())
1610 }
1611
1612 fn is_empty(&self) -> bool {
1613 self.safe_iter().next().is_none()
1614 }
1615
1616 fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1617 match &self.db.storage {
1618 Storage::Rocks(db) => {
1619 let db_iter = db
1620 .underlying
1621 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1622 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1623 Box::new(SafeIter::new(
1624 self.cf.clone(),
1625 db_iter,
1626 _timer,
1627 _perf_ctx,
1628 bytes_scanned,
1629 keys_scanned,
1630 Some(self.db_metrics.clone()),
1631 ))
1632 }
1633 Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1634 #[cfg(tidehunter)]
1635 Storage::TideHunter(db) => match &self.column_family {
1636 ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1637 db.iterator(*ks),
1638 prefix,
1639 self.start_iter_timer(),
1640 )),
1641 _ => unreachable!("storage backend invariant violation"),
1642 },
1643 }
1644 }
1645
1646 fn safe_iter_with_bounds(
1647 &'a self,
1648 lower_bound: Option<K>,
1649 upper_bound: Option<K>,
1650 ) -> DbIterator<'a, (K, V)> {
1651 let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1652 match &self.db.storage {
1653 Storage::Rocks(db) => {
1654 let readopts =
1655 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1656 let db_iter = db
1657 .underlying
1658 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1659 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1660 Box::new(SafeIter::new(
1661 self.cf.clone(),
1662 db_iter,
1663 _timer,
1664 _perf_ctx,
1665 bytes_scanned,
1666 keys_scanned,
1667 Some(self.db_metrics.clone()),
1668 ))
1669 }
1670 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1671 #[cfg(tidehunter)]
1672 Storage::TideHunter(db) => match &self.column_family {
1673 ColumnFamily::TideHunter((ks, prefix)) => {
1674 let mut iter = db.iterator(*ks);
1675 apply_range_bounds(&mut iter, lower_bound, upper_bound);
1676 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1677 }
1678 _ => unreachable!("storage backend invariant violation"),
1679 },
1680 }
1681 }
1682
1683 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1684 let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1685 match &self.db.storage {
1686 Storage::Rocks(db) => {
1687 let readopts =
1688 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1689 let db_iter = db
1690 .underlying
1691 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1692 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1693 Box::new(SafeIter::new(
1694 self.cf.clone(),
1695 db_iter,
1696 _timer,
1697 _perf_ctx,
1698 bytes_scanned,
1699 keys_scanned,
1700 Some(self.db_metrics.clone()),
1701 ))
1702 }
1703 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1704 #[cfg(tidehunter)]
1705 Storage::TideHunter(db) => match &self.column_family {
1706 ColumnFamily::TideHunter((ks, prefix)) => {
1707 let mut iter = db.iterator(*ks);
1708 apply_range_bounds(&mut iter, lower_bound, upper_bound);
1709 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1710 }
1711 _ => unreachable!("storage backend invariant violation"),
1712 },
1713 }
1714 }
1715
1716 #[instrument(level = "trace", skip_all, err)]
1718 fn multi_get<J>(
1719 &self,
1720 keys: impl IntoIterator<Item = J>,
1721 ) -> Result<Vec<Option<V>>, TypedStoreError>
1722 where
1723 J: Borrow<K>,
1724 {
1725 let results = self.multi_get_pinned(keys)?;
1726 let values_parsed: Result<Vec<_>, TypedStoreError> = results
1727 .into_iter()
1728 .map(|value_byte| match value_byte {
1729 Some(data) => Ok(Some(
1730 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1731 )),
1732 None => Ok(None),
1733 })
1734 .collect();
1735
1736 values_parsed
1737 }
1738
1739 #[instrument(level = "trace", skip_all, err)]
1741 fn multi_insert<J, U>(
1742 &self,
1743 key_val_pairs: impl IntoIterator<Item = (J, U)>,
1744 ) -> Result<(), Self::Error>
1745 where
1746 J: Borrow<K>,
1747 U: Borrow<V>,
1748 {
1749 let mut batch = self.batch();
1750 batch.insert_batch(self, key_val_pairs)?;
1751 batch.write()
1752 }
1753
1754 #[instrument(level = "trace", skip_all, err)]
1756 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1757 where
1758 J: Borrow<K>,
1759 {
1760 let mut batch = self.batch();
1761 batch.delete_batch(self, keys)?;
1762 batch.write()
1763 }
1764
1765 #[instrument(level = "trace", skip_all, err)]
1767 fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1768 if let Storage::Rocks(rocks) = &self.db.storage {
1769 rocks
1770 .underlying
1771 .try_catch_up_with_primary()
1772 .map_err(typed_store_err_from_rocks_err)?;
1773 }
1774 Ok(())
1775 }
1776}
1777
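/// Opens (or creates) a RocksDB database at `path` with the requested column
/// families, creating any that are missing while preserving column families
/// that already exist on disk.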
1778#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1780pub fn open_cf_opts<P: AsRef<Path>>(
1781 path: P,
1782 db_options: Option<rocksdb::Options>,
1783 metric_conf: MetricConf,
1784 opt_cfs: &[(&str, rocksdb::Options)],
1785) -> Result<Arc<Database>, TypedStoreError> {
1786 let path = path.as_ref();
1787 ensure_database_type(path, StorageType::Rocks)
1788 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1789 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1798 nondeterministic!({
1799 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1800 options.create_if_missing(true);
1801 options.create_missing_column_families(true);
1802 let rocksdb = {
1803 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1804 &options,
1805 path,
1806 cfs.into_iter()
1807 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1808 )
1809 .map_err(typed_store_err_from_rocks_err)?
1810 };
1811 Ok(Arc::new(Database::new(
1812 Storage::Rocks(RocksDB {
1813 underlying: rocksdb,
1814 }),
1815 metric_conf,
1816 None,
1817 )))
1818 })
1819}
1820
1821pub fn open_cf_opts_secondary<P: AsRef<Path>>(
1823 primary_path: P,
1824 secondary_path: Option<P>,
1825 db_options: Option<rocksdb::Options>,
1826 metric_conf: MetricConf,
1827 opt_cfs: &[(&str, rocksdb::Options)],
1828) -> Result<Arc<Database>, TypedStoreError> {
1829 let primary_path = primary_path.as_ref();
1830 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
1831 nondeterministic!({
1833 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1835
1836 fdlimit::raise_fd_limit();
1837 options.set_max_open_files(-1);
1839
1840 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
1841 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
1842 .ok()
1843 .unwrap_or_default();
1844
1845 let default_db_options = default_db_options();
1846 for cf_key in cfs.iter() {
1848 if !opt_cfs.contains_key(&cf_key[..]) {
1849 opt_cfs.insert(cf_key, default_db_options.options.clone());
1850 }
1851 }
1852
1853 let primary_path = primary_path.to_path_buf();
1854 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
1855 let mut s = primary_path.clone();
1856 s.pop();
1857 s.push("SECONDARY");
1858 s.as_path().to_path_buf()
1859 });
1860
1861 ensure_database_type(&primary_path, StorageType::Rocks)
1862 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1863 ensure_database_type(&secondary_path, StorageType::Rocks)
1864 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1865
1866 let rocksdb = {
1867 options.create_if_missing(true);
1868 options.create_missing_column_families(true);
1869 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
1870 &options,
1871 &primary_path,
1872 &secondary_path,
1873 opt_cfs
1874 .iter()
1875 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
1876 )
1877 .map_err(typed_store_err_from_rocks_err)?;
1878 db.try_catch_up_with_primary()
1879 .map_err(typed_store_err_from_rocks_err)?;
1880 db
1881 };
1882 Ok(Arc::new(Database::new(
1883 Storage::Rocks(RocksDB {
1884 underlying: rocksdb,
1885 }),
1886 metric_conf,
1887 None,
1888 )))
1889 })
1890}
1891
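/// Destroys the RocksDB database at `path`, retrying with exponential backoff
/// (e.g. while another handle still holds the file lock) until `timeout`
/// elapses, at which point the last error is returned.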
1892#[cfg(not(tidehunter))]
1894pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
1895 let mut backoff = backoff::ExponentialBackoff {
1896 max_elapsed_time: Some(timeout),
1897 ..Default::default()
1898 };
1899 loop {
1900 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
1901 Ok(()) => return Ok(()),
1902 Err(err) => match backoff.next_backoff() {
1903 Some(duration) => tokio::time::sleep(duration).await,
1904 None => return Err(err),
1905 },
1906 }
1907 }
1908}
1909
1910#[cfg(tidehunter)]
1911pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
1912 std::fs::remove_dir_all(path)
1913}
1914
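/// Combines the requested column families with any column families that
/// already exist on disk, so the database can be reopened without explicitly
/// listing every historical column family.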
1915fn populate_missing_cfs(
1916 input_cfs: &[(&str, rocksdb::Options)],
1917 path: &Path,
1918) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
1919 let mut cfs = vec![];
1920 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
1921 let existing_cfs =
1922 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
1923 .ok()
1924 .unwrap_or_default();
1925
1926 for cf_name in existing_cfs {
1927 if !input_cf_index.contains(&cf_name[..]) {
1928 cfs.push((cf_name, rocksdb::Options::default()));
1929 }
1930 }
1931 cfs.extend(
1932 input_cfs
1933 .iter()
1934 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
1935 );
1936 Ok(cfs)
1937}
1938
1939fn default_hash(value: &[u8]) -> Digest<32> {
1940 let mut hasher = fastcrypto::hash::Blake2b256::default();
1941 hasher.update(value);
1942 hasher.finalize()
1943}