pub mod errors;
pub mod memstore;
pub(crate) mod safe_iter;
use crate::rocks::errors::typed_store_err_from_bcs_err;
use crate::rocks::errors::typed_store_err_from_bincode_err;
use crate::rocks::errors::typed_store_err_from_rocks_err;
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
use crate::rocks::errors::typed_store_error_from_th_error;
use crate::rocks::memstore::{InMemoryBatch, InMemoryDB};
use crate::rocks::safe_iter::{SafeIter, SafeRevIter};
use crate::TypedStoreError;
use crate::{
metrics::{DBMetrics, RocksDBPerfContext, SamplingInterval},
traits::{Map, TableSummary},
};
use bincode::Options;
use prometheus::{Histogram, HistogramTimer};
use rocksdb::properties::num_files_at_level;
use rocksdb::{checkpoint::Checkpoint, BlockBasedOptions, Cache, DBPinnableSlice, LiveFile};
use rocksdb::{
properties, AsColumnFamilyRef, ColumnFamilyDescriptor, DBWithThreadMode, Error, MultiThreaded,
ReadOptions, WriteBatch,
};
use serde::{de::DeserializeOwned, Serialize};
use std::ops::{Bound, Deref};
use std::{
borrow::Borrow,
collections::BTreeMap,
env,
marker::PhantomData,
ops::RangeBounds,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use std::{collections::HashSet, ffi::CStr};
use sui_macros::{fail_point, nondeterministic};
use tap::TapFallible;
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
use tidehunter::{db::Db as TideHunterDb, key_shape::KeySpace};
use tokio::sync::oneshot;
use tracing::{debug, error, info, instrument, warn};
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
const ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE: &CStr =
unsafe { CStr::from_bytes_with_nul_unchecked("rocksdb.total-blob-file-size\0".as_bytes()) };
#[cfg(test)]
mod tests;
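/// Reopens each listed column family as a strongly typed `DBMap<K, V>` handle
/// on an already opened [`Database`]. Assumes `DBMap` and `ReadWriteOptions`
/// are in scope at the call site. A minimal usage sketch (the path, database
/// name, column-family names, and key/value types are illustrative):
///
/// ```ignore
/// let db = open_cf("/tmp/example_db", None, MetricConf::new("example_db"), &["col_a", "col_b"])?;
/// let (col_a, col_b) = reopen!(&db, "col_a";<u64, String>, "col_b";<u64, Vec<u8>>);
/// ```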
#[macro_export]
macro_rules! reopen {
( $db:expr, $($cf:expr;<$K:ty, $V:ty>),*) => {
(
$(
            DBMap::<$K, $V>::reopen($db, Some($cf), &ReadWriteOptions::default(), false).expect(&format!("Cannot open {} CF.", $cf))
),*
)
};
}
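/// Thin wrapper around a multi-threaded RocksDB handle that cancels all
/// background work (flushes and compactions) before the handle is dropped.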
#[derive(Debug)]
pub struct RocksDB {
pub underlying: rocksdb::DBWithThreadMode<MultiThreaded>,
}
impl Drop for RocksDB {
fn drop(&mut self) {
self.underlying.cancel_all_background_work(true);
}
}
#[derive(Clone)]
pub enum ColumnFamily {
Rocks(String),
InMemory(String),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
TideHunter(KeySpace),
}
impl std::fmt::Debug for ColumnFamily {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ColumnFamily::Rocks(name) => write!(f, "RocksDB cf: {}", name),
ColumnFamily::InMemory(name) => write!(f, "InMemory cf: {}", name),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
ColumnFamily::TideHunter(_) => write!(f, "TideHunter column family"),
}
}
}
impl ColumnFamily {
fn rocks_cf<'a>(&self, rocks_db: &'a RocksDB) -> Arc<rocksdb::BoundColumnFamily<'a>> {
match &self {
ColumnFamily::Rocks(name) => rocks_db
.underlying
.cf_handle(name)
.expect("Map-keying column family should have been checked at DB creation"),
_ => unreachable!("invariant is checked by the caller"),
}
}
}
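/// The physical storage backend behind a [`Database`]: a real RocksDB
/// instance, an in-memory map for tests, or (behind the `tide_hunter`
/// feature) a TideHunter database.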
pub enum Storage {
Rocks(RocksDB),
InMemory(InMemoryDB),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
TideHunter(TideHunterDb),
}
impl std::fmt::Debug for Storage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Storage::Rocks(db) => write!(f, "RocksDB Storage {:?}", db),
Storage::InMemory(db) => write!(f, "InMemoryDB Storage {:?}", db),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
Storage::TideHunter(_) => write!(f, "TideHunterDB Storage"),
}
}
}
#[derive(Debug)]
pub struct Database {
storage: Storage,
metric_conf: MetricConf,
}
impl Drop for Database {
fn drop(&mut self) {
DBMetrics::get().decrement_num_active_dbs(&self.metric_conf.db_name);
}
}
enum GetResult<'a> {
Rocks(DBPinnableSlice<'a>),
InMemory(Vec<u8>),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
TideHunter(tidehunter::minibytes::Bytes),
}
impl Deref for GetResult<'_> {
type Target = [u8];
fn deref(&self) -> &[u8] {
match self {
GetResult::Rocks(d) => d.deref(),
GetResult::InMemory(d) => d.deref(),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
GetResult::TideHunter(d) => d.deref(),
}
}
}
impl Database {
fn new(storage: Storage, metric_conf: MetricConf) -> Self {
DBMetrics::get().increment_num_active_dbs(&metric_conf.db_name);
Self {
storage,
metric_conf,
}
}
fn get<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
key: K,
readopts: &ReadOptions,
) -> Result<Option<GetResult<'_>>, TypedStoreError> {
match (&self.storage, cf) {
(Storage::Rocks(db), ColumnFamily::Rocks(_)) => Ok(db
.underlying
.get_pinned_cf_opt(&cf.rocks_cf(db), key, readopts)
.map_err(typed_store_err_from_rocks_err)?
.map(GetResult::Rocks)),
(Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
Ok(db.get(cf_name, key).map(GetResult::InMemory))
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
(Storage::TideHunter(db), ColumnFamily::TideHunter(ks)) => Ok(db
.get(*ks, key.as_ref())
.map_err(typed_store_error_from_th_error)?
.map(GetResult::TideHunter)),
_ => Err(TypedStoreError::RocksDBError(
"typed store invariant violation".to_string(),
)),
}
}
fn multi_get<I, K>(
&self,
cf: &ColumnFamily,
keys: I,
readopts: &ReadOptions,
) -> Vec<Result<Option<GetResult<'_>>, TypedStoreError>>
where
I: IntoIterator<Item = K>,
K: AsRef<[u8]>,
{
match (&self.storage, cf) {
(Storage::Rocks(db), ColumnFamily::Rocks(_)) => {
let res = db.underlying.batched_multi_get_cf_opt(
&cf.rocks_cf(db),
keys,
false,
readopts,
);
res.into_iter()
.map(|r| {
r.map_err(typed_store_err_from_rocks_err)
.map(|item| item.map(GetResult::Rocks))
})
.collect()
}
(Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => db
.multi_get(cf_name, keys)
.into_iter()
.map(|r| Ok(r.map(GetResult::InMemory)))
.collect(),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
            (Storage::TideHunter(db), ColumnFamily::TideHunter(ks)) => keys
                .into_iter()
                .map(|k| {
                    db.get(*ks, k.as_ref())
                        .map_err(typed_store_error_from_th_error)
                        .map(|item| item.map(GetResult::TideHunter))
                })
                .collect(),
_ => unreachable!("typed store invariant violation"),
}
}
pub fn create_cf<N: AsRef<str>>(
&self,
name: N,
opts: &rocksdb::Options,
) -> Result<(), rocksdb::Error> {
match &self.storage {
Storage::Rocks(db) => db.underlying.create_cf(name, opts),
Storage::InMemory(_) => Ok(()),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
Storage::TideHunter(_) => {
unimplemented!("TideHunter: recreation of column family on a fly not implemented")
}
}
}
pub fn drop_cf(&self, name: &str) -> Result<(), rocksdb::Error> {
match &self.storage {
Storage::Rocks(db) => db.underlying.drop_cf(name),
Storage::InMemory(db) => {
db.drop_cf(name);
Ok(())
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
Storage::TideHunter(_) => {
unimplemented!("TideHunter: deletion of column family on a fly not implemented")
}
}
}
pub fn delete_file_in_range<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
from: K,
to: K,
) -> Result<(), rocksdb::Error> {
match &self.storage {
Storage::Rocks(rocks) => rocks.underlying.delete_file_in_range_cf(cf, from, to),
_ => unimplemented!("delete_file_in_range is only supported for rocksdb backend"),
}
}
fn delete_cf<K: AsRef<[u8]>>(&self, cf: &ColumnFamily, key: K) -> Result<(), TypedStoreError> {
fail_point!("delete-cf-before");
let ret = match (&self.storage, cf) {
(Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
.underlying
.delete_cf(&cf.rocks_cf(db), key)
.map_err(typed_store_err_from_rocks_err),
(Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
db.delete(cf_name, key.as_ref());
Ok(())
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
(Storage::TideHunter(db), ColumnFamily::TideHunter(ks)) => db
.remove(*ks, key.as_ref().to_vec())
.map_err(typed_store_error_from_th_error),
_ => Err(TypedStoreError::RocksDBError(
"typed store invariant violation".to_string(),
)),
};
fail_point!("delete-cf-after");
#[allow(clippy::let_and_return)]
ret
}
pub fn path_for_pruning(&self) -> &Path {
match &self.storage {
Storage::Rocks(rocks) => rocks.underlying.path(),
_ => unimplemented!("method is only supported for rocksdb backend"),
}
}
fn put_cf(
&self,
cf: &ColumnFamily,
key: Vec<u8>,
value: Vec<u8>,
) -> Result<(), TypedStoreError> {
fail_point!("put-cf-before");
let ret = match (&self.storage, cf) {
(Storage::Rocks(db), ColumnFamily::Rocks(_)) => db
.underlying
.put_cf(&cf.rocks_cf(db), key, value)
.map_err(typed_store_err_from_rocks_err),
(Storage::InMemory(db), ColumnFamily::InMemory(cf_name)) => {
db.put(cf_name, key, value);
Ok(())
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
(Storage::TideHunter(db), ColumnFamily::TideHunter(ks)) => db
.insert(*ks, key, value)
.map_err(typed_store_error_from_th_error),
_ => Err(TypedStoreError::RocksDBError(
"typed store invariant violation".to_string(),
)),
};
fail_point!("put-cf-after");
#[allow(clippy::let_and_return)]
ret
}
pub fn key_may_exist_cf<K: AsRef<[u8]>>(
&self,
cf_name: &str,
key: K,
readopts: &ReadOptions,
) -> bool {
match &self.storage {
Storage::Rocks(rocks) => {
rocks
.underlying
.key_may_exist_cf_opt(&rocks_cf(rocks, cf_name), key, readopts)
}
_ => true,
}
}
pub fn write(&self, batch: StorageWriteBatch) -> Result<(), TypedStoreError> {
fail_point!("batch-write-before");
let ret = match (&self.storage, batch) {
(Storage::Rocks(rocks), StorageWriteBatch::Rocks(batch)) => rocks
.underlying
.write(batch)
.map_err(typed_store_err_from_rocks_err),
(Storage::InMemory(db), StorageWriteBatch::InMemory(batch)) => {
db.write(batch);
Ok(())
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
(Storage::TideHunter(db), StorageWriteBatch::TideHunter(batch)) => db
.write_batch(batch)
.map_err(typed_store_error_from_th_error),
_ => Err(TypedStoreError::RocksDBError(
"using invalid batch type for the database".to_string(),
)),
};
fail_point!("batch-write-after");
#[allow(clippy::let_and_return)]
ret
}
fn raw_iterator_cf<'a: 'b, 'b>(&'a self, cf_name: &str, readopts: ReadOptions) -> RawIter<'b> {
match &self.storage {
Storage::Rocks(rocksdb) => rocksdb
.underlying
.raw_iterator_cf_opt(&rocks_cf(rocksdb, cf_name), readopts),
_ => unimplemented!("iterators not yet supported for other backends"),
}
}
pub fn compact_range_cf<K: AsRef<[u8]>>(
&self,
cf_name: &str,
start: Option<K>,
end: Option<K>,
) {
if let Storage::Rocks(rocksdb) = &self.storage {
rocksdb
.underlying
.compact_range_cf(&rocks_cf(rocksdb, cf_name), start, end);
}
}
pub fn flush(&self) -> Result<(), TypedStoreError> {
match &self.storage {
Storage::Rocks(db) => db
.underlying
.flush()
.map_err(typed_store_err_from_rocks_err),
Storage::InMemory(_) => Ok(()),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
Storage::TideHunter(_) => {
unimplemented!("TideHunter: flush database is not implemented")
}
}
}
pub fn checkpoint(&self, path: &Path) -> Result<(), TypedStoreError> {
if let Storage::Rocks(rocks) = &self.storage {
let checkpoint =
Checkpoint::new(&rocks.underlying).map_err(typed_store_err_from_rocks_err)?;
checkpoint
.create_checkpoint(path)
.map_err(|e| TypedStoreError::RocksDBError(e.to_string()))?;
}
Ok(())
}
pub fn get_sampling_interval(&self) -> SamplingInterval {
self.metric_conf.read_sample_interval.new_from_self()
}
pub fn multiget_sampling_interval(&self) -> SamplingInterval {
self.metric_conf.read_sample_interval.new_from_self()
}
pub fn write_sampling_interval(&self) -> SamplingInterval {
self.metric_conf.write_sample_interval.new_from_self()
}
pub fn iter_sampling_interval(&self) -> SamplingInterval {
self.metric_conf.iter_sample_interval.new_from_self()
}
fn db_name(&self) -> String {
let name = &self.metric_conf.db_name;
if name.is_empty() {
"default".to_string()
} else {
name.clone()
}
}
pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
match &self.storage {
Storage::Rocks(rocks) => rocks.underlying.live_files(),
_ => Ok(vec![]),
}
}
}
fn rocks_cf<'a>(rocks_db: &'a RocksDB, cf_name: &str) -> Arc<rocksdb::BoundColumnFamily<'a>> {
rocks_db
.underlying
.cf_handle(cf_name)
.expect("Map-keying column family should have been checked at DB creation")
}
fn rocks_cf_from_db<'a>(
db: &'a Database,
cf_name: &str,
) -> Result<Arc<rocksdb::BoundColumnFamily<'a>>, TypedStoreError> {
match &db.storage {
Storage::Rocks(rocksdb) => Ok(rocksdb
.underlying
.cf_handle(cf_name)
.expect("Map-keying column family should have been checked at DB creation")),
_ => Err(TypedStoreError::RocksDBError(
"using invalid batch type for the database".to_string(),
)),
}
}
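/// Per-database metrics configuration: the database name used as a metrics
/// label, plus the sampling intervals for read/write/iterator perf contexts.
/// A minimal construction sketch (the database name is illustrative):
///
/// ```ignore
/// let conf = MetricConf::new("my_db").with_sampling(SamplingInterval::default());
/// ```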
#[derive(Debug, Default)]
pub struct MetricConf {
pub db_name: String,
pub read_sample_interval: SamplingInterval,
pub write_sample_interval: SamplingInterval,
pub iter_sample_interval: SamplingInterval,
}
impl MetricConf {
pub fn new(db_name: &str) -> Self {
if db_name.is_empty() {
error!("A meaningful db name should be used for metrics reporting.")
}
Self {
db_name: db_name.to_string(),
read_sample_interval: SamplingInterval::default(),
write_sample_interval: SamplingInterval::default(),
iter_sample_interval: SamplingInterval::default(),
}
}
pub fn with_sampling(self, read_interval: SamplingInterval) -> Self {
Self {
db_name: self.db_name,
read_sample_interval: read_interval,
write_sample_interval: SamplingInterval::default(),
iter_sample_interval: SamplingInterval::default(),
}
}
}
const CF_METRICS_REPORT_PERIOD_SECS: u64 = 30;
const METRICS_ERROR: i64 = -1;
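/// A strongly typed map view over a single column family. Keys are encoded
/// with big-endian fixed-width bincode (see [`be_fix_int_ser`]) so that byte
/// order matches the natural key order, and values are BCS-encoded.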
#[derive(Clone, Debug)]
pub struct DBMap<K, V> {
pub db: Arc<Database>,
_phantom: PhantomData<fn(K) -> V>,
column_family: ColumnFamily,
cf: String,
pub opts: ReadWriteOptions,
db_metrics: Arc<DBMetrics>,
get_sample_interval: SamplingInterval,
multiget_sample_interval: SamplingInterval,
write_sample_interval: SamplingInterval,
iter_sample_interval: SamplingInterval,
_metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
}
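// SAFETY: the original code carries no justification; presumably this is
// sound because every field of `DBMap` is `Send` and `PhantomData<fn(K) -> V>`
// only records variance, so sending a `DBMap` across threads moves no `K` or `V`.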
unsafe impl<K: Send, V: Send> Send for DBMap<K, V> {}
impl<K, V> DBMap<K, V> {
pub(crate) fn new(
db: Arc<Database>,
opts: &ReadWriteOptions,
opt_cf: &str,
column_family: ColumnFamily,
is_deprecated: bool,
) -> Self {
let db_cloned = db.clone();
let db_metrics = DBMetrics::get();
let db_metrics_cloned = db_metrics.clone();
let cf = opt_cf.to_string();
let (sender, mut recv) = tokio::sync::oneshot::channel();
if !is_deprecated && matches!(db.storage, Storage::Rocks(_)) {
tokio::task::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_secs(CF_METRICS_REPORT_PERIOD_SECS));
loop {
tokio::select! {
_ = interval.tick() => {
let db = db_cloned.clone();
let cf = cf.clone();
let db_metrics = db_metrics.clone();
if let Err(e) = tokio::task::spawn_blocking(move || {
Self::report_rocksdb_metrics(&db, &cf, &db_metrics);
}).await {
error!("Failed to log metrics with error: {}", e);
}
}
_ = &mut recv => break,
}
}
debug!("Returning the cf metric logging task for DBMap: {}", &cf);
});
}
DBMap {
db: db.clone(),
opts: opts.clone(),
_phantom: PhantomData,
column_family,
cf: opt_cf.to_string(),
db_metrics: db_metrics_cloned,
_metrics_task_cancel_handle: Arc::new(sender),
get_sample_interval: db.get_sampling_interval(),
multiget_sample_interval: db.multiget_sampling_interval(),
write_sample_interval: db.write_sampling_interval(),
iter_sample_interval: db.iter_sampling_interval(),
}
}
#[instrument(level = "debug", skip(db), err)]
pub fn reopen(
db: &Arc<Database>,
opt_cf: Option<&str>,
rw_options: &ReadWriteOptions,
is_deprecated: bool,
) -> Result<Self, TypedStoreError> {
let cf_key = opt_cf
.unwrap_or(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
.to_owned();
Ok(DBMap::new(
db.clone(),
rw_options,
&cf_key,
            ColumnFamily::Rocks(cf_key.clone()),
is_deprecated,
))
}
pub fn cf_name(&self) -> &str {
&self.cf
}
pub fn batch(&self) -> DBBatch {
let batch = match &self.db.storage {
Storage::Rocks(_) => StorageWriteBatch::Rocks(WriteBatch::default()),
Storage::InMemory(_) => StorageWriteBatch::InMemory(InMemoryBatch::default()),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
Storage::TideHunter(_) => {
StorageWriteBatch::TideHunter(tidehunter::batch::WriteBatch::new())
}
};
DBBatch::new(
&self.db,
batch,
&self.db_metrics,
&self.write_sample_interval,
)
}
pub fn compact_range<J: Serialize>(&self, start: &J, end: &J) -> Result<(), TypedStoreError> {
let from_buf = be_fix_int_ser(start)?;
let to_buf = be_fix_int_ser(end)?;
self.db
.compact_range_cf(&self.cf, Some(from_buf), Some(to_buf));
Ok(())
}
pub fn compact_range_raw(
&self,
cf_name: &str,
start: Vec<u8>,
end: Vec<u8>,
) -> Result<(), TypedStoreError> {
self.db.compact_range_cf(cf_name, Some(start), Some(end));
Ok(())
}
fn multi_get_pinned<J>(
&self,
keys: impl IntoIterator<Item = J>,
) -> Result<Vec<Option<GetResult<'_>>>, TypedStoreError>
where
J: Borrow<K>,
K: Serialize,
{
let _timer = self
.db_metrics
.op_metrics
.rocksdb_multiget_latency_seconds
.with_label_values(&[&self.cf])
.start_timer();
let perf_ctx = if self.multiget_sample_interval.sample() {
Some(RocksDBPerfContext)
} else {
None
};
let keys_bytes: Result<Vec<_>, TypedStoreError> = keys
.into_iter()
.map(|k| be_fix_int_ser(k.borrow()))
.collect();
let results: Result<Vec<_>, TypedStoreError> = self
.db
.multi_get(&self.column_family, keys_bytes?, &self.opts.readopts())
.into_iter()
.collect();
let entries = results?;
let entry_size = entries
.iter()
.flatten()
.map(|entry| entry.len())
.sum::<usize>();
self.db_metrics
.op_metrics
.rocksdb_multiget_bytes
.with_label_values(&[&self.cf])
.observe(entry_size as f64);
if perf_ctx.is_some() {
self.db_metrics
.read_perf_ctx_metrics
.report_metrics(&self.cf);
}
Ok(entries)
}
fn get_rocksdb_int_property(
rocksdb: &RocksDB,
cf: &impl AsColumnFamilyRef,
property_name: &std::ffi::CStr,
) -> Result<i64, TypedStoreError> {
match rocksdb.underlying.property_int_value_cf(cf, property_name) {
Ok(Some(value)) => Ok(value.min(i64::MAX as u64).try_into().unwrap_or_default()),
Ok(None) => Ok(0),
Err(e) => Err(TypedStoreError::RocksDBError(e.into_string())),
}
}
fn report_rocksdb_metrics(
database: &Arc<Database>,
cf_name: &str,
db_metrics: &Arc<DBMetrics>,
) {
let Storage::Rocks(rocksdb) = &database.storage else {
return;
};
let Some(cf) = rocksdb.underlying.cf_handle(cf_name) else {
tracing::warn!(
"unable to report metrics for cf {cf_name:?} in db {:?}",
database.db_name()
);
return;
};
db_metrics
.cf_metrics
.rocksdb_total_sst_files_size
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::TOTAL_SST_FILES_SIZE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_total_blob_files_size
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(
rocksdb,
&cf,
ROCKSDB_PROPERTY_TOTAL_BLOB_FILES_SIZE,
)
.unwrap_or(METRICS_ERROR),
);
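        // RocksDB uses up to seven levels (L0..=L6) with default options, so
        // summing the per-level file counts over 0..=6 covers the whole tree.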
let total_num_files: i64 = (0..=6)
.map(|level| {
Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(level))
.unwrap_or(METRICS_ERROR)
})
.sum();
db_metrics
.cf_metrics
.rocksdb_total_num_files
.with_label_values(&[cf_name])
.set(total_num_files);
db_metrics
.cf_metrics
.rocksdb_num_level0_files
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, &num_files_at_level(0))
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_current_size_active_mem_tables
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::CUR_SIZE_ACTIVE_MEM_TABLE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_size_all_mem_tables
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_num_snapshots
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_oldest_snapshot_time
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_actual_delayed_write_rate
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_is_write_stopped
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_block_cache_capacity
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_CAPACITY)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_block_cache_usage
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_block_cache_pinned_usage
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::BLOCK_CACHE_PINNED_USAGE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_estimate_table_readers_mem
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(
rocksdb,
&cf,
properties::ESTIMATE_TABLE_READERS_MEM,
)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_estimated_num_keys
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_num_immutable_mem_tables
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_IMMUTABLE_MEM_TABLE)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_mem_table_flush_pending
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::MEM_TABLE_FLUSH_PENDING)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_compaction_pending
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::COMPACTION_PENDING)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_estimate_pending_compaction_bytes
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(
rocksdb,
&cf,
properties::ESTIMATE_PENDING_COMPACTION_BYTES,
)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_num_running_compactions
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_COMPACTIONS)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_num_running_flushes
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_estimate_oldest_key_time
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_background_errors
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS)
.unwrap_or(METRICS_ERROR),
);
db_metrics
.cf_metrics
.rocksdb_base_level
.with_label_values(&[cf_name])
.set(
Self::get_rocksdb_int_property(rocksdb, &cf, properties::BASE_LEVEL)
.unwrap_or(METRICS_ERROR),
);
}
pub fn checkpoint_db(&self, path: &Path) -> Result<(), TypedStoreError> {
self.db.checkpoint(path)
}
pub fn table_summary(&self) -> eyre::Result<TableSummary>
where
K: Serialize + DeserializeOwned,
V: Serialize + DeserializeOwned,
{
let mut num_keys = 0;
let mut key_bytes_total = 0;
let mut value_bytes_total = 0;
let mut key_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
let mut value_hist = hdrhistogram::Histogram::<u64>::new_with_max(100000, 2).unwrap();
for item in self.safe_iter() {
let (key, value) = item?;
num_keys += 1;
let key_len = be_fix_int_ser(key.borrow())?.len();
let value_len = bcs::to_bytes(value.borrow())?.len();
key_bytes_total += key_len;
value_bytes_total += value_len;
key_hist.record(key_len as u64)?;
value_hist.record(value_len as u64)?;
}
Ok(TableSummary {
num_keys,
key_bytes_total,
value_bytes_total,
key_hist,
value_hist,
})
}
fn create_iter_context(
&self,
) -> (
Option<HistogramTimer>,
Option<Histogram>,
Option<Histogram>,
Option<RocksDBPerfContext>,
) {
let timer = self
.db_metrics
.op_metrics
.rocksdb_iter_latency_seconds
.with_label_values(&[&self.cf])
.start_timer();
let bytes_scanned = self
.db_metrics
.op_metrics
.rocksdb_iter_bytes
.with_label_values(&[&self.cf]);
let keys_scanned = self
.db_metrics
.op_metrics
.rocksdb_iter_keys
.with_label_values(&[&self.cf]);
let perf_ctx = if self.iter_sample_interval.sample() {
Some(RocksDBPerfContext)
} else {
None
};
(
Some(timer),
Some(bytes_scanned),
Some(keys_scanned),
perf_ctx,
)
}
fn create_read_options_with_bounds(
&self,
lower_bound: Option<K>,
upper_bound: Option<K>,
) -> ReadOptions
where
K: Serialize,
{
let mut readopts = self.opts.readopts();
if let Some(lower_bound) = lower_bound {
let key_buf = be_fix_int_ser(&lower_bound).unwrap();
readopts.set_iterate_lower_bound(key_buf);
}
if let Some(upper_bound) = upper_bound {
let key_buf = be_fix_int_ser(&upper_bound).unwrap();
readopts.set_iterate_upper_bound(key_buf);
}
readopts
}
pub fn reversed_safe_iter_with_bounds(
&self,
lower_bound: Option<K>,
upper_bound: Option<K>,
) -> Result<SafeRevIter<'_, K, V>, TypedStoreError>
where
K: Serialize + DeserializeOwned,
V: Serialize + DeserializeOwned,
{
        let upper_bound_key = upper_bound.as_ref().map(|k| be_fix_int_ser(k));
let readopts = self.create_read_options_with_range((
lower_bound
.as_ref()
.map(Bound::Included)
.unwrap_or(Bound::Unbounded),
upper_bound
.as_ref()
.map(Bound::Included)
.unwrap_or(Bound::Unbounded),
));
let db_iter = self.db.raw_iterator_cf(&self.cf, readopts);
let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
let iter = SafeIter::new(
self.cf.clone(),
db_iter,
_timer,
_perf_ctx,
bytes_scanned,
keys_scanned,
Some(self.db_metrics.clone()),
);
Ok(SafeRevIter::new(iter, upper_bound_key.transpose()?))
}
fn create_read_options_with_range(&self, range: impl RangeBounds<K>) -> ReadOptions
where
K: Serialize,
{
let mut readopts = self.opts.readopts();
let lower_bound = range.start_bound();
let upper_bound = range.end_bound();
match lower_bound {
Bound::Included(lower_bound) => {
let key_buf = be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
readopts.set_iterate_lower_bound(key_buf);
}
Bound::Excluded(lower_bound) => {
let mut key_buf =
be_fix_int_ser(&lower_bound).expect("Serialization must not fail");
big_endian_saturating_add_one(&mut key_buf);
readopts.set_iterate_lower_bound(key_buf);
}
Bound::Unbounded => (),
};
match upper_bound {
Bound::Included(upper_bound) => {
let mut key_buf =
be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
if !is_max(&key_buf) {
big_endian_saturating_add_one(&mut key_buf);
readopts.set_iterate_upper_bound(key_buf);
}
}
Bound::Excluded(upper_bound) => {
let key_buf = be_fix_int_ser(&upper_bound).expect("Serialization must not fail");
readopts.set_iterate_upper_bound(key_buf);
}
Bound::Unbounded => (),
};
readopts
}
}
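/// Backend-specific write batch; constructed via [`DBMap::batch`] and applied
/// atomically by [`Database::write`].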
pub enum StorageWriteBatch {
Rocks(rocksdb::WriteBatch),
InMemory(InMemoryBatch),
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
TideHunter(tidehunter::batch::WriteBatch),
}
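/// An atomic write batch addressed to a single [`Database`]. All staged
/// operations are applied together by [`DBBatch::write`]; a batch must not
/// span databases (doing so yields `TypedStoreError::CrossDBBatch`).
/// A minimal sketch, assuming a `table: DBMap<u64, String>` already exists:
///
/// ```ignore
/// let mut batch = table.batch();
/// batch.insert_batch(&table, [(1u64, "a".to_string())])?;
/// batch.delete_batch(&table, [2u64])?;
/// batch.write()?;
/// ```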
pub struct DBBatch {
database: Arc<Database>,
batch: StorageWriteBatch,
db_metrics: Arc<DBMetrics>,
write_sample_interval: SamplingInterval,
}
impl DBBatch {
pub fn new(
dbref: &Arc<Database>,
batch: StorageWriteBatch,
db_metrics: &Arc<DBMetrics>,
write_sample_interval: &SamplingInterval,
) -> Self {
DBBatch {
database: dbref.clone(),
batch,
db_metrics: db_metrics.clone(),
write_sample_interval: write_sample_interval.clone(),
}
}
#[instrument(level = "trace", skip_all, err)]
pub fn write(self) -> Result<(), TypedStoreError> {
let db_name = self.database.db_name();
let timer = self
.db_metrics
.op_metrics
.rocksdb_batch_commit_latency_seconds
.with_label_values(&[&db_name])
.start_timer();
let batch_size = self.size_in_bytes();
let perf_ctx = if self.write_sample_interval.sample() {
Some(RocksDBPerfContext)
} else {
None
};
self.database.write(self.batch)?;
self.db_metrics
.op_metrics
.rocksdb_batch_commit_bytes
.with_label_values(&[&db_name])
.observe(batch_size as f64);
if perf_ctx.is_some() {
self.db_metrics
.write_perf_ctx_metrics
.report_metrics(&db_name);
}
let elapsed = timer.stop_and_record();
if elapsed > 1.0 {
warn!(?elapsed, ?db_name, "very slow batch write");
self.db_metrics
.op_metrics
.rocksdb_very_slow_batch_writes_count
.with_label_values(&[&db_name])
.inc();
self.db_metrics
.op_metrics
.rocksdb_very_slow_batch_writes_duration_ms
.with_label_values(&[&db_name])
.inc_by((elapsed * 1000.0) as u64);
}
Ok(())
}
pub fn size_in_bytes(&self) -> usize {
match self.batch {
StorageWriteBatch::Rocks(ref b) => b.size_in_bytes(),
StorageWriteBatch::InMemory(_) => 0,
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
StorageWriteBatch::TideHunter(_) => 0,
}
}
pub fn delete_batch<J: Borrow<K>, K: Serialize, V>(
&mut self,
db: &DBMap<K, V>,
purged_vals: impl IntoIterator<Item = J>,
) -> Result<(), TypedStoreError> {
if !Arc::ptr_eq(&db.db, &self.database) {
return Err(TypedStoreError::CrossDBBatch);
}
purged_vals
.into_iter()
.try_for_each::<_, Result<_, TypedStoreError>>(|k| {
let k_buf = be_fix_int_ser(k.borrow())?;
match (&mut self.batch, &db.column_family) {
(StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
b.delete_cf(&rocks_cf_from_db(&self.database, name)?, k_buf)
}
(StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
b.delete_cf(name, k_buf)
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
(StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter(ks)) => {
b.delete(*ks, k_buf)
}
_ => Err(TypedStoreError::RocksDBError(
"typed store invariant violation".to_string(),
))?,
}
Ok(())
})?;
Ok(())
}
pub fn schedule_delete_range<K: Serialize, V>(
&mut self,
db: &DBMap<K, V>,
from: &K,
to: &K,
) -> Result<(), TypedStoreError> {
if !Arc::ptr_eq(&db.db, &self.database) {
return Err(TypedStoreError::CrossDBBatch);
}
let from_buf = be_fix_int_ser(from)?;
let to_buf = be_fix_int_ser(to)?;
if let StorageWriteBatch::Rocks(b) = &mut self.batch {
b.delete_range_cf(
&rocks_cf_from_db(&self.database, db.cf_name())?,
from_buf,
to_buf,
);
}
Ok(())
}
pub fn insert_batch<J: Borrow<K>, K: Serialize, U: Borrow<V>, V: Serialize>(
&mut self,
db: &DBMap<K, V>,
new_vals: impl IntoIterator<Item = (J, U)>,
) -> Result<&mut Self, TypedStoreError> {
if !Arc::ptr_eq(&db.db, &self.database) {
return Err(TypedStoreError::CrossDBBatch);
}
let mut total = 0usize;
new_vals
.into_iter()
.try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
let k_buf = be_fix_int_ser(k.borrow())?;
let v_buf = bcs::to_bytes(v.borrow()).map_err(typed_store_err_from_bcs_err)?;
total += k_buf.len() + v_buf.len();
match (&mut self.batch, &db.column_family) {
(StorageWriteBatch::Rocks(b), ColumnFamily::Rocks(name)) => {
b.put_cf(&rocks_cf_from_db(&self.database, name)?, k_buf, v_buf)
}
(StorageWriteBatch::InMemory(b), ColumnFamily::InMemory(name)) => {
b.put_cf(name, k_buf, v_buf)
}
#[cfg(all(not(target_os = "windows"), feature = "tide_hunter"))]
(StorageWriteBatch::TideHunter(b), ColumnFamily::TideHunter(ks)) => {
b.write(*ks, k_buf.to_vec(), v_buf.to_vec())
}
_ => Err(TypedStoreError::RocksDBError(
"typed store invariant violation".to_string(),
))?,
}
Ok(())
})?;
self.db_metrics
.op_metrics
.rocksdb_batch_put_bytes
.with_label_values(&[&db.cf])
.observe(total as f64);
Ok(self)
}
pub fn partial_merge_batch<J: Borrow<K>, K: Serialize, V: Serialize, B: AsRef<[u8]>>(
&mut self,
db: &DBMap<K, V>,
new_vals: impl IntoIterator<Item = (J, B)>,
) -> Result<&mut Self, TypedStoreError> {
if !Arc::ptr_eq(&db.db, &self.database) {
return Err(TypedStoreError::CrossDBBatch);
}
new_vals
.into_iter()
.try_for_each::<_, Result<_, TypedStoreError>>(|(k, v)| {
let k_buf = be_fix_int_ser(k.borrow())?;
match &mut self.batch {
StorageWriteBatch::Rocks(b) => {
b.merge_cf(&rocks_cf_from_db(&self.database, db.cf_name())?, k_buf, v)
}
_ => unimplemented!("merge operator is only implemented for RocksDB"),
}
Ok(())
})?;
Ok(self)
}
}
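/// Raw, untyped RocksDB iterator over a column family; wrapped by [`SafeIter`]
/// for typed, metrics-instrumented access.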
pub type RawIter<'a> = rocksdb::DBRawIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>;
impl<'a, K, V> Map<'a, K, V> for DBMap<K, V>
where
K: Serialize + DeserializeOwned,
V: Serialize + DeserializeOwned,
{
type Error = TypedStoreError;
type SafeIterator = SafeIter<'a, K, V>;
#[instrument(level = "trace", skip_all, err)]
fn contains_key(&self, key: &K) -> Result<bool, TypedStoreError> {
let key_buf = be_fix_int_ser(key)?;
let readopts = self.opts.readopts();
Ok(self.db.key_may_exist_cf(&self.cf, &key_buf, &readopts)
&& self
.db
.get(&self.column_family, &key_buf, &readopts)?
.is_some())
}
#[instrument(level = "trace", skip_all, err)]
fn multi_contains_keys<J>(
&self,
keys: impl IntoIterator<Item = J>,
) -> Result<Vec<bool>, Self::Error>
where
J: Borrow<K>,
{
let values = self.multi_get_pinned(keys)?;
Ok(values.into_iter().map(|v| v.is_some()).collect())
}
#[instrument(level = "trace", skip_all, err)]
fn get(&self, key: &K) -> Result<Option<V>, TypedStoreError> {
let _timer = self
.db_metrics
.op_metrics
.rocksdb_get_latency_seconds
.with_label_values(&[&self.cf])
.start_timer();
let perf_ctx = if self.get_sample_interval.sample() {
Some(RocksDBPerfContext)
} else {
None
};
let key_buf = be_fix_int_ser(key)?;
let res = self
.db
.get(&self.column_family, &key_buf, &self.opts.readopts())?;
self.db_metrics
.op_metrics
.rocksdb_get_bytes
.with_label_values(&[&self.cf])
.observe(res.as_ref().map_or(0.0, |v| v.len() as f64));
if perf_ctx.is_some() {
self.db_metrics
.read_perf_ctx_metrics
.report_metrics(&self.cf);
}
match res {
Some(data) => Ok(Some(
bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
)),
None => Ok(None),
}
}
#[instrument(level = "trace", skip_all, err)]
fn insert(&self, key: &K, value: &V) -> Result<(), TypedStoreError> {
let timer = self
.db_metrics
.op_metrics
.rocksdb_put_latency_seconds
.with_label_values(&[&self.cf])
.start_timer();
let perf_ctx = if self.write_sample_interval.sample() {
Some(RocksDBPerfContext)
} else {
None
};
let key_buf = be_fix_int_ser(key)?;
let value_buf = bcs::to_bytes(value).map_err(typed_store_err_from_bcs_err)?;
self.db_metrics
.op_metrics
.rocksdb_put_bytes
.with_label_values(&[&self.cf])
.observe((key_buf.len() + value_buf.len()) as f64);
if perf_ctx.is_some() {
self.db_metrics
.write_perf_ctx_metrics
.report_metrics(&self.cf);
}
self.db.put_cf(&self.column_family, key_buf, value_buf)?;
let elapsed = timer.stop_and_record();
if elapsed > 1.0 {
warn!(?elapsed, cf = ?self.cf, "very slow insert");
self.db_metrics
.op_metrics
.rocksdb_very_slow_puts_count
.with_label_values(&[&self.cf])
.inc();
self.db_metrics
.op_metrics
.rocksdb_very_slow_puts_duration_ms
.with_label_values(&[&self.cf])
.inc_by((elapsed * 1000.0) as u64);
}
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
fn remove(&self, key: &K) -> Result<(), TypedStoreError> {
let _timer = self
.db_metrics
.op_metrics
.rocksdb_delete_latency_seconds
.with_label_values(&[&self.cf])
.start_timer();
let perf_ctx = if self.write_sample_interval.sample() {
Some(RocksDBPerfContext)
} else {
None
};
let key_buf = be_fix_int_ser(key)?;
self.db.delete_cf(&self.column_family, key_buf)?;
self.db_metrics
.op_metrics
.rocksdb_deletes
.with_label_values(&[&self.cf])
.inc();
if perf_ctx.is_some() {
self.db_metrics
.write_perf_ctx_metrics
.report_metrics(&self.cf);
}
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
fn unsafe_clear(&self) -> Result<(), TypedStoreError> {
let _ = self.db.drop_cf(&self.cf);
self.db
.create_cf(self.cf.clone(), &default_db_options().options)
.map_err(typed_store_err_from_rocks_err)?;
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
fn schedule_delete_all(&self) -> Result<(), TypedStoreError> {
let first_key = self.safe_iter().next().transpose()?.map(|(k, _v)| k);
let last_key = self
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?
.map(|(k, _v)| k);
        if let Some((first_key, last_key)) = first_key.zip(last_key) {
            let mut batch = self.batch();
            batch.schedule_delete_range(self, &first_key, &last_key)?;
            // delete_range_cf is end-exclusive, so delete the last key explicitly.
            batch.delete_batch(self, [&last_key])?;
            batch.write()?;
        }
Ok(())
}
fn is_empty(&self) -> bool {
self.safe_iter().next().is_none()
}
fn safe_iter(&'a self) -> Self::SafeIterator {
let db_iter = self.db.raw_iterator_cf(&self.cf, self.opts.readopts());
let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
SafeIter::new(
self.cf.clone(),
db_iter,
_timer,
_perf_ctx,
bytes_scanned,
keys_scanned,
Some(self.db_metrics.clone()),
)
}
fn safe_iter_with_bounds(
&'a self,
lower_bound: Option<K>,
upper_bound: Option<K>,
) -> Self::SafeIterator {
let readopts = self.create_read_options_with_bounds(lower_bound, upper_bound);
let db_iter = self.db.raw_iterator_cf(&self.cf, readopts);
let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
SafeIter::new(
self.cf.clone(),
db_iter,
_timer,
_perf_ctx,
bytes_scanned,
keys_scanned,
Some(self.db_metrics.clone()),
)
}
fn safe_range_iter(&'a self, range: impl RangeBounds<K>) -> Self::SafeIterator {
let readopts = self.create_read_options_with_range(range);
let db_iter = self.db.raw_iterator_cf(&self.cf, readopts);
let (_timer, bytes_scanned, keys_scanned, _perf_ctx) = self.create_iter_context();
SafeIter::new(
self.cf.clone(),
db_iter,
_timer,
_perf_ctx,
bytes_scanned,
keys_scanned,
Some(self.db_metrics.clone()),
)
}
#[instrument(level = "trace", skip_all, err)]
fn multi_get<J>(
&self,
keys: impl IntoIterator<Item = J>,
) -> Result<Vec<Option<V>>, TypedStoreError>
where
J: Borrow<K>,
{
let results = self.multi_get_pinned(keys)?;
let values_parsed: Result<Vec<_>, TypedStoreError> = results
.into_iter()
.map(|value_byte| match value_byte {
Some(data) => Ok(Some(
bcs::from_bytes(&data).map_err(typed_store_err_from_bcs_err)?,
)),
None => Ok(None),
})
.collect();
values_parsed
}
#[instrument(level = "trace", skip_all, err)]
fn multi_insert<J, U>(
&self,
key_val_pairs: impl IntoIterator<Item = (J, U)>,
) -> Result<(), Self::Error>
where
J: Borrow<K>,
U: Borrow<V>,
{
let mut batch = self.batch();
batch.insert_batch(self, key_val_pairs)?;
batch.write()
}
#[instrument(level = "trace", skip_all, err)]
fn multi_remove<J>(&self, keys: impl IntoIterator<Item = J>) -> Result<(), Self::Error>
where
J: Borrow<K>,
{
let mut batch = self.batch();
batch.delete_batch(self, keys)?;
batch.write()
}
#[instrument(level = "trace", skip_all, err)]
fn try_catch_up_with_primary(&self) -> Result<(), Self::Error> {
if let Storage::Rocks(rocks) = &self.db.storage {
rocks
.underlying
.try_catch_up_with_primary()
.map_err(typed_store_err_from_rocks_err)?;
}
Ok(())
}
}
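/// Reads a `usize` from the named environment variable, logging a warning and
/// returning `None` if the value is missing or unparsable.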
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
env::var(var_name)
.ok()?
.parse::<usize>()
.tap_err(|e| {
warn!(
"Env var {} does not contain valid usize integer: {}",
var_name, e
)
})
.ok()
}
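/// Read options applied to every operation on a [`DBMap`]. By default
/// `ignore_range_deletions` is true, which speeds up reads but means keys
/// covered by a range deletion may remain visible until compaction.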
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
pub ignore_range_deletions: bool,
}
impl ReadWriteOptions {
pub fn readopts(&self) -> ReadOptions {
let mut readopts = ReadOptions::default();
readopts.set_ignore_range_deletions(self.ignore_range_deletions);
readopts
}
pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
self.ignore_range_deletions = ignore;
self
}
}
impl Default for ReadWriteOptions {
fn default() -> Self {
Self {
ignore_range_deletions: true,
}
}
}
#[derive(Default, Clone)]
pub struct DBOptions {
pub options: rocksdb::Options,
pub rw_options: ReadWriteOptions,
}
impl DBOptions {
pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
self.options
.optimize_for_point_lookup(block_cache_size_mb as u64);
self
}
pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
info!("Large value blob storage optimization is disabled via env var.");
return self;
}
self.options.set_enable_blob_files(true);
self.options
.set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
self.options.set_enable_blob_gc(true);
self.options.set_min_blob_size(min_blob_size);
let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
* 1024
* 1024;
self.options.set_write_buffer_size(write_buffer_size);
        let target_file_size_base = 64 << 20; // 64 MiB
self.options
.set_target_file_size_base(target_file_size_base);
let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
.unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
self.options
.set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
self
}
pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
self.options
.set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
self
}
pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
self.options
.set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
self.options
.set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
self
}
pub fn optimize_for_write_throughput(mut self) -> DBOptions {
let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
* 1024
* 1024;
self.options.set_write_buffer_size(write_buffer_size);
let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
self.options
.set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
self.options
.set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
.unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
self.options.set_level_zero_file_num_compaction_trigger(
max_level_zero_file_num.try_into().unwrap(),
);
self.options.set_level_zero_slowdown_writes_trigger(
(max_level_zero_file_num * 12).try_into().unwrap(),
);
self.options
.set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
self.options.set_target_file_size_base(
read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
.unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
* 1024
* 1024,
);
self.options
.set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
self
}
pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
* 1024
* 1024;
self.options.set_write_buffer_size(write_buffer_size);
let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
self.options
.set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
self.options
.set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
self.options
.set_compaction_style(rocksdb::DBCompactionStyle::Universal);
let mut compaction_options = rocksdb::UniversalCompactOptions::default();
compaction_options.set_max_size_amplification_percent(10000);
compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
self.options
.set_universal_compaction_options(&compaction_options);
let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
.unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
self.options.set_level_zero_file_num_compaction_trigger(
max_level_zero_file_num.try_into().unwrap(),
);
self.options.set_level_zero_slowdown_writes_trigger(
(max_level_zero_file_num * 12).try_into().unwrap(),
);
self.options
.set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
self.options.set_target_file_size_base(
read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
.unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
* 1024
* 1024,
);
self.options
.set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
self
}
pub fn set_block_options(
mut self,
block_cache_size_mb: usize,
block_size_bytes: usize,
) -> DBOptions {
self.options
.set_block_based_table_factory(&get_block_options(
block_cache_size_mb,
block_size_bytes,
));
self
}
pub fn disable_write_throttling(mut self) -> DBOptions {
self.options.set_soft_pending_compaction_bytes_limit(0);
self.options.set_hard_pending_compaction_bytes_limit(0);
self
}
}
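/// Builds the default RocksDB options used by this crate: LZ4 compression
/// (Zstd at the bottommost level), pipelined writes, a 128 MiB block cache
/// with 16 KiB blocks, and write-buffer/WAL/parallelism settings that can be
/// overridden via the `DB_WRITE_BUFFER_SIZE_MB`, `DB_WAL_SIZE_MB`, and
/// `DB_PARALLELISM` environment variables.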
pub fn default_db_options() -> DBOptions {
let mut opt = rocksdb::Options::default();
if let Some(limit) = fdlimit::raise_fd_limit() {
opt.set_max_open_files((limit / 8) as i32);
}
opt.set_table_cache_num_shard_bits(10);
opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
opt.set_db_write_buffer_size(
read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
* 1024
* 1024,
);
opt.set_max_total_wal_size(
read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
);
opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
opt.set_enable_pipelined_write(true);
opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
opt.set_memtable_prefix_bloom_ratio(0.02);
DBOptions {
options: opt,
rw_options: ReadWriteOptions::default(),
}
}
fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
let mut block_options = BlockBasedOptions::default();
block_options.set_block_size(block_size_bytes);
block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
block_options.set_bloom_filter(10.0, false);
block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
block_options
}
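/// Opens (creating if missing) a RocksDB database at `path` with the given
/// column families, applying the same options to every family. A minimal
/// sketch (the path, database name, and column-family names are illustrative):
///
/// ```ignore
/// let db = open_cf(
///     "/tmp/example_db",
///     None,
///     MetricConf::new("example_db"),
///     &["objects", "transactions"],
/// )?;
/// ```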
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)]
pub fn open_cf<P: AsRef<Path>>(
path: P,
db_options: Option<rocksdb::Options>,
metric_conf: MetricConf,
opt_cfs: &[&str],
) -> Result<Arc<Database>, TypedStoreError> {
let options = db_options.unwrap_or_else(|| default_db_options().options);
let column_descriptors: Vec<_> = opt_cfs
.iter()
.map(|name| (*name, options.clone()))
.collect();
    open_cf_opts(path, Some(options), metric_conf, &column_descriptors[..])
}
fn prepare_db_options(db_options: Option<rocksdb::Options>) -> rocksdb::Options {
let mut options = db_options.unwrap_or_else(|| default_db_options().options);
options.create_if_missing(true);
options.create_missing_column_families(true);
options
}
#[instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)]
pub fn open_cf_opts<P: AsRef<Path>>(
path: P,
db_options: Option<rocksdb::Options>,
metric_conf: MetricConf,
opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
let path = path.as_ref();
let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
nondeterministic!({
let options = prepare_db_options(db_options);
let rocksdb = {
rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&options,
path,
cfs.into_iter()
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
)
.map_err(typed_store_err_from_rocks_err)?
};
Ok(Arc::new(Database::new(
Storage::Rocks(RocksDB {
underlying: rocksdb,
}),
metric_conf,
)))
})
}
pub fn open_cf_opts_secondary<P: AsRef<Path>>(
primary_path: P,
secondary_path: Option<P>,
db_options: Option<rocksdb::Options>,
metric_conf: MetricConf,
opt_cfs: &[(&str, rocksdb::Options)],
) -> Result<Arc<Database>, TypedStoreError> {
let primary_path = primary_path.as_ref();
let secondary_path = secondary_path.as_ref().map(|p| p.as_ref());
nondeterministic!({
let mut options = db_options.unwrap_or_else(|| default_db_options().options);
fdlimit::raise_fd_limit();
options.set_max_open_files(-1);
let mut opt_cfs: std::collections::HashMap<_, _> = opt_cfs.iter().cloned().collect();
let cfs = rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&options, primary_path)
.ok()
.unwrap_or_default();
let default_db_options = default_db_options();
for cf_key in cfs.iter() {
if !opt_cfs.contains_key(&cf_key[..]) {
opt_cfs.insert(cf_key, default_db_options.options.clone());
}
}
let primary_path = primary_path.to_path_buf();
        let secondary_path = secondary_path.map(|q| q.to_path_buf()).unwrap_or_else(|| {
            let mut s = primary_path.clone();
            s.pop();
            s.push("SECONDARY");
            s
        });
let rocksdb = {
options.create_if_missing(true);
options.create_missing_column_families(true);
let db = rocksdb::DBWithThreadMode::<MultiThreaded>::open_cf_descriptors_as_secondary(
&options,
&primary_path,
&secondary_path,
opt_cfs
.iter()
.map(|(name, opts)| ColumnFamilyDescriptor::new(*name, (*opts).clone())),
)
.map_err(typed_store_err_from_rocks_err)?;
db.try_catch_up_with_primary()
.map_err(typed_store_err_from_rocks_err)?;
db
};
Ok(Arc::new(Database::new(
Storage::Rocks(RocksDB {
underlying: rocksdb,
}),
metric_conf,
)))
})
}
pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
const DB_DEFAULT_CF_NAME: &str = "default";
let opts = rocksdb::Options::default();
rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
.map_err(|e| e.into())
        .map(|cfs| {
            cfs.into_iter()
                .filter(|cf| cf != DB_DEFAULT_CF_NAME)
                .collect()
        })
}
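/// Serializes a key with big-endian, fixed-width integer encoding so that the
/// byte order of serialized keys matches the natural ordering of the values.
/// The range-bound helpers in `DBMap` rely on this property to compute
/// iterator bounds directly on the raw bytes.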
#[inline]
pub fn be_fix_int_ser<S>(t: &S) -> Result<Vec<u8>, TypedStoreError>
where
S: ?Sized + serde::Serialize,
{
bincode::DefaultOptions::new()
.with_big_endian()
.with_fixint_encoding()
.serialize(t)
.map_err(typed_store_err_from_bincode_err)
}
#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
Self(map)
}
pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
self.0.clone()
}
}
pub enum RocksDBAccessType {
Primary,
Secondary(Option<PathBuf>),
}
pub fn safe_drop_db(path: PathBuf) -> Result<(), rocksdb::Error> {
rocksdb::DB::destroy(&rocksdb::Options::default(), path)
}
fn populate_missing_cfs(
input_cfs: &[(&str, rocksdb::Options)],
path: &Path,
) -> Result<Vec<(String, rocksdb::Options)>, rocksdb::Error> {
let mut cfs = vec![];
let input_cf_index: HashSet<_> = input_cfs.iter().map(|(name, _)| *name).collect();
let existing_cfs =
rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(&rocksdb::Options::default(), path)
.ok()
.unwrap_or_default();
for cf_name in existing_cfs {
if !input_cf_index.contains(&cf_name[..]) {
cfs.push((cf_name, rocksdb::Options::default()));
}
}
cfs.extend(
input_cfs
.iter()
.map(|(name, opts)| (name.to_string(), (*opts).clone())),
);
Ok(cfs)
}
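/// Interprets the byte slice as a big-endian integer and increments it in
/// place, saturating at the all-0xFF maximum. Used to convert inclusive and
/// exclusive key bounds into RocksDB's half-open `[lower, upper)` ranges.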
fn big_endian_saturating_add_one(v: &mut [u8]) {
if is_max(v) {
return;
}
for i in (0..v.len()).rev() {
if v[i] == u8::MAX {
v[i] = 0;
} else {
v[i] += 1;
break;
}
}
}
fn is_max(v: &[u8]) -> bool {
v.iter().all(|&x| x == u8::MAX)
}
#[allow(clippy::assign_op_pattern)]
#[test]
fn test_helpers() {
let v = vec![];
assert!(is_max(&v));
fn check_add(v: Vec<u8>) {
let mut v = v;
let num = Num32::from_big_endian(&v);
big_endian_saturating_add_one(&mut v);
assert!(num + 1 == Num32::from_big_endian(&v));
}
uint::construct_uint! {
struct Num32(4);
}
let mut v = vec![255; 32];
big_endian_saturating_add_one(&mut v);
assert!(Num32::MAX == Num32::from_big_endian(&v));
check_add(vec![1; 32]);
check_add(vec![6; 32]);
check_add(vec![254; 32]);
}