use rocksdb::{BlockBasedOptions, Cache, ReadOptions};
use std::collections::BTreeMap;
use std::env;
use tap::TapFallible;
use tracing::{info, warn};
// Environment-variable overrides for DB tuning. Each `ENV_VAR_*` names the
// variable to read; the matching `DEFAULT_*` is used when it is unset or
// unparsable (see `read_size_from_env`). Sizes are in MiB unless noted.

// DB-wide write buffer (memtable) budget, in MiB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;
// Maximum total WAL size, in MiB.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;
// Number of L0 files that triggers compaction (a plain file count, not MiB).
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
// Higher L0 trigger default used with universal compaction
// (see `optimize_for_write_throughput_no_deletion`).
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
// Per-memtable size, in MiB.
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
// Maximum number of memtables per column family (a count).
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
// Target SST file size at the base level, in MiB.
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
// Presence-only flags (value ignored): disable blob storage / set background
// parallelism.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
/// Per-table read/write behavior toggles, kept separate from the RocksDB
/// `Options` so they can be applied to individual read/write calls
/// (see [`ReadWriteOptions::readopts`]).
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    // When true, reads skip checking range-deletion tombstones, which is
    // faster but may surface keys covered by a range delete.
    pub ignore_range_deletions: bool,
    // NOTE(review): not consumed anywhere in this file; presumably enables
    // logging a hash of written values elsewhere — confirm against callers.
    pub log_value_hash: bool,
}
impl ReadWriteOptions {
    /// Builds a `ReadOptions` reflecting this configuration's
    /// range-deletion policy.
    pub fn readopts(&self) -> ReadOptions {
        let mut opts = ReadOptions::default();
        opts.set_ignore_range_deletions(self.ignore_range_deletions);
        opts
    }

    /// Builder-style setter: returns `self` with `ignore_range_deletions`
    /// replaced by `ignore`.
    pub fn set_ignore_range_deletions(self, ignore: bool) -> Self {
        Self {
            ignore_range_deletions: ignore,
            ..self
        }
    }

    /// Builder-style setter: returns `self` with `log_value_hash` replaced.
    pub fn set_log_value_hash(self, log_value_hash: bool) -> Self {
        Self {
            log_value_hash,
            ..self
        }
    }
}
impl Default for ReadWriteOptions {
fn default() -> Self {
Self {
ignore_range_deletions: true,
log_value_hash: false,
}
}
}
/// Bundle of per-table configuration: the RocksDB column-family `Options`
/// plus the read/write call options applied by this wrapper layer.
#[derive(Default, Clone)]
pub struct DBOptions {
    // Raw RocksDB options tuned by the builder methods below.
    pub options: rocksdb::Options,
    // Read/write call behavior (see `ReadWriteOptions`).
    pub rw_options: ReadWriteOptions,
}
/// Mapping from table (column family) name to its `DBOptions`.
/// Newtype over `BTreeMap` so the configuration can be passed around as a
/// single value.
#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
Self(map)
}
pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
self.0.clone()
}
}
impl DBOptions {
pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
self.options
.optimize_for_point_lookup(block_cache_size_mb as u64);
self
}
pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
info!("Large value blob storage optimization is disabled via env var.");
return self;
}
self.options.set_enable_blob_files(true);
self.options
.set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
self.options.set_enable_blob_gc(true);
self.options.set_min_blob_size(min_blob_size);
let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
* 1024
* 1024;
self.options.set_write_buffer_size(write_buffer_size);
let target_file_size_base = 64 << 20;
self.options
.set_target_file_size_base(target_file_size_base);
let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
.unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
self.options
.set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
self
}
pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
self.options
.set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
self
}
pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
self.options
.set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
self.options
.set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
self
}
pub fn optimize_for_write_throughput(mut self) -> DBOptions {
let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
* 1024
* 1024;
self.options.set_write_buffer_size(write_buffer_size);
let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
self.options
.set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
self.options
.set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
.unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
self.options.set_level_zero_file_num_compaction_trigger(
max_level_zero_file_num.try_into().unwrap(),
);
self.options.set_level_zero_slowdown_writes_trigger(
(max_level_zero_file_num * 12).try_into().unwrap(),
);
self.options
.set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
self.options.set_target_file_size_base(
read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
.unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
* 1024
* 1024,
);
self.options
.set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
self
}
pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
* 1024
* 1024;
self.options.set_write_buffer_size(write_buffer_size);
let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
.unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
self.options
.set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
self.options
.set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
self.options
.set_compaction_style(rocksdb::DBCompactionStyle::Universal);
let mut compaction_options = rocksdb::UniversalCompactOptions::default();
compaction_options.set_max_size_amplification_percent(10000);
compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
self.options
.set_universal_compaction_options(&compaction_options);
let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
.unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
self.options.set_level_zero_file_num_compaction_trigger(
max_level_zero_file_num.try_into().unwrap(),
);
self.options.set_level_zero_slowdown_writes_trigger(
(max_level_zero_file_num * 12).try_into().unwrap(),
);
self.options
.set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
self.options.set_target_file_size_base(
read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
.unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
* 1024
* 1024,
);
self.options
.set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
self
}
pub fn set_block_options(
mut self,
block_cache_size_mb: usize,
block_size_bytes: usize,
) -> DBOptions {
self.options
.set_block_based_table_factory(&get_block_options(
block_cache_size_mb,
block_size_bytes,
));
self
}
pub fn disable_write_throttling(mut self) -> DBOptions {
self.options.set_soft_pending_compaction_bytes_limit(0);
self.options.set_hard_pending_compaction_bytes_limit(0);
self
}
}
/// Builds the baseline `DBOptions` shared by all tables: compression,
/// memory/WAL budgets (env-overridable), parallelism, pipelined writes,
/// and the default block-based table factory.
pub fn default_db_options() -> DBOptions {
    let mut options = rocksdb::Options::default();

    // Raise the process fd limit and give RocksDB a fraction of it.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        options.set_max_open_files((limit / 8) as i32);
    }
    options.set_table_cache_num_shard_bits(10);

    // LZ4 for upper levels, Zstd (with dictionary training) for the
    // bottommost level.
    options.set_compression_type(rocksdb::DBCompressionType::Lz4);
    options.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    options.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    // DB-wide memtable and WAL budgets, MiB from env converted to bytes.
    let db_write_buffer_size = read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE)
        .unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
        * 1024
        * 1024;
    options.set_db_write_buffer_size(db_write_buffer_size);
    let wal_size_mb = read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE);
    options.set_max_total_wal_size(wal_size_mb as u64 * 1024 * 1024);

    let parallelism = read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8);
    options.increase_parallelism(parallelism as i32);
    options.set_enable_pipelined_write(true);

    // 128 MiB block cache with 16 KiB blocks by default.
    options.set_block_based_table_factory(&get_block_options(128, 16 << 10));
    options.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options,
        rw_options: ReadWriteOptions::default(),
    }
}
/// Builds the block-based table settings shared across tables: an LRU block
/// cache of `block_cache_size_mb` MiB, `block_size_bytes`-sized blocks, a
/// 10-bit bloom filter, and L0 filter/index blocks pinned in cache.
fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
    let mut opts = BlockBasedOptions::default();
    opts.set_block_size(block_size_bytes);
    // Cache size is given in MiB; the shift converts to bytes.
    opts.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
    opts.set_bloom_filter(10.0, false);
    opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    opts
}
pub fn read_size_from_env(var_name: &str) -> Option<usize> {
env::var(var_name)
.ok()?
.parse::<usize>()
.tap_err(|e| {
warn!(
"Env var {} does not contain valid usize integer: {}",
var_name, e
)
})
.ok()
}