pub mod errors;
mod options;
mod rocks_util;
pub(crate) mod safe_iter;

use crate::memstore::{InMemoryBatch, InMemoryDB};
use crate::rocks::errors::typed_store_err_from_bcs_err;
use crate::rocks::errors::typed_store_err_from_rocks_err;
pub use crate::rocks::options::{
    DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
};
use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
#[cfg(tidehunter)]
use crate::tidehunter_util::{
    apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
};
use crate::util::{be_fix_int_ser, iterator_bounds, iterator_bounds_with_range};
use crate::{DbIterator, TypedStoreError};
use crate::{
    metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
    traits::{Map, TableSummary},
};
use backoff::backoff::Backoff;
use fastcrypto::hash::{Digest, HashFunction};
use mysten_common::debug_fatal;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::properties::num_files_at_level;
use rocksdb::{
    AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
    properties,
};
use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
use serde::{Serialize, de::DeserializeOwned};
use std::ops::{Bound, Deref};
use std::{
    borrow::Borrow,
    marker::PhantomData,
    ops::RangeBounds,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
use std::{collections::HashSet, ffi::CStr};
use sui_macros::{fail_point, nondeterministic};
#[cfg(tidehunter)]
use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
use tokio::sync::oneshot;
use tracing::{debug, error, instrument, warn};

const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };

#[cfg(test)]
mod tests;

#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}

impl Drop for RocksDB {
    fn drop(&mut self) {
        self.underlying.cancel_all_background_work(true);
    }
}

#[derive(Clone)]
pub enum ColumnFamily {
    Rocks(String),
    InMemory(String),
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}

impl std::fmt::Debug for ColumnFamily {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
            ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
            #[cfg(tidehunter)]
            ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
        }
    }
}

impl ColumnFamily {
    fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
        match &self {
            ColumnFamily::Rocks(name) => rocks_db
                .underlying
                .cf_handle(name)
                .expect("Map-keying column family should have been checked at DB creation"),
            _ => unreachable!("invariant is checked by the caller"),
        }
    }
}

pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}

impl std::fmt::Debug for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
            Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
        }
    }
}

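/// A handle to an underlying storage backend, together with the metric
/// configuration used to label the metrics it reports.
///
/// A minimal construction sketch, assuming the RocksDB backend; the path and
/// column family name below are illustrative, not fixed by this crate:
///
/// ```ignore
/// let db: Arc<Database> = open_cf_opts(
///     "/tmp/example-db",                         // illustrative path
///     None,                                      // fall back to default_db_options()
///     MetricConf::new("example"),
///     &[("example_cf", rocksdb::Options::default())],
/// )?;
/// ```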
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    metric_conf: MetricConf,
}

impl Drop for Database {
    fn drop(&mut self) {
        DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
    }
}

enum GetResult<'a> {
    Rocks(DBPinnableSlice<'a>),
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}

impl Deref for GetResult<'_> {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        match self {
            GetResult::Rocks(d) => d.deref(),
            GetResult::InMemory(d) => d.deref(),
            #[cfg(tidehunter)]
            GetResult::TideHunter(d) => d.deref(),
        }
    }
}

impl Database {
    pub fn new(storage: Storage, metric_conf: MetricConf) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
        }
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                // The in-memory backend has nothing to flush.
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                // TideHunter manages its own persistence; flush is a no-op here.
                Ok(())
            }
        }
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }

    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                let keys_vec: Vec<K> = keys.into_iter().collect();
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }

    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: dropping a column family on the fly is not implemented")
            }
        }
    }

    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for the rocksdb backend"),
        }
    }

    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for the rocksdb backend"),
        }
    }

    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            // Non-RocksDB backends have no bloom filters; answering `true`
            // conservatively forces the caller to do the actual read.
            _ => true,
        }
    }

    pub fn write(&self, batch: StorageWriteBatch) -> Result<(), TypedStoreError> {
        self.write_opt(batch, &rocksdb::WriteOptions::default())
    }

    pub fn write_opt(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => {
                db.write_batch(batch)
                    .map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }

    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }

    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            "default".to_string()
        } else {
            name.clone()
        }
    }

    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
}

fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
    rocks_db
        .underlying
        .cf_handle(cf_name)
        .expect("Map-keying column family should have been checked at DB creation")
}

fn rocks_cf_from_db<'a>(
    db: &'a Database,
    cf_name: &str,
) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
    match &db.storage {
        Storage::Rocks(rocksdb) => Ok(rocksdb
            .underlying
            .cf_handle(cf_name)
            .expect("Map-keying column family should have been checked at DB creation")),
        _ => Err(TypedStoreError::RocksDBError(
            "typed store invariant violation: expected a RocksDB backend".to_string(),
        )),
    }
}

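/// Per-database metrics configuration: `db_name` labels every exported metric,
/// and the three sampling intervals control how often RocksDB perf contexts
/// are collected for reads, writes, and iteration respectively.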
#[derive(Debug, Default)]
pub struct MetricConf {
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}

impl MetricConf {
    pub fn new(db_name: &str) -> Self {
        if db_name.is_empty() {
            error!("A meaningful db name should be used for metrics reporting.")
        }
        Self {
            db_name: db_name.to_string(),
            read_sample_interval: SamplingInterval::default(),
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }

    pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
        Self {
            db_name: self.db_name,
            read_sample_interval: read_interval,
            write_sample_interval: SamplingInterval::default(),
            iter_sample_interval: SamplingInterval::default(),
        }
    }
}

const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;

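/// A typed map interface over a single column family. Keys are serialized with
/// `be_fix_int_ser`, a big-endian fixed-int encoding whose byte order matches
/// key order, and values are BCS-encoded.
///
/// A usage sketch; the column family name and key/value types are illustrative:
///
/// ```ignore
/// // "example_cf" must be one of the column families the database was opened with.
/// let map: DBMap<u64, String> =
///     DBMap::reopen(&db, Some("example_cf"), &ReadWriteOptions::default(), false)?;
/// map.insert(&1, &"one".to_string())?;
/// assert_eq!(map.get(&1)?, Some("one".to_string()));
/// ```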
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    _phantom: PhantomData<fn(K) -> V>,
    column_family: ColumnFamily,
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}

unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}

impl<K, V> DBMap<K, V> {
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = Arc::downgrade(&db);
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        // Spawn a background task that periodically reports per-cf RocksDB
        // metrics; it stops once the cancel handle (the sender) is dropped.
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

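    /// Reopens an open database as a typed map operating under a specific
    /// column family, falling back to rocksdb's default column family when
    /// `opt_cf` is `None`.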
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<Database>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();
        Ok(DBMap::new(
            db.clone(),
            rw_options,
            &cf_key,
            ColumnFamily::Rocks(cf_key.clone()),
            is_deprecated,
        ))
    }

    #[cfg(tidehunter)]
    pub fn reopen_th(
        db: Arc<Database>,
        cf_name: &str,
        ks: KeySpace,
        prefix: Option<Vec<u8>>,
    ) -> Self {
        DBMap::new(
            db,
            &ReadWriteOptions::default(),
            cf_name,
            ColumnFamily::TideHunter((ks, prefix)),
            false,
        )
    }

    pub fn cf_name(&self) -> &str {
        &self.cf
    }

    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
            }
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }

    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start);
        let to_buf = be_fix_int_ser(end);
        self.db
            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
        Ok(())
    }

    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }

    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    fn get_rocksdb_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
        }
    }

    fn report_rocksdb_metrics(
        database: &Arc<Database>,
        cf_name: &str,
        db_metrics: &Arc<DBMetrics>,
    ) {
        let Storage::Rocks(rocksdb) = &database.storage else {
            return;
        };

        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                database.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
                )
                .unwrap_or(METRICS_ERROR),
            );
        // Sum file counts across LSM levels 0 through 6.
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_TABLE_READERS_MEM,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }

    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            let key_len = be_fix_int_ser(key.borrow()).len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }

    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self.start_iter_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

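    /// Returns an iterator over the inclusive range `[lower_bound, upper_bound]`
    /// that yields entries in descending key order. A sketch, with illustrative
    /// bounds:
    ///
    /// ```ignore
    /// // Most recent entry with key <= 100, if any (bound is illustrative):
    /// let newest = map.reversed_safe_iter_with_bounds(None, Some(100))?.next();
    /// ```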
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
}

pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}

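/// Provides a mutable struct for accumulating a collection of write operations
/// and committing them atomically to the database.
///
/// A usage sketch; `map` stands for any `DBMap` opened from the same
/// `Database`, and the keys/values are illustrative:
///
/// ```ignore
/// let mut batch = map.batch();
/// batch.insert_batch(&map, vec![(1u64, "a".to_string())])?;
/// batch.delete_batch(&map, vec![2u64])?;
/// batch.write()?; // all operations land atomically, or none do
/// ```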
pub struct DBBatch {
    database: Arc<Database>,
    batch: StorageWriteBatch,
    db_metrics: Arc<DBMetrics>,
    write_sample_interval: SamplingInterval,
}

impl DBBatch {
    /// Create a new batch associated with a DB reference.
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consume the batch and write its operations to the database.
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        self.write_opt(&rocksdb::WriteOptions::default())
    }

    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: &rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        let batch_size = self.size_in_bytes();

        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        self.database.write_opt(self.batch, write_options)?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

    /// Schedules a delete of all keys in the range `[from, to)` (inclusive of
    /// `from`, exclusive of `to`). Only the RocksDB backend supports range
    /// deletes, so this is a no-op for the other backends.
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}

impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    type Error = TypedStoreError;

    #[instrument(level = "trace", skip_all, err)]
    fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
        let key_buf = be_fix_int_ser(key);
        let readopts = self.opts.readopts();
        Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
            && self
                .db
                .get(&self.column_family, &key_buf, &readopts)?
                .is_some())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn multi_contains_keys<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<bool>, Self::Error>
    where
        J: Borrow<K>,
    {
        let values = self.multi_get_pinned(keys)?;
        Ok(values.into_iter().map(|v| v.is_some()).collect())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_get_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.get_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let res = self
            .db
            .get(&self.column_family, &key_buf, &self.opts.readopts())?;
        self.db_metrics
            .op_metrics
            .rocksdb_get_bytes
            .with_label_values(&[&self.cf])
            .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        match res {
            Some(data) => {
                let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
                if value.is_err() {
                    let key_hash = default_hash(&key_buf);
                    let value_hash = default_hash(&data);
                    debug_fatal!(
                        "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
                        self.cf_name(),
                        key_hash,
                        value_hash,
                        value.as_ref().err().unwrap()
                    );
                }
                Ok(Some(value?))
            }
            None => Ok(None),
        }
    }

    #[instrument(level = "trace", skip_all, err)]
    fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_put_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
        self.db_metrics
            .op_metrics
            .rocksdb_put_bytes
            .with_label_values(&[&self.cf])
            .observe((key_buf.len() + value_buf.len()) as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        self.db.put_cf(&self.column_family, key_buf, value_buf)?;

        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, cf = ?self.cf, "very slow insert");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_count
                .with_label_values(&[&self.cf])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_puts_duration_ms
                .with_label_values(&[&self.cf])
                .inc_by((elapsed * 1000.0) as u64);
        }

        Ok(())
    }

    #[instrument(level = "trace", skip_all, err)]
    fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_delete_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let key_buf = be_fix_int_ser(key);
        self.db.delete_cf(&self.column_family, key_buf)?;
        self.db_metrics
            .op_metrics
            .rocksdb_deletes
            .with_label_values(&[&self.cf])
            .inc();
        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(())
    }

    /// Removes every key-value pair from the map by scheduling a range delete
    /// over the full range of keys currently present.
    #[instrument(level = "trace", skip_all, err)]
    fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
        let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
        let last_key = self
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            batch.write()?;
        }
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.safe_iter().next().is_none()
    }

    fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
        match &self.db.storage {
            Storage::Rocks(db) => {
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
                    db.iterator(*ks),
                    prefix,
                    self.start_iter_timer(),
                )),
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    fn safe_iter_with_bounds(
        &'a self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
        let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts =
                    rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                Box::new(SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                ))
            }
            Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, lower_bound, upper_bound);
                    Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }

    /// Returns a vec of values corresponding to the provided keys.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_get<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<V>>, TypedStoreError>
    where
        J: Borrow<K>,
    {
        let results = self.multi_get_pinned(keys)?;
        let values_parsed: Result<Vec<_>, TypedStoreError> = results
            .into_iter()
            .map(|value_byte| match value_byte {
                Some(data) => Ok(Some(
                    bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
                )),
                None => Ok(None),
            })
            .collect();

        values_parsed
    }

    /// Convenience method for batch insertion.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_insert<J, U>(
        &self,
        key_val_pairs: impl IntoIterator<Item = (J, U)>,
    ) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
        U: Borrow<V>,
    {
        let mut batch = self.batch();
        batch.insert_batch(self, key_val_pairs)?;
        batch.write()
    }

    /// Convenience method for batch removal.
    #[instrument(level = "trace", skip_all, err)]
    fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
    where
        J: Borrow<K>,
    {
        let mut batch = self.batch();
        batch.delete_batch(self, keys)?;
        batch.write()
    }

    /// Tries to catch up with the primary when this DB was opened as a secondary.
    #[instrument(level = "trace", skip_all, err)]
    fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
        if let Storage::Rocks(rocks) = &self.db.storage {
            rocks
                .underlying
                .try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
        }
        Ok(())
    }
}

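/// Opens a RocksDB database at `path` with the given per-column-family
/// options, creating the database and any missing column families on demand.
/// Column families that already exist on disk but are absent from `opt_cfs`
/// are opened with default options.
///
/// A sketch; the path and column family name are illustrative:
///
/// ```ignore
/// let db = open_cf_opts(
///     "/tmp/node-db",                        // illustrative path
///     None,                                  // fall back to default_db_options()
///     MetricConf::new("node"),
///     &[("objects", rocksdb::Options::default())],
/// )?;
/// ```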
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
    path: P,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let path = path.as_ref();
    let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        let rocksdb = {
            rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
                &options,
                path,
                cfs.into_iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
            )
            .map_err(typed_store_err_from_rocks_err)?
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
        )))
    })
}

/// Opens a database as a read-only secondary instance of the database at
/// `primary_path`, defaulting to a sibling `SECONDARY` directory when no
/// explicit `secondary_path` is given.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        fdlimit::raise_fd_limit();
        // RocksDB requires max_open_files to be unlimited when opening as secondary.
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Add any column families that exist on disk but were not requested.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
        )))
    })
}

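/// Repeatedly attempts to destroy the database at `path`, backing off
/// exponentially and giving up once `timeout` has elapsed. Destruction fails
/// while another handle still holds the database lock, so callers typically
/// drop all references first. A sketch:
///
/// ```ignore
/// drop(db); // assumes no other handles to the database remain
/// safe_drop_db(path, Duration::from_secs(30)).await?;
/// ```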
#[cfg(not(tidehunter))]
pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
    let mut backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    loop {
        match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
            Ok(()) => return Ok(()),
            Err(err) => match backoff.next_backoff() {
                Some(duration) => tokio::time::sleep(duration).await,
                None => return Err(err),
            },
        }
    }
}

#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
    std::fs::remove_dir_all(path)
}

fn populate_missing_cfs(
    input_cfs: &[(&str, rocksdb::Options)],
    path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
    let mut cfs = vec![];
    let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
    let existing_cfs =
        rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
            .ok()
            .unwrap_or_default();

    for cf_name in existing_cfs {
        if !input_cf_index.contains(&cf_name[..]) {
            cfs.push((cf_name, rocksdb::Options::default()));
        }
    }
    cfs.extend(
        input_cfs
            .iter()
            .map(|(name, opts)| (name.to_string(), (*opts).clone())),
    );
    Ok(cfs)
}

fn default_hash(value: &[u8]) -> Digest<32> {
    let mut hasher = fastcrypto::hash::Blake2b256::default();
    hasher.update(value);
    hasher.finalize()
}