pub mod errors;
mod options;
mod rocks_util;
pub(crate) mod safe_iter;

use crate::memstore::{InMemoryBatch, InMemoryDB};
use crate::rocks::errors::typed_store_err_from_bcs_err;
use crate::rocks::errors::typed_store_err_from_rocks_err;
pub use crate::rocks::options::{
    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
};
use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
#[cfg(tidehunter)]
use crate::tidehunter_util::{
    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
};
use crate::util::{
    be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
    iterator_bounds_with_range,
};
use crate::{DbIterator, StorageType, TypedStoreError};
use crate::{
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    traits::{Map, TableSummary},
};
use backoff::backoff::Backoff;
use fastcrypto::hash::{Digest, HashFunction};
use mysten_common::debug_fatal;
use mysten_metrics::RegistryID;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::properties::num_files_at_level;
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
    properties,
};
use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
use serde::{Serialize, de::DeserializeOwned};
use std::ops::{Bound, Deref};
use std::{
    borrow::Borrow,
    marker::PhantomData,
    ops::RangeBounds,
    path::{Path, PathBuf},
    sync::{Arc, OnceLock},
    time::Duration,
};
use std::{collections::HashSet, ffi::CStr};
use sui_macros::{fail_point, nondeterministic};
#[cfg(tidehunter)]
use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
use tokio::sync::oneshot;
use tracing::{debug, error, instrument, warn};

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

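/// Returns whether synchronous (fsync'd) writes are enabled, either via `init_write_sync` or the
/// `SUI_DB_SYNC_TO_DISK` environment variable ("1" or "true"). The result is computed once and
/// cached for the lifetime of the process.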
fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED
        .get_or_init(|| std::env::var("SUI_DB_SYNC_TO_DISK").is_ok_and(|v| v == "1" || v == "true"))
}

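/// Overrides the write-sync flag. This only takes effect if called before the flag is first
/// read, since the value is stored in a `OnceLock`; later calls are silently ignored.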
pub fn init_write_sync(enabled: Option<bool>) {
    if let Some(value) = enabled {
        let _ = WRITE_SYNC_ENABLED.set(value);
    }
}

#[cfg(test)]
mod tests;

#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        self.underlying.cancel_all_background_work(true);
    }
}

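/// Identifies a column family within one of the supported storage backends: a named RocksDB or
/// in-memory column family, or (with the `tidehunter` cfg) a TideHunter key space plus an
/// optional key prefix.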
#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
            #[cfg(tidehunter)]
            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}

impl ColumnFamily {
    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
        match &self {
            ColumnFamily::Rocks(name) => rocks_db
                .underlying
                .cf_handle(name)
                .expect("Map-keying column family should have been checked at DB creation"),
            _ => unreachable!("invariant is checked by the caller"),
        }
    }
}

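/// The physical storage backend behind a [`Database`]: RocksDB, an in-memory store, or (with the
/// `tidehunter` cfg) a TideHunter database.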
pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}

impl std::fmt::Debug for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
        }
    }
}

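/// A handle to an opened database plus its metrics configuration. Dropping it decrements the
/// active-DB gauge and removes the associated metrics registry, if one was registered.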
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
    registry_id: Option<RegistryID>,
}

impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        if let Some(registry_id) = self.registry_id {
            metrics.registry_serivce.remove(registry_id);
        }
    }
}

enum GetResult<'a> {
    Rocks(DBPinnableSlice<'a>),
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}

impl Deref for GetResult<'_> {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        match self {
            GetResult::Rocks(d) => d.deref(),
            GetResult::InMemory(d) => d.deref(),
            #[cfg(tidehunter)]
            GetResult::TideHunter(d) => d.deref(),
        }
    }
}

impl Database {
    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
            registry_id,
        }
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                Ok(())
            }
        }
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),

            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }

    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                let keys_vec: Vec<K> = keys.into_iter().collect();
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: dropping a column family on the fly is not implemented")
            }
        }
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for the rocksdb backend"),
        }
    }

    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for the rocksdb backend"),
        }
    }

    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }

    pub(crate) fn write_opt_internal(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
                db.write_batch(batch)
                    .map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            "default".to_string()
        } else {
            name.clone()
        }
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
}

fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
    rocks_db
        .underlying
        .cf_handle(cf_name)
        .expect("Map-keying column family should have been checked at DB creation")
}

fn rocks_cf_from_db<'a>(
    db: &'a Database,
    cf_name: &str,
) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
    match &db.storage {
        Storage::Rocks(rocksdb) => Ok(rocksdb
            .underlying
            .cf_handle(cf_name)
            .expect("Map-keying column family should have been checked at DB creation")),
        _ => Err(TypedStoreError::RocksDBError(
            "using invalid batch type for the database".to_string(),
        )),
    }
}

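/// Metrics configuration for a database: the name under which metrics are reported and the
/// sampling intervals used for read, write, and iterator perf-context sampling.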
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

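/// A typed key-value view over a single column family of a [`Database`]. Keys are encoded with
/// the crate's big-endian fixed-width serialization (`be_fix_int_ser`) and values with BCS.
///
/// A minimal usage sketch, assuming the `Map` trait is in scope; the path and column-family
/// name are illustrative:
///
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/example-db",
///     None,
///     MetricConf::new("example"),
///     &[("my_cf", rocksdb::Options::default())],
/// )?;
/// let map: DBMap<u64, String> = DBMap::reopen(&db, Some("my_cf"), &ReadWriteOptions::default(), false)?;
/// map.insert(&1, &"hello".to_string())?;
/// assert_eq!(map.get(&1)?, Some("hello".to_string()));
/// ```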
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    _phantom: PhantomData<fn(K) -> V>,
    column_family: ColumnFamily,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = Arc::downgrade(&db.clone());
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

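    /// Opens `opt_cf` (or RocksDB's default column family when `None`) as a typed map over an
    /// already opened database.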
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<Database>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();
        Ok(DBMap::new(
            db.clone(),
            rw_options,
            &cf_key,
            ColumnFamily::Rocks(cf_key.to_string()),
            is_deprecated,
        ))
    }

    #[cfg(tidehunter)]
    pub fn reopen_th(
        db: Arc<Database>,
        cf_name: &str,
        ks: KeySpace,
        prefix: Option<Vec<u8>>,
    ) -> Self {
        DBMap::new(
            db,
            &ReadWriteOptions::default(),
            cf_name,
            ColumnFamily::TideHunter((ks, prefix.clone())),
            false,
        )
    }

    pub fn cf_name(&self) -> &str {
        &self.cf
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
            }
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start);
        let to_buf = be_fix_int_ser(end);
        self.db
            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn get_rocksdb_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
        }
    }

    fn report_rocksdb_metrics(
        database: &Arc<Database>,
        cf_name: &str,
        db_metrics: &Arc<DBMetrics>,
    ) {
        let Storage::Rocks(rocksdb) = &database.storage else {
            return;
        };

        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                database.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
                )
                .unwrap_or(METRICS_ERROR),
            );
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_TABLE_READERS_MEM,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            let key_len = be_fix_int_ser(key.borrow()).len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self.start_iter_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

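    /// Returns an iterator over `(key, value)` pairs in descending key order, restricted to the
    /// given bounds. Both bounds are inclusive; `None` means unbounded on that side.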
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
}

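/// A write batch matching one of the [`Storage`] backends.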
pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}

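/// Describes one entry staged in a [`StagedBatch`]: the offset where the entry starts in the
/// data buffer, the lengths of the column-family name and key laid out at that offset, and
/// whether the entry is a put (followed by a value) or a delete.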
struct EntryHeader {
    offset: usize,
    cf_name_len: usize,
    key_len: usize,
    is_put: bool,
}

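/// A backend-agnostic staging area for writes: column-family names, keys, and values are
/// appended to one contiguous byte buffer and indexed by [`EntryHeader`]s. Staged batches are
/// later folded into a [`DBBatch`] with [`DBBatch::concat`], so callers can prepare writes
/// without holding column-family handles.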
#[derive(Default)]
pub struct StagedBatch {
    data: Vec<u8>,
    entries: Vec<EntryHeader>,
}

impl StagedBatch {
    pub fn new() -> Self {
        Self {
            data: Vec::with_capacity(1024),
            entries: Vec::with_capacity(16),
        }
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        let cf_name = db.cf_name();
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let offset = self.data.len();
                self.data.extend_from_slice(cf_name.as_bytes());
                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
                bcs::serialize_into(&mut self.data, v.borrow())
                    .map_err(typed_store_err_from_bcs_err)?;
                self.entries.push(EntryHeader {
                    offset,
                    cf_name_len: cf_name.len(),
                    key_len,
                    is_put: true,
                });
                Ok(())
            })?;
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        let cf_name = db.cf_name();
        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let offset = self.data.len();
                self.data.extend_from_slice(cf_name.as_bytes());
                let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
                self.entries.push(EntryHeader {
                    offset,
                    cf_name_len: cf_name.len(),
                    key_len,
                    is_put: false,
                });
                Ok(())
            })?;
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        self.data.len()
    }
}

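/// A mutable collection of write operations (inserts, deletes, range deletes) against maps of a
/// single [`Database`], applied together when [`DBBatch::write`] is called.
///
/// A minimal sketch of batching writes across two maps that share the same underlying
/// [`Database`]; the map names and types are illustrative:
///
/// ```ignore
/// let mut batch = map_1.batch();
/// batch
///     .insert_batch(&map_1, vec![(1u64, "one".to_string())])?
///     .delete_batch(&map_2, vec![42u64])?;
/// batch.write()?;
/// ```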
1232pub struct DBBatch {
1284 database: Arc<Database>,
1285 batch: StorageWriteBatch,
1286 db_metrics: Arc<DBMetrics>,
1287 write_sample_interval: SamplingInterval,
1288}
1289
1290impl DBBatch {
1291 pub fn new(
1295 dbref: &Arc<Database>,
1296 batch: StorageWriteBatch,
1297 db_metrics: &Arc<DBMetrics>,
1298 write_sample_interval: &SamplingInterval,
1299 ) -> Self {
1300 DBBatch {
1301 database: dbref.clone(),
1302 batch,
1303 db_metrics: db_metrics.clone(),
1304 write_sample_interval: write_sample_interval.clone(),
1305 }
1306 }
1307
1308 #[instrument(level = "trace", skip_all, err)]
1310 pub fn write(self) -> Result<(), TypedStoreError> {
1311 let mut write_options = rocksdb::WriteOptions::default();
1312
1313 if write_sync_enabled() {
1314 write_options.set_sync(true);
1315 }
1316
1317 self.write_opt(write_options)
1318 }
1319
1320 #[instrument(level = "trace", skip_all, err)]
1322 pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
1323 let db_name = self.database.db_name();
1324 let timer = self
1325 .db_metrics
1326 .op_metrics
1327 .rocksdb_batch_commit_latency_seconds
1328 .with_label_values(&[&db_name])
1329 .start_timer();
1330 let batch_size = self.size_in_bytes();
1331
1332 let perf_ctx = if self.write_sample_interval.sample() {
1333 Some(RocksDBPerfContext)
1334 } else {
1335 None
1336 };
1337
1338 self.database
1339 .write_opt_internal(self.batch, &write_options)?;
1340
1341 self.db_metrics
1342 .op_metrics
1343 .rocksdb_batch_commit_bytes
1344 .with_label_values(&[&db_name])
1345 .observe(batch_size as f64);
1346
1347 if perf_ctx.is_some() {
1348 self.db_metrics
1349 .write_perf_ctx_metrics
1350 .report_metrics(&db_name);
1351 }
1352 let elapsed = timer.stop_and_record();
1353 if elapsed > 1.0 {
1354 warn!(?elapsed, ?db_name, "very slow batch write");
1355 self.db_metrics
1356 .op_metrics
1357 .rocksdb_very_slow_batch_writes_count
1358 .with_label_values(&[&db_name])
1359 .inc();
1360 self.db_metrics
1361 .op_metrics
1362 .rocksdb_very_slow_batch_writes_duration_ms
1363 .with_label_values(&[&db_name])
1364 .inc_by((elapsed * 1000.0) as u64);
1365 }
1366 Ok(())
1367 }
1368
1369 pub fn size_in_bytes(&self) -> usize {
1370 match self.batch {
1371 StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1372 StorageWriteBatch::InMemory(_) => 0,
1373 #[cfg(tidehunter)]
1375 StorageWriteBatch::TideHunter(_) => 0,
1376 }
1377 }
1378
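    /// Appends the entries of the given [`StagedBatch`]es to this batch, resolving each staged
    /// column-family name against this batch's database. Returns an error for the TideHunter
    /// backend, which does not support this operation.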
    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

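    /// Schedules deletion of the key range from `from` (inclusive) to `to` (exclusive, following
    /// RocksDB range-delete semantics) on the RocksDB backend; for other backends this is
    /// currently a no-op.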
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key);
        let readopts = self.opts.readopts();
        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
            && self
                .db
                .get(&self.column_family, &key_buf, &readopts)?
                .is_some())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let res = self
            .db
            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => {
                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
                if value.is_err() {
                    let key_hash = default_hash(&key_buf);
                    let value_hash = default_hash(&data);
                    debug_fatal!(
                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
                        self.cf_name(),
                        key_hash,
                        value_hash,
                        value.as_ref().err().unwrap()
                    );
                }
                Ok(Some(value?))
            }
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.db.put_cf(&self.column_family, key_buf, value_buf)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        self.db.delete_cf(&self.column_family, key_buf)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

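    /// Clears the table by looking up the first and last existing keys and scheduling a range
    /// delete between them (see `schedule_delete_range` for the backend-specific range
    /// semantics).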
    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
        let last_key = self
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
        match &self.db.storage {
            Storage::Rocks(db) => {
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
                    db.iterator(*ks),
                    prefix,
                    self.start_iter_timer(),
                )),
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        if let Storage::Rocks(rocks) = &self.db.storage {
            rocks
                .underlying
                .try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
        }
        Ok(())
    }
}

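/// Opens (or creates) a RocksDB database at `path` with the given options and column families.
/// Column families that already exist on disk but are missing from `opt_cfs` are opened with
/// default options so the open does not fail.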
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let path = path.as_ref();
    ensure_database_type(path, StorageType::Rocks)
        .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}

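/// Opens a RocksDB database in secondary mode: a read-only replica of the database at
/// `primary_path` that can be refreshed via `try_catch_up_with_primary`. If `secondary_path` is
/// `None`, a `SECONDARY` directory alongside the primary database directory is used.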
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        ensure_database_type(&primary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        ensure_database_type(&secondary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}

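/// Asynchronously destroys the database at `path`, retrying with exponential backoff for up to
/// `timeout` if destruction fails (e.g. because the database is still open).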
#[cfg(not(tidehunter))]
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
            Ok(()) => return Ok(()),
            Err(err) => match backoff.next_backoff() {
                Some(duration) => tokio::time::sleep(duration).await,
                None => return Err(err),
            },
        }
    }
}

#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
    std::fs::remove_dir_all(path)
}

fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

fn default_hash(value: &[u8]) -> Digest<32> {
    let mut hasher = fastcrypto::hash::Blake2b256::default();
    hasher.update(value);
    hasher.finalize()
}