1pub mod errors;
4mod options;
5mod rocks_util;
6pub(crate) mod safe_iter;
7
8use crate::memstore::{InMemoryBatch, InMemoryDB};
9use crate::rocks::errors::typed_store_err_from_bcs_err;
10use crate::rocks::errors::typed_store_err_from_rocks_err;
11pub use crate::rocks::options::{
12 DBMapTableConfigMap, DBOptions, ReadWriteOptions, default_db_options, read_size_from_env,
13};
14use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
15#[cfg(tidehunter)]
16use crate::tidehunter_util::{
17 apply_range_bounds, transform_th_iterator, transform_th_key, typed_store_error_from_th_error,
18};
19use crate::util::{
20 be_fix_int_ser, be_fix_int_ser_into, ensure_database_type, iterator_bounds,
21 iterator_bounds_with_range,
22};
23use crate::{DbIterator, StorageType, TypedStoreError};
24use crate::{
25 metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
26 traits::{Map, TableSummary},
27};
28use backoff::backoff::Backoff;
29use fastcrypto::hash::{Digest, HashFunction};
30use mysten_common::debug_fatal;
31use mysten_metrics::RegistryID;
32use prometheus::{Histogram, HistogramTimer};
33use rocksdb::properties::num_files_at_level;
34use rocksdb::{
35 AsColumnFamilyRef, ColumnFamilyDescriptor, Error, MultiThreaded, ReadOptions, WriteBatch,
36 properties,
37};
38use rocksdb::{DBPinnableSlice, LiveFile, checkpoint::Checkpoint};
39use serde::{Serialize, de::DeserializeOwned};
40use std::ops::{Bound, Deref};
41use std::{
42 borrow::Borrow,
43 marker::PhantomData,
44 ops::RangeBounds,
45 path::{Path, PathBuf},
46 sync::{Arc, OnceLock},
47 time::Duration,
48};
49use std::{collections::HashSet, ffi::CStr};
50use sui_macros::{fail_point, nondeterministic};
51#[cfg(tidehunter)]
52use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
53use tokio::sync::oneshot;
54use tracing::{debug, error, instrument, warn};
55
/// Property name accepted by RocksDB's `property_int_value_cf` for the total
/// size of all blob files in a column family (not exposed as a named constant
/// by the `rocksdb` crate's `properties` module).
///
/// Built with the safe, const `CStr::from_bytes_with_nul` instead of the
/// previous `unsafe` unchecked variant: the NUL-termination check now happens
/// at compile time, with no runtime cost and no `unsafe` block.
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
    match CStr::from_bytes_with_nul(b"rocksdb.total-blob-file-size\0") {
        Ok(name) => name,
        // Unreachable: the literal above ends in NUL and has no interior NUL;
        // a violation would fail compilation of this const.
        Err(_) => panic!("invalid C string literal"),
    };
60
/// Process-wide cached flag controlling whether writes request synchronous
/// (fsync-style) durability. Resolved at most once; see `write_sync_enabled`.
static WRITE_SYNC_ENABLED: OnceLock<bool> = OnceLock::new();

/// Returns whether write-sync is enabled for this process.
///
/// The first caller resolves the value — either whatever `init_write_sync`
/// stored earlier, or the `SUI_DB_SYNC_TO_DISK` environment variable
/// (`"1"` / `"true"` mean enabled) — and every later call reuses that
/// cached answer.
fn write_sync_enabled() -> bool {
    *WRITE_SYNC_ENABLED.get_or_init(|| {
        matches!(
            std::env::var("SUI_DB_SYNC_TO_DISK").as_deref(),
            Ok("1") | Ok("true")
        )
    })
}

/// Explicitly sets the write-sync flag.
///
/// Passing `None` leaves the lazy environment-variable lookup in place.
/// Passing `Some` wins only if nothing has read or set the flag yet;
/// later attempts are silently ignored (first writer wins).
pub fn init_write_sync(enabled: Option<bool>) {
    let Some(value) = enabled else {
        return;
    };
    let _ = WRITE_SYNC_ENABLED.set(value);
}
75
76#[cfg(test)]
77mod tests;
78
/// Thin owning wrapper around a multi-threaded RocksDB handle.
///
/// Exists (rather than using `DBWithThreadMode` directly) so that `Drop` can
/// stop background work before the handle is destroyed.
#[derive(Debug)]
pub struct RocksDB {
    pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
83
impl Drop for RocksDB {
    fn drop(&mut self) {
        // Wait (`true`) for compactions/flushes to be cancelled so no
        // background thread touches the DB while it is being torn down.
        self.underlying.cancel_all_background_work(true);
    }
}
89
/// Identifies a column family within one of the supported storage backends.
///
/// The variant must match the backend of the `Storage` it is used against;
/// mixing them is treated as an invariant violation by `Database`'s accessors.
#[derive(Clone)]
pub enum ColumnFamily {
    /// RocksDB column family, addressed by name.
    Rocks(String),
    /// In-memory table, addressed by name.
    InMemory(String),
    /// TideHunter key space plus an optional key prefix applied to all keys.
    #[cfg(tidehunter)]
    TideHunter((KeySpace, Option<Vec<u8>>)),
}
97
98impl std::fmt::Debug for ColumnFamily {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
102 ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
103 #[cfg(tidehunter)]
104 ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
105 }
106 }
107}
108
109impl ColumnFamily {
110 fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
111 match &self {
112 ColumnFamily::Rocks(name) => rocks_db
113 .underlying
114 .cf_handle(name)
115 .expect("Map-keying column family should have been checked at DB creation"),
116 _ => unreachable!("invariant is checked by the caller"),
117 }
118 }
119}
120
/// The interchangeable storage backends a `Database` can be built on.
pub enum Storage {
    Rocks(RocksDB),
    InMemory(InMemoryDB),
    #[cfg(tidehunter)]
    TideHunter(Arc<TideHunterDb>),
}
127
128impl std::fmt::Debug for Storage {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
132 Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
133 #[cfg(tidehunter)]
134 Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
135 }
136 }
137}
138
/// A storage backend together with the metrics bookkeeping attached to it.
///
/// Construction registers the DB with the global metrics (see `Database::new`);
/// `Drop` unregisters it again.
#[derive(Debug)]
pub struct Database {
    storage: Storage,
    // `db_name` inside doubles as the metrics label for this database.
    metric_conf: MetricConf,
    // Handle used to remove this DB's metrics from the registry on drop.
    registry_id: Option<RegistryID>,
}
145
impl Drop for Database {
    fn drop(&mut self) {
        let metrics = DBMetrics::get();
        // Mirror of the increment performed in `Database::new`.
        metrics.decrement_num_active_dbs(&self.metric_conf.db_name);
        if let Some(registry_id) = self.registry_id {
            // NOTE(review): `registry_serivce` is a pre-existing typo in the
            // `DBMetrics` field name; it cannot be fixed from this file.
            metrics.registry_serivce.remove(registry_id);
        }
    }
}
155
/// Backend-specific value returned by point lookups, unified behind
/// `Deref<Target = [u8]>` so callers can treat every backend's result as a
/// plain byte slice without copying.
enum GetResult<'a> {
    // Zero-copy slice pinned inside RocksDB while the value is borrowed.
    Rocks(DBPinnableSlice<'a>),
    // Owned copy handed back by the in-memory store.
    InMemory(Vec<u8>),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::minibytes::Bytes),
}
162
163impl Deref for GetResult<'_> {
164 type Target = [u8];
165 fn deref(&self) -> &[u8] {
166 match self {
167 GetResult::Rocks(d) => d.deref(),
168 GetResult::InMemory(d) => d.deref(),
169 #[cfg(tidehunter)]
170 GetResult::TideHunter(d) => d.deref(),
171 }
172 }
173}
174
impl Database {
    /// Wraps a storage backend with its metric configuration and bumps the
    /// global active-DB gauge (decremented again in `Drop`).
    pub fn new(storage: Storage, metric_conf: MetricConf, registry_id: Option<RegistryID>) -> Self {
        DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
        Self {
            storage,
            metric_conf,
            registry_id,
        }
    }

    /// Flushes memtables for RocksDB; a no-op for the other backends.
    ///
    /// # Errors
    /// `TypedStoreError::RocksDBError` when the RocksDB flush fails.
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        match &self.storage {
            Storage::Rocks(rocks_db) => rocks_db.underlying.flush().map_err(|e| {
                TypedStoreError::RocksDBError(format!("Failed to flush database: {}", e))
            }),
            Storage::InMemory(_) => {
                // Nothing durable to flush.
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                Ok(())
            }
        }
    }

    /// Point lookup of `key` in the given column family.
    ///
    /// The `ColumnFamily` variant must match the storage backend; a mismatch
    /// is reported as an "invariant violation" error.
    fn get<K: AsRef<[u8]>>(
        &self,
        cf: &ColumnFamily,
        key: K,
        readopts: &ReadOptions,
    ) -> Result<Option<GetResult<'_>>, TypedStoreError> {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
                .underlying
                .get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
                .map_err(typed_store_err_from_rocks_err)?
                .map(GetResult::Rocks)),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                Ok(db.get(cf_name, key).map(GetResult::InMemory))
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => Ok(db
                .get(*ks, &transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error)?
                .map(GetResult::TideHunter)),

            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        }
    }

    /// Batched lookup of `keys`, preserving input order; each position gets
    /// its own per-key `Result`.
    fn multi_get<I, K>(
        &self,
        cf: &ColumnFamily,
        keys: I,
        readopts: &ReadOptions,
    ) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
                // Keys are collected so they outlive the batched call, which
                // borrows them.
                let keys_vec: Vec<K> = keys.into_iter().collect();
                let res = db.underlying.batched_multi_get_cf_opt(
                    &cf.rocks_cf(db),
                    keys_vec.iter(),
                    false,
                    readopts,
                );
                res.into_iter()
                    .map(|r| {
                        r.map_err(typed_store_err_from_rocks_err)
                            .map(|item| item.map(GetResult::Rocks))
                    })
                    .collect()
            }
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
                .multi_get(cf_name, keys)
                .into_iter()
                .map(|r| Ok(r.map(GetResult::InMemory)))
                .collect(),
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => {
                // TideHunter has no batched read; issue one get per key.
                let res = keys.into_iter().map(|k| {
                    db.get(*ks, &transform_th_key(k.as_ref(), prefix))
                        .map_err(typed_store_error_from_th_error)
                });
                res.into_iter()
                    .map(|r| r.map(|item| item.map(GetResult::TideHunter)))
                    .collect()
            }
            _ => unreachable!("typed store invariant violation"),
        }
    }

    /// Drops an entire column family by name. Unsupported for TideHunter.
    pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(db) => db.underlying.drop_cf(name),
            Storage::InMemory(db) => {
                db.drop_cf(name);
                Ok(())
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(_) => {
                unimplemented!("TideHunter: deletion of column family on a fly not implemented")
            }
        }
    }

    /// Deletes whole SST files fully contained in `[from, to]`.
    /// RocksDB-only; panics on other backends.
    pub fn delete_file_in_range<K: AsRef<[u8]>>(
        &self,
        cf: &impl AsColumnFamilyRef,
        from: K,
        to: K,
    ) -> Result<(), rocksdb::Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
            _ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
        }
    }

    /// Deletes a single key from the given column family.
    /// The surrounding `fail_point!`s allow fault-injection tests to trigger
    /// around the mutation.
    fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
        fail_point!("delete-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .delete_cf(&cf.rocks_cf(db), key)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.delete(cf_name, key.as_ref());
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .remove(*ks, transform_th_key(key.as_ref(), prefix))
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("delete-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    /// Filesystem path of the RocksDB instance, used by the pruner.
    /// Panics on non-RocksDB backends.
    pub fn path_for_pruning(&self) -> &Path {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.path(),
            _ => unimplemented!("method is only supported for rocksdb backend"),
        }
    }

    /// Writes a single key/value pair, with fail points around the mutation
    /// (see `delete_cf`).
    fn put_cf(
        &self,
        cf: &ColumnFamily,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        fail_point!("put-cf-before");
        let ret = match (&self.storage, cf) {
            (Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
                .underlying
                .put_cf(&cf.rocks_cf(db), key, value)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
                db.put(cf_name, key, value);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(db), ColumnFamily::TideHunter((ks, prefix))) => db
                .insert(*ks, transform_th_key(&key, prefix), value)
                .map_err(typed_store_error_from_th_error),
            _ => Err(TypedStoreError::RocksDBError(
                "typed store invariant violation".to_string(),
            )),
        };
        fail_point!("put-cf-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    /// Bloom-filter style existence hint. May return a false positive; for
    /// non-RocksDB backends it conservatively always answers `true`.
    pub fn key_may_exist_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        key: K,
        readopts: &ReadOptions,
    ) -> bool {
        match &self.storage {
            Storage::Rocks(rocks) => {
                rocks
                    .underlying
                    .key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
            }
            _ => true,
        }
    }

    /// Applies a whole write batch atomically. The batch variant must match
    /// the storage backend, otherwise an error is returned.
    pub(crate) fn write_opt_internal(
        &self,
        batch: StorageWriteBatch,
        write_options: &rocksdb::WriteOptions,
    ) -> Result<(), TypedStoreError> {
        fail_point!("batch-write-before");
        let ret = match (&self.storage, batch) {
            (Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
                .underlying
                .write_opt(batch, write_options)
                .map_err(typed_store_err_from_rocks_err),
            (Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
                db.write(batch);
                Ok(())
            }
            #[cfg(tidehunter)]
            (Storage::TideHunter(_db), StorageWriteBatch::TideHunter(batch)) => {
                // `write_options` do not apply here; TideHunter batches
                // commit through their own handle.
                batch.commit().map_err(typed_store_error_from_th_error)
            }
            _ => Err(TypedStoreError::RocksDBError(
                "using invalid batch type for the database".to_string(),
            )),
        };
        fail_point!("batch-write-after");
        #[allow(clippy::let_and_return)]
        ret
    }

    /// Kicks off TideHunter data relocation; no-op for other backends.
    #[cfg(tidehunter)]
    pub fn start_relocation(&self) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.start_relocation()?;
        }
        Ok(())
    }

    /// Drops all TideHunter cells whose keys fall in the inclusive range.
    /// Panics if the backend is not TideHunter.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range(
        &self,
        ks: KeySpace,
        from_inclusive: &[u8],
        to_inclusive: &[u8],
    ) -> anyhow::Result<()> {
        if let Storage::TideHunter(db) = &self.storage {
            db.drop_cells_in_range(ks, from_inclusive, to_inclusive)
                .map_err(|e| anyhow::anyhow!("{:?}", e))?;
        } else {
            panic!("drop_cells_in_range called on non-TideHunter storage");
        }
        Ok(())
    }

    /// Manually compacts the given key range (RocksDB only; silently a
    /// no-op elsewhere). `None` bounds mean unbounded.
    pub fn compact_range_cf<K: AsRef<[u8]>>(
        &self,
        cf_name: &str,
        start: Option<K>,
        end: Option<K>,
    ) {
        if let Storage::Rocks(rocksdb) = &self.storage {
            rocksdb
                .underlying
                .compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
        }
    }

    /// Creates an on-disk checkpoint at `path` (RocksDB only; silently
    /// succeeds for other backends).
    pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
        if let Storage::Rocks(rocks) = &self.storage {
            let checkpoint =
                Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
            checkpoint
                .create_checkpoint(path)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    /// Fresh sampling interval for point reads, derived from the config.
    pub fn get_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for multi-gets (shares the read config).
    pub fn multiget_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.read_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for writes.
    pub fn write_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.write_sample_interval.new_from_self()
    }

    /// Fresh sampling interval for iterators.
    pub fn iter_sampling_interval(&self) -> SamplingInterval {
        self.metric_conf.iter_sample_interval.new_from_self()
    }

    /// Metrics label for this DB; falls back to "default" when unset.
    fn db_name(&self) -> String {
        let name = &self.metric_conf.db_name;
        if name.is_empty() {
            "default".to_string()
        } else {
            name.clone()
        }
    }

    /// Live SST files (RocksDB only; empty for other backends).
    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
        match &self.storage {
            Storage::Rocks(rocks) => rocks.underlying.live_files(),
            _ => Ok(vec![]),
        }
    }
}
490
491fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
492 rocks_db
493 .underlying
494 .cf_handle(cf_name)
495 .expect("Map-keying column family should have been checked at DB creation")
496}
497
498fn rocks_cf_from_db<'a>(
499 db: &'a Database,
500 cf_name: &str,
501) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
502 match &db.storage {
503 Storage::Rocks(rocksdb) => Ok(rocksdb
504 .underlying
505 .cf_handle(cf_name)
506 .expect("Map-keying column family should have been checked at DB creation")),
507 _ => Err(TypedStoreError::RocksDBError(
508 "using invalid batch type for the database".to_string(),
509 )),
510 }
511}
512
/// Per-database metrics configuration: the metrics label plus sampling
/// intervals for each operation class.
#[derive(Debug, Default)]
pub struct MetricConf {
    // Used as the metrics label; empty is discouraged (see `MetricConf::new`).
    pub db_name: String,
    pub read_sample_interval: SamplingInterval,
    pub write_sample_interval: SamplingInterval,
    pub iter_sample_interval: SamplingInterval,
}
520
521impl MetricConf {
522 pub fn new(db_name: &str) -> Self {
523 if db_name.is_empty() {
524 error!("A meaningful db name should be used for metrics reporting.")
525 }
526 Self {
527 db_name: db_name.to_string(),
528 read_sample_interval: SamplingInterval::default(),
529 write_sample_interval: SamplingInterval::default(),
530 iter_sample_interval: SamplingInterval::default(),
531 }
532 }
533
534 pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
535 Self {
536 db_name: self.db_name,
537 read_sample_interval: read_interval,
538 write_sample_interval: SamplingInterval::default(),
539 iter_sample_interval: SamplingInterval::default(),
540 }
541 }
542}
/// How often the background task samples per-column-family RocksDB properties.
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
/// Sentinel gauge value recorded when reading a RocksDB property fails.
const METRICS_ERROR: i64 = -1;
545
/// A typed map view over one column family of a `Database`, parameterized by
/// serializable key/value types `K`/`V`.
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
    pub db: Arc<Database>,
    // `fn(K) -> V` ties the type parameters to the map without owning a K or
    // V, so auto-traits don't depend on K/V ownership.
    _phantom: PhantomData<fn(K) -> V>,
    // Backend-specific column-family identity (name or key space).
    column_family: ColumnFamily,
    // Column family name as a string, used for metrics labels and lookups.
    cf: String,
    pub opts: ReadWriteOptions,
    db_metrics: Arc<DBMetrics>,
    get_sample_interval: SamplingInterval,
    multiget_sample_interval: SamplingInterval,
    write_sample_interval: SamplingInterval,
    iter_sample_interval: SamplingInterval,
    // Dropping the last clone of this sender stops the background
    // metrics-reporting task spawned in `DBMap::new`.
    _metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
562
// SAFETY(review): no safety argument was recorded for this manual impl. With
// the `PhantomData<fn(K) -> V>` field, `DBMap` looks auto-`Send` already, so
// this impl is presumably a leftover from an earlier field layout — confirm
// before removing.
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
564
impl<K, V> DBMap<K, V> {
    /// Builds a map over column family `opt_cf` of `db` and — for live
    /// RocksDB-backed maps — spawns a background task that periodically
    /// reports per-cf RocksDB properties to the metrics registry.
    ///
    /// The task holds only a `Weak` to the database, so it cannot keep the
    /// DB alive; it exits when the returned map drops its cancel handle.
    pub(crate) fn new(
        db: Arc<Database>,
        opts: &ReadWriteOptions,
        opt_cf: &str,
        column_family: ColumnFamily,
        is_deprecated: bool,
    ) -> Self {
        let db_cloned = Arc::downgrade(&db.clone());
        let db_metrics = DBMetrics::get();
        let db_metrics_cloned = db_metrics.clone();
        let cf = opt_cf.to_string();

        // Dropping `sender` (via the map's `_metrics_task_cancel_handle`)
        // wakes `recv` and terminates the loop below.
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
            tokio::task::spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            if let Some(db) = db_cloned.upgrade() {
                                let cf = cf.clone();
                                let db_metrics = db_metrics.clone();
                                // Property reads can block; keep them off the
                                // async executor.
                                if let Err(e) = tokio::task::spawn_blocking(move || {
                                    Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
                                }).await {
                                    error!("Failed to log metrics with error: {}", e);
                                }
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
                debug!("Returning the cf metric logging task for DBMap: {}", &cf);
            });
        }
        DBMap {
            db: db.clone(),
            opts: opts.clone(),
            _phantom: PhantomData,
            column_family,
            cf: opt_cf.to_string(),
            db_metrics: db_metrics_cloned,
            _metrics_task_cancel_handle: Arc::new(sender),
            get_sample_interval: db.get_sampling_interval(),
            multiget_sample_interval: db.multiget_sampling_interval(),
            write_sample_interval: db.write_sampling_interval(),
            iter_sample_interval: db.iter_sampling_interval(),
        }
    }

    /// Opens a RocksDB-backed map over `opt_cf` (the default column family
    /// when `None`).
    #[instrument(level = "debug", skip(db), err)]
    pub fn reopen(
        db: &Arc<Database>,
        opt_cf: Option<&str>,
        rw_options: &ReadWriteOptions,
        is_deprecated: bool,
    ) -> Result<Self, TypedStoreError> {
        let cf_key = opt_cf
            .unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
            .to_owned();
        Ok(DBMap::new(
            db.clone(),
            rw_options,
            &cf_key,
            ColumnFamily::Rocks(cf_key.to_string()),
            is_deprecated,
        ))
    }

    /// Opens a TideHunter-backed map over key space `ks`, optionally
    /// prefixing every key with `prefix`.
    #[cfg(tidehunter)]
    pub fn reopen_th(
        db: Arc<Database>,
        cf_name: &str,
        ks: KeySpace,
        prefix: Option<Vec<u8>>,
    ) -> Self {
        DBMap::new(
            db,
            &ReadWriteOptions::default(),
            cf_name,
            ColumnFamily::TideHunter((ks, prefix.clone())),
            false,
        )
    }

    /// Name of the column family this map reads and writes.
    pub fn cf_name(&self) -> &str {
        &self.cf
    }

    /// Creates an empty write batch matching this map's storage backend.
    pub fn batch(&self) -> DBBatch {
        let batch = match &self.db.storage {
            Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
            Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => StorageWriteBatch::TideHunter(db.write_batch()),
        };
        DBBatch::new(
            &self.db,
            batch,
            &self.db_metrics,
            &self.write_sample_interval,
        )
    }

    /// Flushes the underlying database (see `Database::flush`).
    pub fn flush(&self) -> Result<(), TypedStoreError> {
        self.db.flush()
    }

    /// Compacts the key range `[start, end]` of this map's column family.
    pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
        let from_buf = be_fix_int_ser(start);
        let to_buf = be_fix_int_ser(end);
        self.db
            .compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
        Ok(())
    }

    /// Compacts a raw (already-encoded) key range of an arbitrary column
    /// family in the same database.
    pub fn compact_range_raw(
        &self,
        cf_name: &str,
        start: Vec<u8>,
        end: Vec<u8>,
    ) -> Result<(), TypedStoreError> {
        self.db.compact_range_cf(cf_name, Some(start), Some(end));
        Ok(())
    }

    /// Drops all TideHunter cells in the inclusive key range; a no-op for
    /// non-TideHunter column families.
    #[cfg(tidehunter)]
    pub fn drop_cells_in_range<J: Serialize>(
        &self,
        from_inclusive: &J,
        to_inclusive: &J,
    ) -> Result<(), TypedStoreError>
    where
        K: Serialize,
    {
        let from_buf = be_fix_int_ser(from_inclusive);
        let to_buf = be_fix_int_ser(to_inclusive);
        if let ColumnFamily::TideHunter((ks, _)) = &self.column_family {
            self.db
                .drop_cells_in_range(*ks, &from_buf, &to_buf)
                .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        }
        Ok(())
    }

    /// Batched point lookup returning raw (pinned where possible) values in
    /// input order, with multi-get latency/byte metrics recorded.
    fn multi_get_pinned<J>(
        &self,
        keys: impl IntoIterator<Item = J>,
    ) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
    where
        J: Borrow<K>,
        K: Serialize,
    {
        let _timer = self
            .db_metrics
            .op_metrics
            .rocksdb_multiget_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer();
        // Perf context is only collected on sampled calls to bound overhead.
        let perf_ctx = if self.multiget_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        let keys_bytes = keys.into_iter().map(|k| be_fix_int_ser(k.borrow()));
        let results: Result<Vec<_>, TypedStoreError> = self
            .db
            .multi_get(&self.column_family, keys_bytes, &self.opts.readopts())
            .into_iter()
            .collect();
        let entries = results?;
        let entry_size = entries
            .iter()
            .flatten()
            .map(|entry| entry.len())
            .sum::<usize>();
        self.db_metrics
            .op_metrics
            .rocksdb_multiget_bytes
            .with_label_values(&[&self.cf])
            .observe(entry_size as f64);
        if perf_ctx.is_some() {
            self.db_metrics
                .read_perf_ctx_metrics
                .report_metrics(&self.cf);
        }
        Ok(entries)
    }

    /// Reads an integer RocksDB property for a column family; a missing
    /// property reads as 0.
    fn get_rocksdb_int_property(
        rocksdb: &RocksDB,
        cf: &impl AsColumnFamilyRef,
        property_name: &std::ffi::CStr,
    ) -> Result<i64, TypedStoreError> {
        match rocksdb.underlying.property_int_value_cf(cf, property_name) {
            // Clamp to i64::MAX before converting so the cast cannot fail.
            Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
            Ok(None) => Ok(0),
            Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
        }
    }

    /// Samples a fixed set of per-cf RocksDB properties into gauges.
    /// Failed property reads record `METRICS_ERROR` (-1) rather than abort.
    /// Non-RocksDB backends are skipped.
    fn report_rocksdb_metrics(
        database: &Arc<Database>,
        cf_name: &str,
        db_metrics: &Arc<DBMetrics>,
    ) {
        let Storage::Rocks(rocksdb) = &database.storage else {
            return;
        };

        let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
            tracing::warn!(
                "unable to report metrics for cf {cf_name:?} in db {:?}",
                database.db_name()
            );
            return;
        };

        db_metrics
            .cf_metrics
            .rocksdb_total_sst_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_total_blob_files_size
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
                )
                .unwrap_or(METRICS_ERROR),
            );
        // Sum file counts over LSM levels 0..=6.
        let total_num_files: i64 = (0..=6)
            .map(|level| {
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
                    .unwrap_or(METRICS_ERROR)
            })
            .sum();
        db_metrics
            .cf_metrics
            .rocksdb_total_num_files
            .with_label_values(&[cf_name])
            .set(total_num_files);
        db_metrics
            .cf_metrics
            .rocksdb_num_level0_files
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_current_size_active_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_size_all_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_snapshots
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_oldest_snapshot_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_actual_delayed_write_rate
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_is_write_stopped
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_capacity
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_block_cache_pinned_usage
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_table_readers_mem
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_TABLE_READERS_MEM,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimated_num_keys
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_immutable_mem_tables
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_mem_table_flush_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_compaction_pending
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_pending_compaction_bytes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(
                    rocksdb,
                    &cf,
                    properties::ESTIMATE_PENDING_COMPACTION_BYTES,
                )
                .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_compactions
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_num_running_flushes
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_estimate_oldest_key_time
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_background_errors
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
                    .unwrap_or(METRICS_ERROR),
            );
        db_metrics
            .cf_metrics
            .rocksdb_base_level
            .with_label_values(&[cf_name])
            .set(
                Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
                    .unwrap_or(METRICS_ERROR),
            );
    }

    /// Checkpoints the whole underlying database at `path`.
    pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
        self.db.checkpoint(path)
    }

    /// Full scan of the map producing counts, total byte sizes, and key/value
    /// length histograms. O(n) in table size — intended for tooling, not the
    /// hot path.
    pub fn table_summary(&self) -> eyre::Result<TableSummary>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let mut num_keys = 0;
        let mut key_bytes_total = 0;
        let mut value_bytes_total = 0;
        let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
        for item in self.safe_iter() {
            let (key, value) = item?;
            num_keys += 1;
            // Lengths are measured on the serialized representations, i.e.
            // what is actually stored.
            let key_len = be_fix_int_ser(key.borrow()).len();
            let value_len = bcs::to_bytes(value.borrow())?.len();
            key_bytes_total += key_len;
            value_bytes_total += value_len;
            key_hist.record(key_len as u64)?;
            value_hist.record(value_len as u64)?;
        }
        Ok(TableSummary {
            num_keys,
            key_bytes_total,
            value_bytes_total,
            key_hist,
            value_hist,
        })
    }

    /// Starts the iterator-latency timer labelled with this map's cf.
    fn start_iter_timer(&self) -> HistogramTimer {
        self.db_metrics
            .op_metrics
            .rocksdb_iter_latency_seconds
            .with_label_values(&[&self.cf])
            .start_timer()
    }

    /// Bundles the latency timer, byte/key histograms, and (sampled) perf
    /// context handed to iterators for metrics reporting.
    fn create_iter_context(
        &self,
    ) -> (
        Option<HistogramTimer>,
        Option<Histogram>,
        Option<Histogram>,
        Option<RocksDBPerfContext>,
    ) {
        let timer = self.start_iter_timer();
        let bytes_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_bytes
            .with_label_values(&[&self.cf]);
        let keys_scanned = self
            .db_metrics
            .op_metrics
            .rocksdb_iter_keys
            .with_label_values(&[&self.cf]);
        let perf_ctx = if self.iter_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };
        (
            Some(timer),
            Some(bytes_scanned),
            Some(keys_scanned),
            perf_ctx,
        )
    }

    /// Iterates the inclusive key range `[lower_bound, upper_bound]` in
    /// descending key order; `None` bounds are unbounded.
    #[allow(clippy::complexity)]
    pub fn reversed_safe_iter_with_bounds(
        &self,
        lower_bound: Option<K>,
        upper_bound: Option<K>,
    ) -> Result<DbIterator<'_, (K, V)>, TypedStoreError>
    where
        K: Serialize + DeserializeOwned,
        V: Serialize + DeserializeOwned,
    {
        let (it_lower_bound, it_upper_bound) = iterator_bounds_with_range::<K>((
            lower_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
            upper_bound
                .as_ref()
                .map(Bound::Included)
                .unwrap_or(Bound::Unbounded),
        ));
        match &self.db.storage {
            Storage::Rocks(db) => {
                let readopts = rocks_util::apply_range_bounds(
                    self.opts.readopts(),
                    it_lower_bound,
                    it_upper_bound,
                );
                // The reverse wrapper seeks to this serialized key (or the
                // end when unbounded) before walking backwards.
                let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(&k));
                let db_iter = db
                    .underlying
                    .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
                let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
                let iter = SafeIter::new(
                    self.cf.clone(),
                    db_iter,
                    _timer,
                    _perf_ctx,
                    bytes_scanned,
                    keys_scanned,
                    Some(self.db_metrics.clone()),
                );
                Ok(Box::new(SafeRevIter::new(iter, upper_bound_key)))
            }
            Storage::InMemory(db) => {
                // `true` selects reverse iteration in the in-memory store.
                Ok(db.iterator(&self.cf, it_lower_bound, it_upper_bound, true))
            }
            #[cfg(tidehunter)]
            Storage::TideHunter(db) => match &self.column_family {
                ColumnFamily::TideHunter((ks, prefix)) => {
                    let mut iter = db.iterator(*ks);
                    apply_range_bounds(&mut iter, it_lower_bound, it_upper_bound);
                    iter.reverse();
                    Ok(Box::new(transform_th_iterator(
                        iter,
                        prefix,
                        self.start_iter_timer(),
                    )))
                }
                _ => unreachable!("storage backend invariant violation"),
            },
        }
    }
}
1138
/// Backend-specific write batch; the variant must match the `Storage` it is
/// eventually applied to (enforced by `Database::write_opt_internal`).
pub enum StorageWriteBatch {
    Rocks(rocksdb::WriteBatch),
    InMemory(InMemoryBatch),
    #[cfg(tidehunter)]
    TideHunter(tidehunter::batch::WriteBatch),
}
1145
/// Byte-range header describing one staged operation inside `StagedBatch::data`.
struct EntryHeader {
    // Start of this entry's bytes within `StagedBatch::data`.
    offset: usize,
    // Length of the column-family-name segment beginning at `offset`.
    cf_name_len: usize,
    // Length of the serialized key that follows the cf name.
    key_len: usize,
    // `true` for a put (a bcs-serialized value follows the key);
    // `false` for a delete (no value bytes).
    is_put: bool,
}
1156
/// A write batch staged as one flat byte buffer plus per-entry headers,
/// independent of any particular storage backend.
#[derive(Default)]
pub struct StagedBatch {
    // Concatenated `cf_name || key || [value]` bytes for all staged entries.
    data: Vec<u8>,
    // One header per staged operation, in insertion order.
    entries: Vec<EntryHeader>,
}
1167
1168impl StagedBatch {
1169 pub fn new() -> Self {
1170 Self {
1171 data: Vec::with_capacity(1024),
1172 entries: Vec::with_capacity(16),
1173 }
1174 }
1175
1176 pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
1177 &mut self,
1178 db: &DBMap<K, V>,
1179 new_vals: impl IntoIterator<Item = (J, U)>,
1180 ) -> Result<&mut Self, TypedStoreError> {
1181 let cf_name = db.cf_name();
1182 new_vals
1183 .into_iter()
1184 .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
1185 let offset = self.data.len();
1186 self.data.extend_from_slice(cf_name.as_bytes());
1187 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1188 bcs::serialize_into(&mut self.data, v.borrow())
1189 .map_err(typed_store_err_from_bcs_err)?;
1190 self.entries.push(EntryHeader {
1191 offset,
1192 cf_name_len: cf_name.len(),
1193 key_len,
1194 is_put: true,
1195 });
1196 Ok(())
1197 })?;
1198 Ok(self)
1199 }
1200
1201 pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
1202 &mut self,
1203 db: &DBMap<K, V>,
1204 purged_vals: impl IntoIterator<Item = J>,
1205 ) -> Result<(), TypedStoreError> {
1206 let cf_name = db.cf_name();
1207 purged_vals
1208 .into_iter()
1209 .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
1210 let offset = self.data.len();
1211 self.data.extend_from_slice(cf_name.as_bytes());
1212 let key_len = be_fix_int_ser_into(&mut self.data, k.borrow());
1213 self.entries.push(EntryHeader {
1214 offset,
1215 cf_name_len: cf_name.len(),
1216 key_len,
1217 is_put: false,
1218 });
1219 Ok(())
1220 })?;
1221 Ok(())
1222 }
1223
1224 pub fn size_in_bytes(&self) -> usize {
1225 self.data.len()
1226 }
1227}
1228
/// A batch of writes bound to a single `Database`; mutations are buffered in
/// `batch` and applied together by `write`/`write_opt`.
pub struct DBBatch {
    database: Arc<Database>,
    batch: StorageWriteBatch,
    db_metrics: Arc<DBMetrics>,
    // Governs how often a commit also records RocksDB perf-context metrics.
    write_sample_interval: SamplingInterval,
}
1286
impl DBBatch {
    /// Wraps a backend-specific `batch` targeting `dbref`; metric handles are
    /// cloned so the batch can outlive the caller's references.
    pub fn new(
        dbref: &Arc<Database>,
        batch: StorageWriteBatch,
        db_metrics: &Arc<DBMetrics>,
        write_sample_interval: &SamplingInterval,
    ) -> Self {
        DBBatch {
            database: dbref.clone(),
            batch,
            db_metrics: db_metrics.clone(),
            write_sample_interval: write_sample_interval.clone(),
        }
    }

    /// Consumes the batch and commits it, honoring the process-wide write-sync
    /// switch (synchronous/fsync'd writes when enabled).
    #[instrument(level = "trace", skip_all, err)]
    pub fn write(self) -> Result<(), TypedStoreError> {
        let mut write_options = rocksdb::WriteOptions::default();

        if write_sync_enabled() {
            write_options.set_sync(true);
        }

        self.write_opt(write_options)
    }

    /// Consumes the batch and commits it with explicit `write_options`,
    /// recording commit latency/size metrics and flagging commits slower than
    /// one second.
    #[instrument(level = "trace", skip_all, err)]
    pub fn write_opt(self, write_options: rocksdb::WriteOptions) -> Result<(), TypedStoreError> {
        let db_name = self.database.db_name();
        let timer = self
            .db_metrics
            .op_metrics
            .rocksdb_batch_commit_latency_seconds
            .with_label_values(&[&db_name])
            .start_timer();
        // Captured before `self.batch` is moved into the write below.
        let batch_size = self.size_in_bytes();

        // Perf context is sampled, not collected on every commit.
        let perf_ctx = if self.write_sample_interval.sample() {
            Some(RocksDBPerfContext)
        } else {
            None
        };

        self.database
            .write_opt_internal(self.batch, &write_options)?;

        self.db_metrics
            .op_metrics
            .rocksdb_batch_commit_bytes
            .with_label_values(&[&db_name])
            .observe(batch_size as f64);

        if perf_ctx.is_some() {
            self.db_metrics
                .write_perf_ctx_metrics
                .report_metrics(&db_name);
        }
        let elapsed = timer.stop_and_record();
        if elapsed > 1.0 {
            warn!(?elapsed, ?db_name, "very slow batch write");
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_count
                .with_label_values(&[&db_name])
                .inc();
            self.db_metrics
                .op_metrics
                .rocksdb_very_slow_batch_writes_duration_ms
                .with_label_values(&[&db_name])
                .inc_by((elapsed * 1000.0) as u64);
        }
        Ok(())
    }

    /// Serialized size of the buffered batch. Only the RocksDB backend
    /// reports a real size; other backends return 0.
    pub fn size_in_bytes(&self) -> usize {
        match self.batch {
            StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
            StorageWriteBatch::InMemory(_) => 0,
            #[cfg(tidehunter)]
            StorageWriteBatch::TideHunter(_) => 0,
        }
    }

    /// Replays pre-serialized `StagedBatch`es into this batch.
    ///
    /// Each entry is decoded as `[cf name][key][value?]` starting at its
    /// header's offset; a put's value ends at the next entry's offset (or at
    /// the buffer end). Returns an error for the TideHunter backend, which
    /// does not support `concat`.
    pub fn concat(&mut self, raw_batches: Vec<StagedBatch>) -> Result<&mut Self, TypedStoreError> {
        for raw_batch in raw_batches {
            let data = &raw_batch.data;
            for (i, hdr) in raw_batch.entries.iter().enumerate() {
                // End of this entry's bytes = start of the next entry, or the
                // end of the shared buffer for the final entry.
                let end = raw_batch
                    .entries
                    .get(i + 1)
                    .map_or(data.len(), |next| next.offset);
                let cf_bytes = &data[hdr.offset..hdr.offset + hdr.cf_name_len];
                let key_start = hdr.offset + hdr.cf_name_len;
                let key = &data[key_start..key_start + hdr.key_len];
                let cf_name = std::str::from_utf8(cf_bytes)
                    .map_err(|e| TypedStoreError::SerializationError(e.to_string()))?;

                if hdr.is_put {
                    let value = &data[key_start + hdr.key_len..end];
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.put_cf(&rocks_cf_from_db(&self.database, cf_name)?, key, value);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.put_cf(cf_name, key, value);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                } else {
                    match &mut self.batch {
                        StorageWriteBatch::Rocks(b) => {
                            b.delete_cf(&rocks_cf_from_db(&self.database, cf_name)?, key);
                        }
                        StorageWriteBatch::InMemory(b) => {
                            b.delete_cf(cf_name, key);
                        }
                        #[cfg(tidehunter)]
                        _ => {
                            return Err(TypedStoreError::RocksDBError(
                                "concat not supported for TideHunter".to_string(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(self)
    }

    /// Buffers a delete of every key in `purged_vals` from `db`'s column
    /// family. Rejects maps that belong to a different `Database` instance.
    pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        purged_vals: impl IntoIterator<Item = J>,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        purged_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|k| {
                let k_buf = be_fix_int_ser(k.borrow());
                // The batch variant and the map's column-family variant must
                // refer to the same backend; any mismatch is a bug.
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.delete_cf(name, k_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.delete(*ks, transform_th_key(&k_buf, prefix))
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        Ok(())
    }

    /// Buffers a range delete of keys in `[from, to)` — RocksDB's
    /// `delete_range_cf` treats the upper bound as exclusive.
    /// NOTE(review): this is silently a no-op for non-RocksDB backends (only
    /// the `Rocks` variant is handled) — confirm callers only rely on it for
    /// RocksDB-backed maps.
    pub fn schedule_delete_range<K: Serialize, V>(
        &mut self,
        db: &DBMap<K, V>,
        from: &K,
        to: &K,
    ) -> Result<(), TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }

        let from_buf = be_fix_int_ser(from);
        let to_buf = be_fix_int_ser(to);

        if let StorageWriteBatch::Rocks(b) = &mut self.batch {
            b.delete_range_cf(
                &rocks_cf_from_db(&self.database, db.cf_name())?,
                from_buf,
                to_buf,
            );
        }
        Ok(())
    }

    /// Buffers a put of every `(key, value)` pair into `db`'s column family,
    /// recording the total serialized bytes in metrics. When the map's
    /// `log_value_hash` option is set, logs key/value hashes (not the raw
    /// data) for each insert.
    pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        let mut total = 0usize;
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                total += k_buf.len() + v_buf.len();
                if db.opts.log_value_hash {
                    let key_hash = default_hash(&k_buf);
                    let value_hash = default_hash(&v_buf);
                    debug!(
                        "Insert to DB table: {:?}, key_hash: {:?}, value_hash: {:?}",
                        db.cf_name(),
                        key_hash,
                        value_hash
                    );
                }
                match (&mut self.batch, &db.column_family) {
                    (StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
                        b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
                    }
                    (StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
                        b.put_cf(name, k_buf, v_buf)
                    }
                    #[cfg(tidehunter)]
                    (StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter((ks, prefix))) => {
                        b.write(*ks, transform_th_key(&k_buf, prefix), v_buf.to_vec())
                    }
                    _ => Err(TypedStoreError::RocksDBError(
                        "typed store invariant violation".to_string(),
                    ))?,
                }
                Ok(())
            })?;
        self.db_metrics
            .op_metrics
            .rocksdb_batch_put_bytes
            .with_label_values(&[&db.cf])
            .observe(total as f64);
        Ok(self)
    }

    /// Buffers a merge-operator write for every `(key, value)` pair.
    /// Panics (`unimplemented!`) for non-RocksDB backends, which have no
    /// merge operator.
    pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
        &mut self,
        db: &DBMap<K, V>,
        new_vals: impl IntoIterator<Item = (J, U)>,
    ) -> Result<&mut Self, TypedStoreError> {
        if !Arc::ptr_eq(&db.db, &self.database) {
            return Err(TypedStoreError::CrossDBBatch);
        }
        new_vals
            .into_iter()
            .try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
                let k_buf = be_fix_int_ser(k.borrow());
                let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
                match &mut self.batch {
                    StorageWriteBatch::Rocks(b) => b.merge_cf(
                        &rocks_cf_from_db(&self.database, db.cf_name())?,
                        k_buf,
                        v_buf,
                    ),
                    _ => unimplemented!("merge operator is only implemented for RocksDB"),
                }
                Ok(())
            })?;
        Ok(self)
    }
}
1571
1572impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
1573where
1574 K: Serialize + DeserializeOwned,
1575 V: Serialize + DeserializeOwned,
1576{
1577 type Error = TypedStoreError;
1578
1579 #[instrument(level = "trace", skip_all, err)]
1580 fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
1581 let key_buf = be_fix_int_ser(key);
1582 let readopts = self.opts.readopts();
1583 Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
1584 && self
1585 .db
1586 .get(&self.column_family, &key_buf, &readopts)?
1587 .is_some())
1588 }
1589
1590 #[instrument(level = "trace", skip_all, err)]
1591 fn multi_contains_keys<J>(
1592 &self,
1593 keys: impl IntoIterator<Item = J>,
1594 ) -> Result<Vec<bool>, Self::Error>
1595 where
1596 J: Borrow<K>,
1597 {
1598 let values = self.multi_get_pinned(keys)?;
1599 Ok(values.into_iter().map(|v| v.is_some()).collect())
1600 }
1601
1602 #[instrument(level = "trace", skip_all, err)]
1603 fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
1604 let _timer = self
1605 .db_metrics
1606 .op_metrics
1607 .rocksdb_get_latency_seconds
1608 .with_label_values(&[&self.cf])
1609 .start_timer();
1610 let perf_ctx = if self.get_sample_interval.sample() {
1611 Some(RocksDBPerfContext)
1612 } else {
1613 None
1614 };
1615 let key_buf = be_fix_int_ser(key);
1616 let res = self
1617 .db
1618 .get(&self.column_family, &key_buf, &self.opts.readopts())?;
1619 self.db_metrics
1620 .op_metrics
1621 .rocksdb_get_bytes
1622 .with_label_values(&[&self.cf])
1623 .observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
1624 if perf_ctx.is_some() {
1625 self.db_metrics
1626 .read_perf_ctx_metrics
1627 .report_metrics(&self.cf);
1628 }
1629 match res {
1630 Some(data) => {
1631 let value = bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err);
1632 if value.is_err() {
1633 let key_hash = default_hash(&key_buf);
1634 let value_hash = default_hash(&data);
1635 debug_fatal!(
1636 "Failed to deserialize value from DB table {:?}, key_hash: {:?}, value_hash: {:?}, error: {:?}",
1637 self.cf_name(),
1638 key_hash,
1639 value_hash,
1640 value.as_ref().err().unwrap()
1641 );
1642 }
1643 Ok(Some(value?))
1644 }
1645 None => Ok(None),
1646 }
1647 }
1648
1649 #[instrument(level = "trace", skip_all, err)]
1650 fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
1651 let timer = self
1652 .db_metrics
1653 .op_metrics
1654 .rocksdb_put_latency_seconds
1655 .with_label_values(&[&self.cf])
1656 .start_timer();
1657 let perf_ctx = if self.write_sample_interval.sample() {
1658 Some(RocksDBPerfContext)
1659 } else {
1660 None
1661 };
1662 let key_buf = be_fix_int_ser(key);
1663 let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
1664 self.db_metrics
1665 .op_metrics
1666 .rocksdb_put_bytes
1667 .with_label_values(&[&self.cf])
1668 .observe((key_buf.len() + value_buf.len()) as f64);
1669 if perf_ctx.is_some() {
1670 self.db_metrics
1671 .write_perf_ctx_metrics
1672 .report_metrics(&self.cf);
1673 }
1674 self.db.put_cf(&self.column_family, key_buf, value_buf)?;
1675
1676 let elapsed = timer.stop_and_record();
1677 if elapsed > 1.0 {
1678 warn!(?elapsed, cf = ?self.cf, "very slow insert");
1679 self.db_metrics
1680 .op_metrics
1681 .rocksdb_very_slow_puts_count
1682 .with_label_values(&[&self.cf])
1683 .inc();
1684 self.db_metrics
1685 .op_metrics
1686 .rocksdb_very_slow_puts_duration_ms
1687 .with_label_values(&[&self.cf])
1688 .inc_by((elapsed * 1000.0) as u64);
1689 }
1690
1691 Ok(())
1692 }
1693
1694 #[instrument(level = "trace", skip_all, err)]
1695 fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
1696 let _timer = self
1697 .db_metrics
1698 .op_metrics
1699 .rocksdb_delete_latency_seconds
1700 .with_label_values(&[&self.cf])
1701 .start_timer();
1702 let perf_ctx = if self.write_sample_interval.sample() {
1703 Some(RocksDBPerfContext)
1704 } else {
1705 None
1706 };
1707 let key_buf = be_fix_int_ser(key);
1708 self.db.delete_cf(&self.column_family, key_buf)?;
1709 self.db_metrics
1710 .op_metrics
1711 .rocksdb_deletes
1712 .with_label_values(&[&self.cf])
1713 .inc();
1714 if perf_ctx.is_some() {
1715 self.db_metrics
1716 .write_perf_ctx_metrics
1717 .report_metrics(&self.cf);
1718 }
1719 Ok(())
1720 }
1721
1722 #[instrument(level = "trace", skip_all, err)]
1731 fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
1732 let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
1733 let last_key = self
1734 .reversed_safe_iter_with_bounds(None, None)?
1735 .next()
1736 .transpose()?
1737 .map(|(k, _v)| k);
1738 if let Some((first_key, last_key)) = first_key.zip(last_key) {
1739 let mut batch = self.batch();
1740 batch.schedule_delete_range(self, &first_key, &last_key)?;
1741 batch.write()?;
1742 }
1743 Ok(())
1744 }
1745
1746 fn is_empty(&self) -> bool {
1747 self.safe_iter().next().is_none()
1748 }
1749
1750 fn safe_iter(&'a self) -> DbIterator<'a, (K, V)> {
1751 match &self.db.storage {
1752 Storage::Rocks(db) => {
1753 let db_iter = db
1754 .underlying
1755 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), self.opts.readopts());
1756 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1757 Box::new(SafeIter::new(
1758 self.cf.clone(),
1759 db_iter,
1760 _timer,
1761 _perf_ctx,
1762 bytes_scanned,
1763 keys_scanned,
1764 Some(self.db_metrics.clone()),
1765 ))
1766 }
1767 Storage::InMemory(db) => db.iterator(&self.cf, None, None, false),
1768 #[cfg(tidehunter)]
1769 Storage::TideHunter(db) => match &self.column_family {
1770 ColumnFamily::TideHunter((ks, prefix)) => Box::new(transform_th_iterator(
1771 db.iterator(*ks),
1772 prefix,
1773 self.start_iter_timer(),
1774 )),
1775 _ => unreachable!("storage backend invariant violation"),
1776 },
1777 }
1778 }
1779
1780 fn safe_iter_with_bounds(
1781 &'a self,
1782 lower_bound: Option<K>,
1783 upper_bound: Option<K>,
1784 ) -> DbIterator<'a, (K, V)> {
1785 let (lower_bound, upper_bound) = iterator_bounds(lower_bound, upper_bound);
1786 match &self.db.storage {
1787 Storage::Rocks(db) => {
1788 let readopts =
1789 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1790 let db_iter = db
1791 .underlying
1792 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1793 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1794 Box::new(SafeIter::new(
1795 self.cf.clone(),
1796 db_iter,
1797 _timer,
1798 _perf_ctx,
1799 bytes_scanned,
1800 keys_scanned,
1801 Some(self.db_metrics.clone()),
1802 ))
1803 }
1804 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1805 #[cfg(tidehunter)]
1806 Storage::TideHunter(db) => match &self.column_family {
1807 ColumnFamily::TideHunter((ks, prefix)) => {
1808 let mut iter = db.iterator(*ks);
1809 apply_range_bounds(&mut iter, lower_bound, upper_bound);
1810 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1811 }
1812 _ => unreachable!("storage backend invariant violation"),
1813 },
1814 }
1815 }
1816
1817 fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> DbIterator<'a, (K, V)> {
1818 let (lower_bound, upper_bound) = iterator_bounds_with_range(range);
1819 match &self.db.storage {
1820 Storage::Rocks(db) => {
1821 let readopts =
1822 rocks_util::apply_range_bounds(self.opts.readopts(), lower_bound, upper_bound);
1823 let db_iter = db
1824 .underlying
1825 .raw_iterator_cf_opt(&rocks_cf(db, &self.cf), readopts);
1826 let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
1827 Box::new(SafeIter::new(
1828 self.cf.clone(),
1829 db_iter,
1830 _timer,
1831 _perf_ctx,
1832 bytes_scanned,
1833 keys_scanned,
1834 Some(self.db_metrics.clone()),
1835 ))
1836 }
1837 Storage::InMemory(db) => db.iterator(&self.cf, lower_bound, upper_bound, false),
1838 #[cfg(tidehunter)]
1839 Storage::TideHunter(db) => match &self.column_family {
1840 ColumnFamily::TideHunter((ks, prefix)) => {
1841 let mut iter = db.iterator(*ks);
1842 apply_range_bounds(&mut iter, lower_bound, upper_bound);
1843 Box::new(transform_th_iterator(iter, prefix, self.start_iter_timer()))
1844 }
1845 _ => unreachable!("storage backend invariant violation"),
1846 },
1847 }
1848 }
1849
1850 #[instrument(level = "trace", skip_all, err)]
1852 fn multi_get<J>(
1853 &self,
1854 keys: impl IntoIterator<Item = J>,
1855 ) -> Result<Vec<Option<V>>, TypedStoreError>
1856 where
1857 J: Borrow<K>,
1858 {
1859 let results = self.multi_get_pinned(keys)?;
1860 let values_parsed: Result<Vec<_>, TypedStoreError> = results
1861 .into_iter()
1862 .map(|value_byte| match value_byte {
1863 Some(data) => Ok(Some(
1864 bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
1865 )),
1866 None => Ok(None),
1867 })
1868 .collect();
1869
1870 values_parsed
1871 }
1872
1873 #[instrument(level = "trace", skip_all, err)]
1875 fn multi_insert<J, U>(
1876 &self,
1877 key_val_pairs: impl IntoIterator<Item = (J, U)>,
1878 ) -> Result<(), Self::Error>
1879 where
1880 J: Borrow<K>,
1881 U: Borrow<V>,
1882 {
1883 let mut batch = self.batch();
1884 batch.insert_batch(self, key_val_pairs)?;
1885 batch.write()
1886 }
1887
1888 #[instrument(level = "trace", skip_all, err)]
1890 fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
1891 where
1892 J: Borrow<K>,
1893 {
1894 let mut batch = self.batch();
1895 batch.delete_batch(self, keys)?;
1896 batch.write()
1897 }
1898
1899 #[instrument(level = "trace", skip_all, err)]
1901 fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
1902 if let Storage::Rocks(rocks) = &self.db.storage {
1903 rocks
1904 .underlying
1905 .try_catch_up_with_primary()
1906 .map_err(typed_store_err_from_rocks_err)?;
1907 }
1908 Ok(())
1909 }
1910}
1911
1912#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
1914pub fn open_cf_opts<P: AsRef<Path>>(
1915 path: P,
1916 db_options: Option<rocksdb::Options>,
1917 metric_conf: MetricConf,
1918 opt_cfs: &[(&str, rocksdb::Options)],
1919) -> Result<Arc<Database>, TypedStoreError> {
1920 let path = path.as_ref();
1921 ensure_database_type(path, StorageType::Rocks)
1922 .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
1923 let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
1932 nondeterministic!({
1933 let mut options = db_options.unwrap_or_else(|| default_db_options().options);
1934 options.create_if_missing(true);
1935 options.create_missing_column_families(true);
1936 let rocksdb = {
1937 rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
1938 &options,
1939 path,
1940 cfs.into_iter()
1941 .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
1942 )
1943 .map_err(typed_store_err_from_rocks_err)?
1944 };
1945 Ok(Arc::new(Database::new(
1946 Storage::Rocks(RocksDB {
1947 underlying: rocksdb,
1948 }),
1949 metric_conf,
1950 None,
1951 )))
1952 })
1953}
1954
/// Opens a read-only secondary RocksDB instance that tails the primary at
/// `primary_path`. When `secondary_path` is `None`, a sibling directory named
/// "SECONDARY" next to the primary is used. Column families present in the
/// primary but absent from `opt_cfs` are opened with default options so the
/// open cannot fail on unknown CFs.
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
    primary_path: P,
    secondary_path: Option<P>,
    db_options: Option<rocksdb::Options>,
    metric_conf: MetricConf,
    opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
    let primary_path = primary_path.as_ref();
    let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
    nondeterministic!({
        let mut options = db_options.unwrap_or_else(|| default_db_options().options);

        // Secondary instances track every live SST of the primary, so raise
        // the fd limit and let RocksDB keep all files open (-1 = unlimited).
        fdlimit::raise_fd_limit();
        options.set_max_open_files(-1);

        let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
        // Column families currently present in the primary.
        let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
            .ok()
            .unwrap_or_default();

        let default_db_options = default_db_options();
        // Any on-disk CF the caller did not configure gets default options.
        for cf_key in cfs.iter() {
            if !opt_cfs.contains_key(&cf_key[..]) {
                opt_cfs.insert(cf_key, default_db_options.options.clone());
            }
        }

        let primary_path = primary_path.to_path_buf();
        // Default secondary location: a "SECONDARY" directory that is a
        // sibling of the primary directory.
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s.as_path().to_path_buf()
        });

        ensure_database_type(&primary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
        ensure_database_type(&secondary_path, StorageType::Rocks)
            .map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;

        let rocksdb = {
            options.create_if_missing(true);
            options.create_missing_column_families(true);
            let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
                &options,
                &primary_path,
                &secondary_path,
                opt_cfs
                    .iter()
                    .map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
            )
            .map_err(typed_store_err_from_rocks_err)?;
            // Start out caught up with the primary's current state.
            db.try_catch_up_with_primary()
                .map_err(typed_store_err_from_rocks_err)?;
            db
        };
        Ok(Arc::new(Database::new(
            Storage::Rocks(RocksDB {
                underlying: rocksdb,
            }),
            metric_conf,
            None,
        )))
    })
}
2025
2026#[cfg(not(tidehunter))]
2028pub async fn safe_drop_db(path: PathBuf, timeout: Duration) -> Result<(), rocksdb::Error> {
2029 let mut backoff = backoff::ExponentialBackoff {
2030 max_elapsed_time: Some(timeout),
2031 ..Default::default()
2032 };
2033 loop {
2034 match rocksdb::DB::destroy(&rocksdb::Options::default(), path.clone()) {
2035 Ok(()) => return Ok(()),
2036 Err(err) => match backoff.next_backoff() {
2037 Some(duration) => tokio::time::sleep(duration).await,
2038 None => return Err(err),
2039 },
2040 }
2041 }
2042}
2043
/// TideHunter variant: dropping the database is just removing its directory;
/// the timeout is unused because no retryable lock contention applies here.
#[cfg(tidehunter)]
pub async fn safe_drop_db(path: PathBuf, _: Duration) -> Result<(), std::io::Error> {
    std::fs::remove_dir_all(path)
}
2048
2049fn populate_missing_cfs(
2050 input_cfs: &[(&str, rocksdb::Options)],
2051 path: &Path,
2052) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
2053 let mut cfs = vec![];
2054 let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
2055 let existing_cfs =
2056 rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
2057 .ok()
2058 .unwrap_or_default();
2059
2060 for cf_name in existing_cfs {
2061 if !input_cf_index.contains(&cf_name[..]) {
2062 cfs.push((cf_name, rocksdb::Options::default()));
2063 }
2064 }
2065 cfs.extend(
2066 input_cfs
2067 .iter()
2068 .map(|(name, opts)| (name.to_string(), (*opts).clone())),
2069 );
2070 Ok(cfs)
2071}
2072
2073fn default_hash(value: &[u8]) -> Digest<32> {
2074 let mut hasher = fastcrypto::hash::Blake2b256::default();
2075 hasher.update(value);
2076 hasher.finalize()
2077}