pub mod errors;
mod options;
mod rocks_util;
pub(crate) mod safe_iter;

use crate::memstore::{InMemoryBatch, InMemoryDB};
use crate::rocks::errors::typed_store_err_from_bcs_err;
use crate::rocks::errors::typed_store_err_from_rocks_err;
pub use crate::rocks::options::{
    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
};
use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
#[cfg(tidehunter)]
use crate::tidehunter_util::{
    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
};
use crate::util::{be_fix_int_ser, iterator_bounds, iterator_bounds_with_range};
use crate::{DbIterator, TypedStoreError};
use crate::{
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    traits::{Map, TableSummary},
};
use backoff::backoff::Backoff;
use fastcrypto::hash::{Digest, HashFunction};
use mysten_common::debug_fatal;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::properties::num_files_at_level;
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
    properties,
};
use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
use serde::{Serialize, de::DeserializeOwned};
use std::ops::{Bound, Deref};
use std::{
    borrow::Borrow,
    marker::PhantomData,
    ops::RangeBounds,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
use std::{collections::HashSet, ffi::CStr};
use sui_macros::{fail_point, nondeterministic};
#[cfg(tidehunter)]
use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
use tokio::sync::oneshot;
use tracing::{debug, error, instrument, warn};

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

#[cfg(test)]
mod tests;
59
60#[derive(Debug)]
61pub struct RocksDB {
62 pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
63}
64
65impl Drop for RocksDB {
66 fn drop(&mut self) {
67 self.underlying.cancel_all_background_work(true);
68 }
69}
70
71#[derive(Clone)]
72pub enum ColumnFamily {
73 Rocks(String),
74 InMemory(String),
75 #[cfg(tidehunter)]
76 TideHunter((KeySpace, Option<Vec<u8>>)),
77}
78
79impl std::fmt::Debug for ColumnFamily {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
83 ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
84 #[cfg(tidehunter)]
85 ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
86 }
87 }
88}
89
90impl ColumnFamily {
91 fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
92 match &self {
93 ColumnFamily::Rocks(name) => rocks_db
94 .underlying
95 .cf_handle(name)
96 .expect("Map-keying column family should have been checked at DB creation"),
97 _ => unreachable!("invariant is checked by the caller"),
98 }
99 }
100}
101
102pub enum Storage {
103 Rocks(RocksDB),
104 InMemory(InMemoryDB),
105 #[cfg(tidehunter)]
106 TideHunter(Arc<TideHunterDb>),
107}
108
109impl std::fmt::Debug for Storage {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 match self {
112 Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
113 Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
114 #[cfg(tidehunter)]
115 Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
116 }
117 }
118}
119
120#[derive(Debug)]
121pub struct Database {
122 storage: Storage,
123 metric_conf: MetricConf,
124}
125
126impl Drop for Database {
127 fn drop(&mut self) {
128 DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
129 }
130}
131
132enum GetResult<'a> {
133 Rocks(DBPinnableSlice<'a>),
134 InMemory(Vec<u8>),
135 #[cfg(tidehunter)]
136 TideHunter(tidehunter::minibytes::Bytes),
137}
138
139impl Deref for GetResult<'_> {
140 type Target = [u8];
141 fn deref(&self) -> &[u8] {
142 match self {
143 GetResult::Rocks(d) => d.deref(),
144 GetResult::InMemory(d) => d.deref(),
145 #[cfg(tidehunter)]
146 GetResult::TideHunter(d) => d.deref(),
147 }
148 }
149}
150
151impl Database {
152 pub fn new(storage: Storage, metric_conf: MetricConf) -> Self {
153 DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
154 Self {
155 storage,
156 metric_conf,
157 }
158 }
159
160 pub fn flush(&self) -> Result<(), TypedStoreError> {
162 match &self.storage {
163 Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
164 TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
165 }),
166 Storage::InMemory(_) => {
167 Ok(())
169 }
170 #[cfg(tidehunter)]
171 Storage::TideHunter(_) => {
172 Ok(())
174 }
175 }
176 }
177
178 fn get<K: AsRef<[u8]>>(
179 &self,
180 cf: &ColumnFamily,
181 key: K,
182 readopts: &ReadOptions,
183 ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
184 match (&self.storage, cf) {
185 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
186 .underlying
187 .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
188 .map_err(typed_store_err_from_rocks_err)?
189 .map(GetResult::Rocks)),
190 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
191 Ok(db.get(cf_name, key).map(GetResult::InMemory))
192 }
193 #[cfg(tidehunter)]
194 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
195 .get(*ks, &transform_th_key(key.as_ref(), prefix))
196 .map_err(typed_store_error_from_th_error)?
197 .map(GetResult::TideHunter)),
198
199 _ => Err(TypedStoreError::RocksDBError(
200 "typed store invariant violation".to_string(),
201 )),
202 }
203 }
204
205 fn multi_get<I, K>(
206 &self,
207 cf: &ColumnFamily,
208 keys: I,
209 readopts: &ReadOptions,
210 ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
211 where
212 I: IntoIterator<Item = K>,
213 K: AsRef<[u8]>,
214 {
215 match (&self.storage, cf) {
216 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
217 let keys_vec: Vec<K> = keys.into_iter().collect();
218 let res = db.underlying.batched_multi_get_cf_opt(
219 &cf.rocks_cf(db),
220 keys_vec.iter(),
221 false,
222 readopts,
223 );
224 res.into_iter()
225 .map(|r| {
226 r.map_err(typed_store_err_from_rocks_err)
227 .map(|item| item.map(GetResult::Rocks))
228 })
229 .collect()
230 }
231 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
232 .multi_get(cf_name, keys)
233 .into_iter()
234 .map(|r| Ok(r.map(GetResult::InMemory)))
235 .collect(),
236 #[cfg(tidehunter)]
237 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
238 let res = keys.into_iter().map(|k| {
239 db.get(*ks, &transform_th_key(k.as_ref(), prefix))
240 .map_err(typed_store_error_from_th_error)
241 });
242 res.into_iter()
243 .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
244 .collect()
245 }
246 _ => unreachable!("typed store invariant violation"),
247 }
248 }
249
250 pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
251 match &self.storage {
252 Storage::Rocks(db) => db.underlying.drop_cf(name),
253 Storage::InMemory(db) => {
254 db.drop_cf(name);
255 Ok(())
256 }
257 #[cfg(tidehunter)]
258 Storage::TideHunter(_) => {
                unimplemented!("TideHunter: dropping a column family on the fly is not implemented")
260 }
261 }
262 }
263
264 pub fn delete_file_in_range<K: AsRef<[u8]>>(
265 &self,
266 cf: &impl AsColumnFamilyRef,
267 from: K,
268 to: K,
269 ) -> Result<(), rocksdb::Error> {
270 match &self.storage {
271 Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
272 _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
273 }
274 }
275
276 fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
277 fail_point!("delete-cf-before");
278 let ret = match (&self.storage, cf) {
279 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
280 .underlying
281 .delete_cf(&cf.rocks_cf(db), key)
282 .map_err(typed_store_err_from_rocks_err),
283 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
284 db.delete(cf_name, key.as_ref());
285 Ok(())
286 }
287 #[cfg(tidehunter)]
288 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
289 .remove(*ks, transform_th_key(key.as_ref(), prefix))
290 .map_err(typed_store_error_from_th_error),
291 _ => Err(TypedStoreError::RocksDBError(
292 "typed store invariant violation".to_string(),
293 )),
294 };
295 fail_point!("delete-cf-after");
296 #[allow(clippy::let_and_return)]
297 ret
298 }
299
300 pub fn path_for_pruning(&self) -> &Path {
301 match &self.storage {
302 Storage::Rocks(rocks) => rocks.underlying.path(),
303 _ => unimplemented!("method is only supported for rocksdb backend"),
304 }
305 }
306
307 fn put_cf(
308 &self,
309 cf: &ColumnFamily,
310 key: Vec<u8>,
311 value: Vec<u8>,
312 ) -> Result<(), TypedStoreError> {
313 fail_point!("put-cf-before");
314 let ret = match (&self.storage, cf) {
315 (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
316 .underlying
317 .put_cf(&cf.rocks_cf(db), key, value)
318 .map_err(typed_store_err_from_rocks_err),
319 (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
320 db.put(cf_name, key, value);
321 Ok(())
322 }
323 #[cfg(tidehunter)]
324 (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
325 .insert(*ks, transform_th_key(&key, prefix), value)
326 .map_err(typed_store_error_from_th_error),
327 _ => Err(TypedStoreError::RocksDBError(
328 "typed store invariant violation".to_string(),
329 )),
330 };
331 fail_point!("put-cf-after");
332 #[allow(clippy::let_and_return)]
333 ret
334 }
335
336 pub fn key_may_exist_cf<K: AsRef<[u8]>>(
337 &self,
338 cf_name: &str,
339 key: K,
340 readopts: &ReadOptions,
341 ) -> bool {
342 match &self.storage {
343 Storage::Rocks(rocks) => {
346 rocks
347 .underlying
348 .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
349 }
350 _ => true,
351 }
352 }
353
354 pub(crate) fn write_opt_internal(
355 &self,
356 batch: StorageWriteBatch,
357 write_options: &rocksdb::WriteOptions,
358 ) -> Result<(), TypedStoreError> {
359 fail_point!("batch-write-before");
360 let ret = match (&self.storage, batch) {
361 (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
362 .underlying
363 .write_opt(batch, write_options)
364 .map_err(typed_store_err_from_rocks_err),
365 (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
366 db.write(batch);
368 Ok(())
369 }
370 #[cfg(tidehunter)]
371 (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
372 db.write_batch(batch)
374 .map_err(typed_store_error_from_th_error)
375 }
376 _ => Err(TypedStoreError::RocksDBError(
377 "using invalid batch type for the database".to_string(),
378 )),
379 };
380 fail_point!("batch-write-after");
381 #[allow(clippy::let_and_return)]
382 ret
383 }
384
385 #[cfg(tidehunter)]
386 pub fn start_relocation(&self) -> anyhow::Result<()> {
387 if let Storage::TideHunter(db) = &self.storage {
388 db.start_relocation()?;
389 }
390 Ok(())
391 }
392
393 #[cfg(tidehunter)]
394 pub fn drop_cells_in_range(
395 &self,
396 ks: KeySpace,
397 from_inclusive: &[u8],
398 to_inclusive: &[u8],
399 ) -> anyhow::Result<()> {
400 if let Storage::TideHunter(db) = &self.storage {
401 db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
402 .map_err(|e| anyhow::anyhow!("{:?}", e))?;
403 } else {
404 panic!("drop_cells_in_range called on non-TideHunter storage");
405 }
406 Ok(())
407 }
408
409 pub fn compact_range_cf<K: AsRef<[u8]>>(
410 &self,
411 cf_name: &str,
412 start: Option<K>,
413 end: Option<K>,
414 ) {
415 if let Storage::Rocks(rocksdb) = &self.storage {
416 rocksdb
417 .underlying
418 .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
419 }
420 }
421
422 pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
423 if let Storage::Rocks(rocks) = &self.storage {
425 let checkpoint =
426 Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
427 checkpoint
428 .create_checkpoint(path)
429 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
430 }
431 Ok(())
432 }
433
434 pub fn get_sampling_interval(&self) -> SamplingInterval {
435 self.metric_conf.read_sample_interval.new_from_self()
436 }
437
438 pub fn multiget_sampling_interval(&self) -> SamplingInterval {
439 self.metric_conf.read_sample_interval.new_from_self()
440 }
441
442 pub fn write_sampling_interval(&self) -> SamplingInterval {
443 self.metric_conf.write_sample_interval.new_from_self()
444 }
445
446 pub fn iter_sampling_interval(&self) -> SamplingInterval {
447 self.metric_conf.iter_sample_interval.new_from_self()
448 }
449
450 fn db_name(&self) -> String {
451 let name = &self.metric_conf.db_name;
452 if name.is_empty() {
453 "default".to_string()
454 } else {
455 name.clone()
456 }
457 }
458
459 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
460 match &self.storage {
461 Storage::Rocks(rocks) => rocks.underlying.live_files(),
462 _ => Ok(vec![]),
463 }
464 }
465}
466
467fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
468 rocks_db
469 .underlying
470 .cf_handle(cf_name)
471 .expect("Map-keying column family should have been checked at DB creation")
472}
473
474fn rocks_cf_from_db<'a>(
475 db: &'a Database,
476 cf_name: &str,
477) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
478 match &db.storage {
479 Storage::Rocks(rocksdb) => Ok(rocksdb
480 .underlying
481 .cf_handle(cf_name)
482 .expect("Map-keying column family should have been checked at DB creation")),
        _ => Err(TypedStoreError::RocksDBError(
            "cannot get a RocksDB column family handle from a non-RocksDB storage backend"
                .to_string(),
        )),
486 }
487}
488
489#[derive(Debug, Default)]
490pub struct MetricConf {
491 pub db_name: String,
492 pub read_sample_interval: SamplingInterval,
493 pub write_sample_interval: SamplingInterval,
494 pub iter_sample_interval: SamplingInterval,
495}
496
497impl MetricConf {
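    /// Builds a metric configuration keyed by `db_name`; all per-db and per-cf metrics for
    /// this database are labeled with that name, so it should be unique and meaningful.
    ///
    /// A hedged sketch:
    ///
    /// ```ignore
    /// let conf = MetricConf::new("consensus_db")
    ///     .with_sampling(SamplingInterval::default());
    /// ```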
498 pub fn new(db_name: &str) -> Self {
499 if db_name.is_empty() {
500 error!("A meaningful db name should be used for metrics reporting.")
501 }
502 Self {
503 db_name: db_name.to_string(),
504 read_sample_interval: SamplingInterval::default(),
505 write_sample_interval: SamplingInterval::default(),
506 iter_sample_interval: SamplingInterval::default(),
507 }
508 }
509
510 pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
511 Self {
512 db_name: self.db_name,
513 read_sample_interval: read_interval,
514 write_sample_interval: SamplingInterval::default(),
515 iter_sample_interval: SamplingInterval::default(),
516 }
517 }
518}
519const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
520const METRICS_ERROR: i64 = -1;
521
522#[derive(Clone, Debug)]
524pub struct DBMap<K, V> {
525 pub db: Arc<Database>,
526 _phantom: PhantomData<fn(K) -> V>,
527 column_family: ColumnFamily,
528 cf: String,
530 pub opts: ReadWriteOptions,
531 db_metrics: Arc<DBMetrics>,
532 get_sample_interval: SamplingInterval,
533 multiget_sample_interval: SamplingInterval,
534 write_sample_interval: SamplingInterval,
535 iter_sample_interval: SamplingInterval,
536 _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
537}
538
539unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
540
541impl<K, V> DBMap<K, V> {
542 pub(crate) fn new(
543 db: Arc<Database>,
544 opts: &ReadWriteOptions,
545 opt_cf: &str,
546 column_family: ColumnFamily,
547 is_deprecated: bool,
548 ) -> Self {
549 let db_cloned = Arc::downgrade(&db.clone());
550 let db_metrics = DBMetrics::get();
551 let db_metrics_cloned = db_metrics.clone();
552 let cf = opt_cf.to_string();
553
554 let (sender, mut recv) = tokio::sync::oneshot::channel();
555 if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
556 tokio::task::spawn(async move {
557 let mut interval =
558 tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
559 loop {
560 tokio::select! {
561 _ = interval.tick() => {
562 if let Some(db) = db_cloned.upgrade() {
563 let cf = cf.clone();
564 let db_metrics = db_metrics.clone();
565 if let Err(e) = tokio::task::spawn_blocking(move || {
566 Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
567 }).await {
568 error!("Failed to log metrics with error: {}", e);
569 }
570 }
571 }
572 _ = &mut recv => break,
573 }
574 }
            debug!("Exiting the cf metric logging task for DBMap: {}", &cf);
576 });
577 }
578 DBMap {
579 db: db.clone(),
580 opts: opts.clone(),
581 _phantom: PhantomData,
582 column_family,
583 cf: opt_cf.to_string(),
584 db_metrics: db_metrics_cloned,
585 _metrics_task_cancel_handle: Arc::new(sender),
586 get_sample_interval: db.get_sampling_interval(),
587 multiget_sample_interval: db.multiget_sampling_interval(),
588 write_sample_interval: db.write_sampling_interval(),
589 iter_sample_interval: db.iter_sampling_interval(),
590 }
591 }
592
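    /// Reopens an already-open [`Database`] as a typed `DBMap` over a single column family.
    ///
    /// A minimal usage sketch, assuming `db: Arc<Database>` was opened with `open_cf_opts`
    /// and already contains a column family named "objects" (names and types here are
    /// illustrative only):
    ///
    /// ```ignore
    /// let map: DBMap<Vec<u8>, Vec<u8>> =
    ///     DBMap::reopen(&db, Some("objects"), &ReadWriteOptions::default(), false)?;
    /// ```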
593 #[instrument(level = "debug", skip(db), err)]
596 pub fn reopen(
597 db: &Arc<Database>,
598 opt_cf: Option<&str>,
599 rw_options: &ReadWriteOptions,
600 is_deprecated: bool,
601 ) -> Result<Self, TypedStoreError> {
602 let cf_key = opt_cf
603 .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
604 .to_owned();
605 Ok(DBMap::new(
606 db.clone(),
607 rw_options,
608 &cf_key,
609 ColumnFamily::Rocks(cf_key.to_string()),
610 is_deprecated,
611 ))
612 }
613
614 #[cfg(tidehunter)]
615 pub fn reopen_th(
616 db: Arc<Database>,
617 cf_name: &str,
618 ks: KeySpace,
619 prefix: Option<Vec<u8>>,
620 ) -> Self {
621 DBMap::new(
622 db,
623 &ReadWriteOptions::default(),
624 cf_name,
625 ColumnFamily::TideHunter((ks, prefix.clone())),
626 false,
627 )
628 }
629
630 pub fn cf_name(&self) -> &str {
631 &self.cf
632 }
633
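    /// Creates a write batch bound to this map's storage backend; the batch must be
    /// written back through the same database it was created from.
    ///
    /// A hedged sketch of typical use (assumes `map: DBMap<u64, String>`; key and value
    /// types are illustrative):
    ///
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.insert_batch(&map, [(1u64, "a".to_string()), (2u64, "b".to_string())])?;
    /// batch.delete_batch(&map, [3u64])?;
    /// batch.write()?;
    /// ```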
634 pub fn batch(&self) -> DBBatch {
635 let batch = match &self.db.storage {
636 Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
637 Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
638 #[cfg(tidehunter)]
639 Storage::TideHunter(_) => {
640 StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
641 }
642 };
643 DBBatch::new(
644 &self.db,
645 batch,
646 &self.opts,
647 &self.db_metrics,
648 &self.write_sample_interval,
649 )
650 }
651
652 pub fn flush(&self) -> Result<(), TypedStoreError> {
653 self.db.flush()
654 }
655
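    /// Triggers a manual compaction of this column family over the serialized
    /// `[start, end]` key range; this is a no-op for non-RocksDB backends.
    ///
    /// A hedged sketch (assumes `map: DBMap<u64, String>`):
    ///
    /// ```ignore
    /// map.compact_range(&0u64, &u64::MAX)?;
    /// ```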
656 pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
657 let from_buf = be_fix_int_ser(start);
658 let to_buf = be_fix_int_ser(end);
659 self.db
660 .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
661 Ok(())
662 }
663
664 pub fn compact_range_raw(
665 &self,
666 cf_name: &str,
667 start: Vec<u8>,
668 end: Vec<u8>,
669 ) -> Result<(), TypedStoreError> {
670 self.db.compact_range_cf(cf_name, Some(start), Some(end));
671 Ok(())
672 }
673
674 #[cfg(tidehunter)]
675 pub fn drop_cells_in_range<J: Serialize>(
676 &self,
677 from_inclusive: &J,
678 to_inclusive: &J,
679 ) -> Result<(), TypedStoreError>
680 where
681 K: Serialize,
682 {
683 let from_buf = be_fix_int_ser(from_inclusive);
684 let to_buf = be_fix_int_ser(to_inclusive);
685 if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
686 self.db
687 .drop_cells_in_range(*ks, &from_buf, &to_buf)
688 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
689 }
690 Ok(())
691 }
692
693 fn multi_get_pinned<J>(
695 &self,
696 keys: impl IntoIterator<Item = J>,
697 ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
698 where
699 J: Borrow<K>,
700 K: Serialize,
701 {
702 let _timer = self
703 .db_metrics
704 .op_metrics
705 .rocksdb_multiget_latency_seconds
706 .with_label_values(&[&self.cf])
707 .start_timer();
708 let perf_ctx = if self.multiget_sample_interval.sample() {
709 Some(RocksDBPerfContext)
710 } else {
711 None
712 };
713 let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
714 let results: Result<Vec<_>, TypedStoreError> = self
715 .db
716 .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
717 .into_iter()
718 .collect();
719 let entries = results?;
720 let entry_size = entries
721 .iter()
722 .flatten()
723 .map(|entry| entry.len())
724 .sum::<usize>();
725 self.db_metrics
726 .op_metrics
727 .rocksdb_multiget_bytes
728 .with_label_values(&[&self.cf])
729 .observe(entry_size as f64);
730 if perf_ctx.is_some() {
731 self.db_metrics
732 .read_perf_ctx_metrics
733 .report_metrics(&self.cf);
734 }
735 Ok(entries)
736 }
737
738 fn get_rocksdb_int_property(
739 rocksdb: &RocksDB,
740 cf: &impl AsColumnFamilyRef,
741 property_name: &std::ffi::CStr,
742 ) -> Result<i64, TypedStoreError> {
743 match rocksdb.underlying.property_int_value_cf(cf, property_name) {
744 Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
745 Ok(None) => Ok(0),
746 Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
747 }
748 }
749
750 fn report_rocksdb_metrics(
751 database: &Arc<Database>,
752 cf_name: &str,
753 db_metrics: &Arc<DBMetrics>,
754 ) {
755 let Storage::Rocks(rocksdb) = &database.storage else {
756 return;
757 };
758
759 let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
760 tracing::warn!(
761 "unable to report metrics for cf {cf_name:?} in db {:?}",
762 database.db_name()
763 );
764 return;
765 };
766
767 db_metrics
768 .cf_metrics
769 .rocksdb_total_sst_files_size
770 .with_label_values(&[cf_name])
771 .set(
772 Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
773 .unwrap_or(METRICS_ERROR),
774 );
775 db_metrics
776 .cf_metrics
777 .rocksdb_total_blob_files_size
778 .with_label_values(&[cf_name])
779 .set(
780 Self::get_rocksdb_int_property(
781 rocksdb,
782 &cf,
783 ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
784 )
785 .unwrap_or(METRICS_ERROR),
786 );
787 let total_num_files: i64 = (0..=6)
790 .map(|level| {
791 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
792 .unwrap_or(METRICS_ERROR)
793 })
794 .sum();
795 db_metrics
796 .cf_metrics
797 .rocksdb_total_num_files
798 .with_label_values(&[cf_name])
799 .set(total_num_files);
800 db_metrics
801 .cf_metrics
802 .rocksdb_num_level0_files
803 .with_label_values(&[cf_name])
804 .set(
805 Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
806 .unwrap_or(METRICS_ERROR),
807 );
808 db_metrics
809 .cf_metrics
810 .rocksdb_current_size_active_mem_tables
811 .with_label_values(&[cf_name])
812 .set(
813 Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
814 .unwrap_or(METRICS_ERROR),
815 );
816 db_metrics
817 .cf_metrics
818 .rocksdb_size_all_mem_tables
819 .with_label_values(&[cf_name])
820 .set(
821 Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
822 .unwrap_or(METRICS_ERROR),
823 );
824 db_metrics
825 .cf_metrics
826 .rocksdb_num_snapshots
827 .with_label_values(&[cf_name])
828 .set(
829 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
830 .unwrap_or(METRICS_ERROR),
831 );
832 db_metrics
833 .cf_metrics
834 .rocksdb_oldest_snapshot_time
835 .with_label_values(&[cf_name])
836 .set(
837 Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
838 .unwrap_or(METRICS_ERROR),
839 );
840 db_metrics
841 .cf_metrics
842 .rocksdb_actual_delayed_write_rate
843 .with_label_values(&[cf_name])
844 .set(
845 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
846 .unwrap_or(METRICS_ERROR),
847 );
848 db_metrics
849 .cf_metrics
850 .rocksdb_is_write_stopped
851 .with_label_values(&[cf_name])
852 .set(
853 Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
854 .unwrap_or(METRICS_ERROR),
855 );
856 db_metrics
857 .cf_metrics
858 .rocksdb_block_cache_capacity
859 .with_label_values(&[cf_name])
860 .set(
861 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
862 .unwrap_or(METRICS_ERROR),
863 );
864 db_metrics
865 .cf_metrics
866 .rocksdb_block_cache_usage
867 .with_label_values(&[cf_name])
868 .set(
869 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
870 .unwrap_or(METRICS_ERROR),
871 );
872 db_metrics
873 .cf_metrics
874 .rocksdb_block_cache_pinned_usage
875 .with_label_values(&[cf_name])
876 .set(
877 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
878 .unwrap_or(METRICS_ERROR),
879 );
880 db_metrics
881 .cf_metrics
882 .rocksdb_estimate_table_readers_mem
883 .with_label_values(&[cf_name])
884 .set(
885 Self::get_rocksdb_int_property(
886 rocksdb,
887 &cf,
888 properties::ESTIMATE_TABLE_READERS_MEM,
889 )
890 .unwrap_or(METRICS_ERROR),
891 );
892 db_metrics
893 .cf_metrics
894 .rocksdb_estimated_num_keys
895 .with_label_values(&[cf_name])
896 .set(
897 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
898 .unwrap_or(METRICS_ERROR),
899 );
900 db_metrics
901 .cf_metrics
902 .rocksdb_num_immutable_mem_tables
903 .with_label_values(&[cf_name])
904 .set(
905 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
906 .unwrap_or(METRICS_ERROR),
907 );
908 db_metrics
909 .cf_metrics
910 .rocksdb_mem_table_flush_pending
911 .with_label_values(&[cf_name])
912 .set(
913 Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
914 .unwrap_or(METRICS_ERROR),
915 );
916 db_metrics
917 .cf_metrics
918 .rocksdb_compaction_pending
919 .with_label_values(&[cf_name])
920 .set(
921 Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
922 .unwrap_or(METRICS_ERROR),
923 );
924 db_metrics
925 .cf_metrics
926 .rocksdb_estimate_pending_compaction_bytes
927 .with_label_values(&[cf_name])
928 .set(
929 Self::get_rocksdb_int_property(
930 rocksdb,
931 &cf,
932 properties::ESTIMATE_PENDING_COMPACTION_BYTES,
933 )
934 .unwrap_or(METRICS_ERROR),
935 );
936 db_metrics
937 .cf_metrics
938 .rocksdb_num_running_compactions
939 .with_label_values(&[cf_name])
940 .set(
941 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
942 .unwrap_or(METRICS_ERROR),
943 );
944 db_metrics
945 .cf_metrics
946 .rocksdb_num_running_flushes
947 .with_label_values(&[cf_name])
948 .set(
949 Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
950 .unwrap_or(METRICS_ERROR),
951 );
952 db_metrics
953 .cf_metrics
954 .rocksdb_estimate_oldest_key_time
955 .with_label_values(&[cf_name])
956 .set(
957 Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
958 .unwrap_or(METRICS_ERROR),
959 );
960 db_metrics
961 .cf_metrics
962 .rocksdb_background_errors
963 .with_label_values(&[cf_name])
964 .set(
965 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
966 .unwrap_or(METRICS_ERROR),
967 );
968 db_metrics
969 .cf_metrics
970 .rocksdb_base_level
971 .with_label_values(&[cf_name])
972 .set(
973 Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
974 .unwrap_or(METRICS_ERROR),
975 );
976 }
977
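    /// Creates a RocksDB checkpoint (a consistent snapshot of the database, hard-linking
    /// SST files where possible) at `path`; a no-op for non-RocksDB backends.
    ///
    /// A hedged sketch (path is illustrative):
    ///
    /// ```ignore
    /// map.checkpoint_db(Path::new("/tmp/my_db_checkpoint"))?;
    /// ```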
978 pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
979 self.db.checkpoint(path)
980 }
981
982 pub fn table_summary(&self) -> eyre::Result<TableSummary>
983 where
984 K: Serialize + DeserializeOwned,
985 V: Serialize + DeserializeOwned,
986 {
987 let mut num_keys = 0;
988 let mut key_bytes_total = 0;
989 let mut value_bytes_total = 0;
990 let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
991 let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
992 for item in self.safe_iter() {
993 let (key, value) = item?;
994 num_keys += 1;
995 let key_len = be_fix_int_ser(key.borrow()).len();
996 let value_len = bcs::to_bytes(value.borrow())?.len();
997 key_bytes_total += key_len;
998 value_bytes_total += value_len;
999 key_hist.record(key_len as u64)?;
1000 value_hist.record(value_len as u64)?;
1001 }
1002 Ok(TableSummary {
1003 num_keys,
1004 key_bytes_total,
1005 value_bytes_total,
1006 key_hist,
1007 value_hist,
1008 })
1009 }
1010
1011 fn start_iter_timer(&self) -> HistogramTimer {
1012 self.db_metrics
1013 .op_metrics
1014 .rocksdb_iter_latency_seconds
1015 .with_label_values(&[&self.cf])
1016 .start_timer()
1017 }
1018
1019 fn create_iter_context(
1021 &self,
1022 ) -> (
1023 Option<HistogramTimer>,
1024 Option<Histogram>,
1025 Option<Histogram>,
1026 Option<RocksDBPerfContext>,
1027 ) {
1028 let timer = self.start_iter_timer();
1029 let bytes_scanned = self
1030 .db_metrics
1031 .op_metrics
1032 .rocksdb_iter_bytes
1033 .with_label_values(&[&self.cf]);
1034 let keys_scanned = self
1035 .db_metrics
1036 .op_metrics
1037 .rocksdb_iter_keys
1038 .with_label_values(&[&self.cf]);
1039 let perf_ctx = if self.iter_sample_interval.sample() {
1040 Some(RocksDBPerfContext)
1041 } else {
1042 None
1043 };
1044 (
1045 Some(timer),
1046 Some(bytes_scanned),
1047 Some(keys_scanned),
1048 perf_ctx,
1049 )
1050 }
1051
1052 #[allow(clippy::complexity)]
1055 pub fn reversed_safe_iter_with_bounds(
1056 &self,
1057 lower_bound: Option<K>,
1058 upper_bound: Option<K>,
1059 ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
1060 where
1061 K: Serialize + DeserializeOwned,
1062 V: Serialize + DeserializeOwned,
1063 {
1064 let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
1065 lower_bound
1066 .as_ref()
1067 .map(Bound::Included)
1068 .unwrap_or(Bound::Unbounded),
1069 upper_bound
1070 .as_ref()
1071 .map(Bound::Included)
1072 .unwrap_or(Bound::Unbounded),
1073 ));
1074 match &self.db.storage {
1075 Storage::Rocks(db) => {
1076 let readopts = rocks_util::apply_range_bounds(
1077 self.opts.readopts(),
1078 it_lower_bound,
1079 it_upper_bound,
1080 );
1081 let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
1082 let db_iter = db
1083 .underlying
1084 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1085 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1086 let iter = SafeIter::new(
1087 self.cf.clone(),
1088 db_iter,
1089 _timer,
1090 _perf_ctx,
1091 bytes_scanned,
1092 keys_scanned,
1093 Some(self.db_metrics.clone()),
1094 );
1095 Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
1096 }
1097 Storage::InMemory(db) => {
1098 Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
1099 }
1100 #[cfg(tidehunter)]
1101 Storage::TideHunter(db) => match &self.column_family {
1102 ColumnFamily::TideHunter((ks, prefix)) => {
1103 let mut iter = db.iterator(*ks);
1104 apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
1105 iter.reverse();
1106 Ok(Box::new(transform_th_iterator(
1107 iter,
1108 prefix,
1109 self.start_iter_timer(),
1110 )))
1111 }
1112 _ => unreachable!("storage backend invariant violation"),
1113 },
1114 }
1115 }
1116}
1117
1118pub enum StorageWriteBatch {
1119 Rocks(rocksdb::WriteBatch),
1120 InMemory(InMemoryBatch),
1121 #[cfg(tidehunter)]
1122 TideHunter(tidehunter::batch::WriteBatch),
1123}
1124
1125pub struct DBBatch {
1177 database: Arc<Database>,
1178 batch: StorageWriteBatch,
1179 options: ReadWriteOptions,
1180 db_metrics: Arc<DBMetrics>,
1181 write_sample_interval: SamplingInterval,
1182}
1183
1184impl DBBatch {
1185 pub fn new(
1189 dbref: &Arc<Database>,
1190 batch: StorageWriteBatch,
1191 options: &ReadWriteOptions,
1192 db_metrics: &Arc<DBMetrics>,
1193 write_sample_interval: &SamplingInterval,
1194 ) -> Self {
1195 DBBatch {
1196 database: dbref.clone(),
1197 batch,
1198 options: options.clone(),
1199 db_metrics: db_metrics.clone(),
1200 write_sample_interval: write_sample_interval.clone(),
1201 }
1202 }
1203
1204 #[instrument(level = "trace", skip_all, err)]
1206 pub fn write(self) -> Result<(), TypedStoreError> {
1207 self.write_opt(rocksdb::WriteOptions::default())
1208 }
1209
1210 #[instrument(level = "trace", skip_all, err)]
1212 pub fn write_opt(
1213 self,
1214 mut write_options: rocksdb::WriteOptions,
1215 ) -> Result<(), TypedStoreError> {
1216 let db_name = self.database.db_name();
1217 let timer = self
1218 .db_metrics
1219 .op_metrics
1220 .rocksdb_batch_commit_latency_seconds
1221 .with_label_values(&[&db_name])
1222 .start_timer();
1223 let batch_size = self.size_in_bytes();
1224
1225 let perf_ctx = if self.write_sample_interval.sample() {
1226 Some(RocksDBPerfContext)
1227 } else {
1228 None
1229 };
1230
1231 if self.options.sync_writes {
1232 write_options.set_sync(true);
1233 }
1234 self.database
1235 .write_opt_internal(self.batch, &write_options)?;
1236
1237 self.db_metrics
1238 .op_metrics
1239 .rocksdb_batch_commit_bytes
1240 .with_label_values(&[&db_name])
1241 .observe(batch_size as f64);
1242
1243 if perf_ctx.is_some() {
1244 self.db_metrics
1245 .write_perf_ctx_metrics
1246 .report_metrics(&db_name);
1247 }
1248 let elapsed = timer.stop_and_record();
1249 if elapsed > 1.0 {
1250 warn!(?elapsed, ?db_name, "very slow batch write");
1251 self.db_metrics
1252 .op_metrics
1253 .rocksdb_very_slow_batch_writes_count
1254 .with_label_values(&[&db_name])
1255 .inc();
1256 self.db_metrics
1257 .op_metrics
1258 .rocksdb_very_slow_batch_writes_duration_ms
1259 .with_label_values(&[&db_name])
1260 .inc_by((elapsed * 1000.0) as u64);
1261 }
1262 Ok(())
1263 }
1264
1265 pub fn size_in_bytes(&self) -> usize {
1266 match self.batch {
1267 StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
1268 StorageWriteBatch::InMemory(_) => 0,
1269 #[cfg(tidehunter)]
1271 StorageWriteBatch::TideHunter(_) => 0,
1272 }
1273 }
1274
1275 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1276 &mut self,
1277 db: &DBMap<K, V>,
1278 purged_vals: impl IntoIterator<Item = J>,
1279 ) -> Result<(), TypedStoreError> {
1280 if !Arc::ptr_eq(&db.db, &self.database) {
1281 return Err(TypedStoreError::CrossDBBatch);
1282 }
1283
1284 purged_vals
1285 .into_iter()
1286 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1287 let k_buf = be_fix_int_ser(k.borrow());
1288 match (&mut self.batch, &db.column_family) {
1289 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1290 b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
1291 }
1292 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1293 b.delete_cf(name, k_buf)
1294 }
1295 #[cfg(tidehunter)]
1296 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1297 b.delete(*ks, transform_th_key(&k_buf, prefix))
1298 }
1299 _ => Err(TypedStoreError::RocksDBError(
1300 "typed store invariant violation".to_string(),
1301 ))?,
1302 }
1303 Ok(())
1304 })?;
1305 Ok(())
1306 }
1307
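    /// Queues a RocksDB range deletion covering the serialized keys from `from` to `to`
    /// in this batch; for non-RocksDB backends the call is currently a silent no-op.
    ///
    /// A hedged sketch (assumes `map: DBMap<u64, String>`):
    ///
    /// ```ignore
    /// let mut batch = map.batch();
    /// batch.schedule_delete_range(&map, &10u64, &20u64)?;
    /// batch.write()?;
    /// ```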
1308 pub fn schedule_delete_range<K: Serialize, V>(
1318 &mut self,
1319 db: &DBMap<K, V>,
1320 from: &K,
1321 to: &K,
1322 ) -> Result<(), TypedStoreError> {
1323 if !Arc::ptr_eq(&db.db, &self.database) {
1324 return Err(TypedStoreError::CrossDBBatch);
1325 }
1326
1327 let from_buf = be_fix_int_ser(from);
1328 let to_buf = be_fix_int_ser(to);
1329
1330 if let StorageWriteBatch::Rocks(b) = &mut self.batch {
1331 b.delete_range_cf(
1332 &rocks_cf_from_db(&self.database, db.cf_name())?,
1333 from_buf,
1334 to_buf,
1335 );
1336 }
1337 Ok(())
1338 }
1339
1340 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1342 &mut self,
1343 db: &DBMap<K, V>,
1344 new_vals: impl IntoIterator<Item = (J, U)>,
1345 ) -> Result<&mut Self, TypedStoreError> {
1346 if !Arc::ptr_eq(&db.db, &self.database) {
1347 return Err(TypedStoreError::CrossDBBatch);
1348 }
1349 let mut total = 0usize;
1350 new_vals
1351 .into_iter()
1352 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1353 let k_buf = be_fix_int_ser(k.borrow());
1354 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1355 total += k_buf.len() + v_buf.len();
1356 if db.opts.log_value_hash {
1357 let key_hash = default_hash(&k_buf);
1358 let value_hash = default_hash(&v_buf);
1359 debug!(
1360 "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
1361 db.cf_name(),
1362 key_hash,
1363 value_hash
1364 );
1365 }
1366 match (&mut self.batch, &db.column_family) {
1367 (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
1368 b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
1369 }
1370 (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
1371 b.put_cf(name, k_buf, v_buf)
1372 }
1373 #[cfg(tidehunter)]
1374 (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
1375 b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
1376 }
1377 _ => Err(TypedStoreError::RocksDBError(
1378 "typed store invariant violation".to_string(),
1379 ))?,
1380 }
1381 Ok(())
1382 })?;
1383 self.db_metrics
1384 .op_metrics
1385 .rocksdb_batch_put_bytes
1386 .with_label_values(&[&db.cf])
1387 .observe(total as f64);
1388 Ok(self)
1389 }
1390
1391 pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1392 &mut self,
1393 db: &DBMap<K, V>,
1394 new_vals: impl IntoIterator<Item = (J, U)>,
1395 ) -> Result<&mut Self, TypedStoreError> {
1396 if !Arc::ptr_eq(&db.db, &self.database) {
1397 return Err(TypedStoreError::CrossDBBatch);
1398 }
1399 new_vals
1400 .into_iter()
1401 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1402 let k_buf = be_fix_int_ser(k.borrow());
1403 let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
1404 match &mut self.batch {
1405 StorageWriteBatch::Rocks(b) => b.merge_cf(
1406 &rocks_cf_from_db(&self.database, db.cf_name())?,
1407 k_buf,
1408 v_buf,
1409 ),
1410 _ => unimplemented!("merge operator is only implemented for RocksDB"),
1411 }
1412 Ok(())
1413 })?;
1414 Ok(self)
1415 }
1416}
1417
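// A hedged sketch of the point read/write API this `Map` implementation provides
// (assumes `map: DBMap<u64, String>`; types are illustrative):
//
//     map.insert(&1, &"a".to_string())?;
//     assert_eq!(map.get(&1)?, Some("a".to_string()));
//     assert!(map.contains_key(&1)?);
//     let values = map.multi_get([&1, &2])?;
//     map.remove(&1)?;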
1418impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1419where
1420 K: Serialize + DeserializeOwned,
1421 V: Serialize + DeserializeOwned,
1422{
1423 type Error = TypedStoreError;
1424
1425 #[instrument(level = "trace", skip_all, err)]
1426 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1427 let key_buf = be_fix_int_ser(key);
1428 let readopts = self.opts.readopts();
1429 Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1430 && self
1431 .db
1432 .get(&self.column_family, &key_buf, &readopts)?
1433 .is_some())
1434 }
1435
1436 #[instrument(level = "trace", skip_all, err)]
1437 fn multi_contains_keys<J>(
1438 &self,
1439 keys: impl IntoIterator<Item = J>,
1440 ) -> Result<Vec<bool>, Self::Error>
1441 where
1442 J: Borrow<K>,
1443 {
1444 let values = self.multi_get_pinned(keys)?;
1445 Ok(values.into_iter().map(|v| v.is_some()).collect())
1446 }
1447
1448 #[instrument(level = "trace", skip_all, err)]
1449 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1450 let _timer = self
1451 .db_metrics
1452 .op_metrics
1453 .rocksdb_get_latency_seconds
1454 .with_label_values(&[&self.cf])
1455 .start_timer();
1456 let perf_ctx = if self.get_sample_interval.sample() {
1457 Some(RocksDBPerfContext)
1458 } else {
1459 None
1460 };
1461 let key_buf = be_fix_int_ser(key);
1462 let res = self
1463 .db
1464 .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1465 self.db_metrics
1466 .op_metrics
1467 .rocksdb_get_bytes
1468 .with_label_values(&[&self.cf])
1469 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1470 if perf_ctx.is_some() {
1471 self.db_metrics
1472 .read_perf_ctx_metrics
1473 .report_metrics(&self.cf);
1474 }
1475 match res {
1476 Some(data) => {
1477 let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1478 if value.is_err() {
1479 let key_hash = default_hash(&key_buf);
1480 let value_hash = default_hash(&data);
1481 debug_fatal!(
1482 "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1483 self.cf_name(),
1484 key_hash,
1485 value_hash,
1486 value.as_ref().err().unwrap()
1487 );
1488 }
1489 Ok(Some(value?))
1490 }
1491 None => Ok(None),
1492 }
1493 }
1494
1495 #[instrument(level = "trace", skip_all, err)]
1496 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1497 let timer = self
1498 .db_metrics
1499 .op_metrics
1500 .rocksdb_put_latency_seconds
1501 .with_label_values(&[&self.cf])
1502 .start_timer();
1503 let perf_ctx = if self.write_sample_interval.sample() {
1504 Some(RocksDBPerfContext)
1505 } else {
1506 None
1507 };
1508 let key_buf = be_fix_int_ser(key);
1509 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1510 self.db_metrics
1511 .op_metrics
1512 .rocksdb_put_bytes
1513 .with_label_values(&[&self.cf])
1514 .observe((key_buf.len() + value_buf.len()) as f64);
1515 if perf_ctx.is_some() {
1516 self.db_metrics
1517 .write_perf_ctx_metrics
1518 .report_metrics(&self.cf);
1519 }
1520 self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1521
1522 let elapsed = timer.stop_and_record();
1523 if elapsed > 1.0 {
1524 warn!(?elapsed, cf = ?self.cf, "very slow insert");
1525 self.db_metrics
1526 .op_metrics
1527 .rocksdb_very_slow_puts_count
1528 .with_label_values(&[&self.cf])
1529 .inc();
1530 self.db_metrics
1531 .op_metrics
1532 .rocksdb_very_slow_puts_duration_ms
1533 .with_label_values(&[&self.cf])
1534 .inc_by((elapsed * 1000.0) as u64);
1535 }
1536
1537 Ok(())
1538 }
1539
1540 #[instrument(level = "trace", skip_all, err)]
1541 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1542 let _timer = self
1543 .db_metrics
1544 .op_metrics
1545 .rocksdb_delete_latency_seconds
1546 .with_label_values(&[&self.cf])
1547 .start_timer();
1548 let perf_ctx = if self.write_sample_interval.sample() {
1549 Some(RocksDBPerfContext)
1550 } else {
1551 None
1552 };
1553 let key_buf = be_fix_int_ser(key);
1554 self.db.delete_cf(&self.column_family, key_buf)?;
1555 self.db_metrics
1556 .op_metrics
1557 .rocksdb_deletes
1558 .with_label_values(&[&self.cf])
1559 .inc();
1560 if perf_ctx.is_some() {
1561 self.db_metrics
1562 .write_perf_ctx_metrics
1563 .report_metrics(&self.cf);
1564 }
1565 Ok(())
1566 }
1567
1568 #[instrument(level = "trace", skip_all, err)]
1577 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1578 let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1579 let last_key = self
1580 .reversed_safe_iter_with_bounds(None, None)?
1581 .next()
1582 .transpose()?
1583 .map(|(k, _v)| k);
1584 if let Some((first_key, last_key)) = first_key.zip(last_key) {
1585 let mut batch = self.batch();
1586 batch.schedule_delete_range(self, &first_key, &last_key)?;
1587 batch.write()?;
1588 }
1589 Ok(())
1590 }
1591
1592 fn is_empty(&self) -> bool {
1593 self.safe_iter().next().is_none()
1594 }
1595
1596 fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1597 match &self.db.storage {
1598 Storage::Rocks(db) => {
1599 let db_iter = db
1600 .underlying
1601 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1602 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1603 Box::new(SafeIter::new(
1604 self.cf.clone(),
1605 db_iter,
1606 _timer,
1607 _perf_ctx,
1608 bytes_scanned,
1609 keys_scanned,
1610 Some(self.db_metrics.clone()),
1611 ))
1612 }
1613 Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1614 #[cfg(tidehunter)]
1615 Storage::TideHunter(db) => match &self.column_family {
1616 ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1617 db.iterator(*ks),
1618 prefix,
1619 self.start_iter_timer(),
1620 )),
1621 _ => unreachable!("storage backend invariant violation"),
1622 },
1623 }
1624 }
1625
1626 fn safe_iter_with_bounds(
1627 &'a self,
1628 lower_bound: Option<K>,
1629 upper_bound: Option<K>,
1630 ) -> DbIterator<'a, (K, V)> {
1631 let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1632 match &self.db.storage {
1633 Storage::Rocks(db) => {
1634 let readopts =
1635 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1636 let db_iter = db
1637 .underlying
1638 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1639 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1640 Box::new(SafeIter::new(
1641 self.cf.clone(),
1642 db_iter,
1643 _timer,
1644 _perf_ctx,
1645 bytes_scanned,
1646 keys_scanned,
1647 Some(self.db_metrics.clone()),
1648 ))
1649 }
1650 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1651 #[cfg(tidehunter)]
1652 Storage::TideHunter(db) => match &self.column_family {
1653 ColumnFamily::TideHunter((ks, prefix)) => {
1654 let mut iter = db.iterator(*ks);
1655 apply_range_bounds(&mut iter, lower_bound, upper_bound);
1656 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1657 }
1658 _ => unreachable!("storage backend invariant violation"),
1659 },
1660 }
1661 }
1662
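    /// A hedged sketch of range iteration using native `RangeBounds` syntax
    /// (assumes `map: DBMap<u64, String>`):
    ///
    /// ```ignore
    /// for item in map.safe_range_iter(10u64..=20u64) {
    ///     let (key, value) = item?;
    ///     // process (key, value)
    /// }
    /// ```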
1663 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1664 let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1665 match &self.db.storage {
1666 Storage::Rocks(db) => {
1667 let readopts =
1668 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1669 let db_iter = db
1670 .underlying
1671 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1672 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1673 Box::new(SafeIter::new(
1674 self.cf.clone(),
1675 db_iter,
1676 _timer,
1677 _perf_ctx,
1678 bytes_scanned,
1679 keys_scanned,
1680 Some(self.db_metrics.clone()),
1681 ))
1682 }
1683 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1684 #[cfg(tidehunter)]
1685 Storage::TideHunter(db) => match &self.column_family {
1686 ColumnFamily::TideHunter((ks, prefix)) => {
1687 let mut iter = db.iterator(*ks);
1688 apply_range_bounds(&mut iter, lower_bound, upper_bound);
1689 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1690 }
1691 _ => unreachable!("storage backend invariant violation"),
1692 },
1693 }
1694 }
1695
1696 #[instrument(level = "trace", skip_all, err)]
1698 fn multi_get<J>(
1699 &self,
1700 keys: impl IntoIterator<Item = J>,
1701 ) -> Result<Vec<Option<V>>, TypedStoreError>
1702 where
1703 J: Borrow<K>,
1704 {
1705 let results = self.multi_get_pinned(keys)?;
1706 let values_parsed: Result<Vec<_>, TypedStoreError> = results
1707 .into_iter()
1708 .map(|value_byte| match value_byte {
1709 Some(data) => Ok(Some(
1710 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1711 )),
1712 None => Ok(None),
1713 })
1714 .collect();
1715
1716 values_parsed
1717 }
1718
1719 #[instrument(level = "trace", skip_all, err)]
1721 fn multi_insert<J, U>(
1722 &self,
1723 key_val_pairs: impl IntoIterator<Item = (J, U)>,
1724 ) -> Result<(), Self::Error>
1725 where
1726 J: Borrow<K>,
1727 U: Borrow<V>,
1728 {
1729 let mut batch = self.batch();
1730 batch.insert_batch(self, key_val_pairs)?;
1731 batch.write()
1732 }
1733
1734 #[instrument(level = "trace", skip_all, err)]
1736 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1737 where
1738 J: Borrow<K>,
1739 {
1740 let mut batch = self.batch();
1741 batch.delete_batch(self, keys)?;
1742 batch.write()
1743 }
1744
1745 #[instrument(level = "trace", skip_all, err)]
1747 fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1748 if let Storage::Rocks(rocks) = &self.db.storage {
1749 rocks
1750 .underlying
1751 .try_catch_up_with_primary()
1752 .map_err(typed_store_err_from_rocks_err)?;
1753 }
1754 Ok(())
1755 }
1756}
1757
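/// Opens (or creates) a RocksDB database at `path`, creating any missing column families
/// from `opt_cfs` and wrapping the handle in a [`Database`].
///
/// A hedged sketch of typical use (paths, names, and options are illustrative):
///
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/my_db",
///     None,
///     MetricConf::new("my_db"),
///     &[("objects", rocksdb::Options::default())],
/// )?;
/// let map: DBMap<Vec<u8>, Vec<u8>> =
///     DBMap::reopen(&db, Some("objects"), &ReadWriteOptions::default(), false)?;
/// ```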
1758#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1760pub fn open_cf_opts<P: AsRef<Path>>(
1761 path: P,
1762 db_options: Option<rocksdb::Options>,
1763 metric_conf: MetricConf,
1764 opt_cfs: &[(&str, rocksdb::Options)],
1765) -> Result<Arc<Database>, TypedStoreError> {
1766 let path = path.as_ref();
1767 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1776 nondeterministic!({
1777 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1778 options.create_if_missing(true);
1779 options.create_missing_column_families(true);
1780 let rocksdb = {
1781 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1782 &options,
1783 path,
1784 cfs.into_iter()
1785 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1786 )
1787 .map_err(typed_store_err_from_rocks_err)?
1788 };
1789 Ok(Arc::new(Database::new(
1790 Storage::Rocks(RocksDB {
1791 underlying: rocksdb,
1792 }),
1793 metric_conf,
1794 )))
1795 })
1796}
1797
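/// Opens the database at `primary_path` in RocksDB secondary (read-only follower) mode.
/// If `secondary_path` is `None`, a sibling "SECONDARY" directory next to the primary is
/// used for the secondary instance's own files.
///
/// A hedged sketch (paths and names are illustrative):
///
/// ```ignore
/// let db = open_cf_opts_secondary(
///     "/var/db/primary",
///     Some("/var/db/secondary"),
///     None,
///     MetricConf::new("reader"),
///     &[("objects", rocksdb::Options::default())],
/// )?;
/// ```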
1798pub fn open_cf_opts_secondary<P: AsRef<Path>>(
1800 primary_path: P,
1801 secondary_path: Option<P>,
1802 db_options: Option<rocksdb::Options>,
1803 metric_conf: MetricConf,
1804 opt_cfs: &[(&str, rocksdb::Options)],
1805) -> Result<Arc<Database>, TypedStoreError> {
1806 let primary_path = primary_path.as_ref();
1807 let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
1808 nondeterministic!({
1810 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1812
1813 fdlimit::raise_fd_limit();
1814 options.set_max_open_files(-1);
1816
1817 let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
1818 let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
1819 .ok()
1820 .unwrap_or_default();
1821
1822 let default_db_options = default_db_options();
1823 for cf_key in cfs.iter() {
1825 if !opt_cfs.contains_key(&cf_key[..]) {
1826 opt_cfs.insert(cf_key, default_db_options.options.clone());
1827 }
1828 }
1829
1830 let primary_path = primary_path.to_path_buf();
1831 let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
1832 let mut s = primary_path.clone();
1833 s.pop();
1834 s.push("SECONDARY");
1835 s.as_path().to_path_buf()
1836 });
1837
1838 let rocksdb = {
1839 options.create_if_missing(true);
1840 options.create_missing_column_families(true);
1841 let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
1842 &options,
1843 &primary_path,
1844 &secondary_path,
1845 opt_cfs
1846 .iter()
1847 .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
1848 )
1849 .map_err(typed_store_err_from_rocks_err)?;
1850 db.try_catch_up_with_primary()
1851 .map_err(typed_store_err_from_rocks_err)?;
1852 db
1853 };
1854 Ok(Arc::new(Database::new(
1855 Storage::Rocks(RocksDB {
1856 underlying: rocksdb,
1857 }),
1858 metric_conf,
1859 )))
1860 })
1861}
1862
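/// Destroys the RocksDB database at `path`, retrying with exponential backoff for up to
/// `timeout`; destruction fails while other handles to the database are still open.
///
/// A hedged sketch:
///
/// ```ignore
/// safe_drop_db(PathBuf::from("/tmp/my_db"), Duration::from_secs(30)).await?;
/// ```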
1863#[cfg(not(tidehunter))]
1865pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
1866 let mut backoff = backoff::ExponentialBackoff {
1867 max_elapsed_time: Some(timeout),
1868 ..Default::default()
1869 };
1870 loop {
1871 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
1872 Ok(()) => return Ok(()),
1873 Err(err) => match backoff.next_backoff() {
1874 Some(duration) => tokio::time::sleep(duration).await,
1875 None => return Err(err),
1876 },
1877 }
1878 }
1879}
1880
1881#[cfg(tidehunter)]
1882pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
1883 std::fs::remove_dir_all(path)
1884}
1885
1886fn populate_missing_cfs(
1887 input_cfs: &[(&str, rocksdb::Options)],
1888 path: &Path,
1889) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
1890 let mut cfs = vec![];
1891 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
1892 let existing_cfs =
1893 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
1894 .ok()
1895 .unwrap_or_default();
1896
1897 for cf_name in existing_cfs {
1898 if !input_cf_index.contains(&cf_name[..]) {
1899 cfs.push((cf_name, rocksdb::Options::default()));
1900 }
1901 }
1902 cfs.extend(
1903 input_cfs
1904 .iter()
1905 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
1906 );
1907 Ok(cfs)
1908}
1909
1910fn default_hash(value: &[u8]) -> Digest<32> {
1911 let mut hasher = fastcrypto::hash::Blake2b256::default();
1912 hasher.update(value);
1913 hasher.finalize()
1914}