typed_store/rocks/
options.rs1use rocksdb::{BlockBasedOptions, Cache, MergeOperands, ReadOptions, compaction_filter::Decision};
5use std::collections::BTreeMap;
6use std::env;
7use tap::TapFallible;
8use tracing::{info, warn};
9
// --- Database-wide limits (applied by `default_db_options`) -----------------

/// Env var overriding the DB-wide write buffer size; the value is read in MiB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
/// DB-wide write buffer size in MiB used when the env var is unset or unparsable.
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

/// Env var overriding the maximum total WAL size; the value is read in MiB.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
/// Maximum total WAL size in MiB used when the env var is unset or unparsable.
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// --- Per-table write-path tuning --------------------------------------------

/// Env var overriding the number of level-0 files that triggers compaction.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
/// Level-0 compaction trigger used by the level-compaction presets.
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
/// Level-0 compaction trigger used by the universal-compaction preset.
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
/// Env var overriding the per-memtable write buffer size; the value is read in MiB.
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
/// Per-memtable write buffer size in MiB used when the env var is unset or unparsable.
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
/// Env var overriding the maximum number of write buffers.
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
/// Maximum number of write buffers used when the env var is unset or unparsable.
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
/// Env var overriding the target SST file size; the value is read in MiB.
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
/// Target SST file size in MiB used when the env var is unset or unparsable.
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// --- Feature switches -------------------------------------------------------

/// Env var whose mere presence (any valid-Unicode value) turns off the blob-storage
/// optimisation for large values.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
/// Env var overriding the parallelism passed to RocksDB by `default_db_options`.
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
34
/// Per-table knobs that affect how reads and writes are issued, as opposed to how
/// RocksDB itself is configured.
#[derive(Debug, Clone)]
pub struct ReadWriteOptions {
    /// Forwarded to `ReadOptions::set_ignore_range_deletions` when read options are built.
    pub ignore_range_deletions: bool,
    /// NOTE(review): consumed outside this file; presumably enables logging a hash of
    /// written values — confirm against the caller.
    pub log_value_hash: bool,
    /// NOTE(review): consumed outside this file; presumably requests synchronous
    /// (fsync-ed) writes — confirm against the caller.
    pub sync_writes: bool,
}
44
45impl ReadWriteOptions {
46 pub fn readopts(&self) -> ReadOptions {
47 let mut readopts = ReadOptions::default();
48 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
49 readopts
50 }
51
52 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
53 self.ignore_range_deletions = ignore;
54 self
55 }
56
57 pub fn set_log_value_hash(mut self, log_value_hash: bool) -> Self {
58 self.log_value_hash = log_value_hash;
59 self
60 }
61}
62
63impl Default for ReadWriteOptions {
64 fn default() -> Self {
65 Self {
66 ignore_range_deletions: true,
67 log_value_hash: false,
68 sync_writes: false,
69 }
70 }
71}
72
73#[derive(Default, Clone)]
74pub struct DBOptions {
75 pub options: rocksdb::Options,
76 pub rw_options: ReadWriteOptions,
77}
78
79#[derive(Clone)]
80pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
81impl DBMapTableConfigMap {
82 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
83 Self(map)
84 }
85
86 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
87 self.0.clone()
88 }
89}
90
91impl DBOptions {
92 pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
96 self.options
98 .optimize_for_point_lookup(block_cache_size_mb as u64);
99 self
100 }
101
102 pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
105 if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
106 info!("Large value blob storage optimization is disabled via env var.");
107 return self;
108 }
109
110 self.options.set_enable_blob_files(true);
112 self.options
113 .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
114 self.options.set_enable_blob_gc(true);
115 self.options.set_min_blob_size(min_blob_size);
118
119 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
121 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
122 * 1024
123 * 1024;
124 self.options.set_write_buffer_size(write_buffer_size);
125 let target_file_size_base = 64 << 20;
128 self.options
129 .set_target_file_size_base(target_file_size_base);
130 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
132 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
133 self.options
134 .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
135
136 self
137 }
138
139 pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
141 self.options
142 .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
143 self
144 }
145
146 pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
148 self.options
149 .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
150 self.options
151 .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
152 self
153 }
154
155 pub fn optimize_for_write_throughput(mut self) -> DBOptions {
157 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
159 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
160 * 1024
161 * 1024;
162 self.options.set_write_buffer_size(write_buffer_size);
163 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
165 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
166 self.options
167 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
168 self.options
170 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
171
172 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
174 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
175 self.options.set_level_zero_file_num_compaction_trigger(
176 max_level_zero_file_num.try_into().unwrap(),
177 );
178 self.options.set_level_zero_slowdown_writes_trigger(
179 (max_level_zero_file_num * 12).try_into().unwrap(),
180 );
181 self.options
182 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
183
184 self.options.set_target_file_size_base(
186 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
187 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
188 * 1024
189 * 1024,
190 );
191
192 self.options
194 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
195
196 self
197 }
198
199 pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
203 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
205 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
206 * 1024
207 * 1024;
208 self.options.set_write_buffer_size(write_buffer_size);
209 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
211 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
212 self.options
213 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
214 self.options
216 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
217
218 self.options
220 .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
221 let mut compaction_options = rocksdb::UniversalCompactOptions::default();
222 compaction_options.set_max_size_amplification_percent(10000);
223 compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
224 self.options
225 .set_universal_compaction_options(&compaction_options);
226
227 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
228 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
229 self.options.set_level_zero_file_num_compaction_trigger(
230 max_level_zero_file_num.try_into().unwrap(),
231 );
232 self.options.set_level_zero_slowdown_writes_trigger(
233 (max_level_zero_file_num * 12).try_into().unwrap(),
234 );
235 self.options
236 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
237
238 self.options.set_target_file_size_base(
240 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
241 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
242 * 1024
243 * 1024,
244 );
245
246 self.options
248 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
249
250 self
251 }
252
253 pub fn set_block_options(
255 mut self,
256 block_cache_size_mb: usize,
257 block_size_bytes: usize,
258 ) -> DBOptions {
259 self.options
260 .set_block_based_table_factory(&get_block_options(
261 block_cache_size_mb,
262 block_size_bytes,
263 ));
264 self
265 }
266
267 pub fn disable_write_throttling(mut self) -> DBOptions {
269 self.options.set_soft_pending_compaction_bytes_limit(0);
270 self.options.set_hard_pending_compaction_bytes_limit(0);
271 self.options.set_level_zero_slowdown_writes_trigger(512);
272 self.options.set_level_zero_stop_writes_trigger(1024);
273 self
274 }
275
276 pub fn set_merge_operator_associative<F>(mut self, name: &str, merge_fn: F) -> DBOptions
277 where
278 F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>>
279 + Send
280 + Sync
281 + Clone
282 + 'static,
283 {
284 self.options.set_merge_operator_associative(name, merge_fn);
285 self
286 }
287
288 pub fn set_compaction_filter<F>(mut self, name: &str, filter_fn: F) -> DBOptions
289 where
290 F: FnMut(u32, &[u8], &[u8]) -> Decision + Send + 'static,
291 {
292 self.options.set_compaction_filter(name, filter_fn);
293 self
294 }
295}
296
297pub fn default_db_options() -> DBOptions {
299 let mut opt = rocksdb::Options::default();
300
301 if let Some(limit) = fdlimit::raise_fd_limit() {
304 opt.set_max_open_files((limit / 8) as i32);
306 }
307
308 opt.set_table_cache_num_shard_bits(10);
311
312 opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
314 opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
315 opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
316
317 opt.set_db_write_buffer_size(
327 read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
328 * 1024
329 * 1024,
330 );
331 opt.set_max_total_wal_size(
332 read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
333 );
334
335 opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
337
338 opt.set_enable_pipelined_write(true);
339
340 opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
343
344 opt.set_memtable_prefix_bloom_ratio(0.02);
346
347 DBOptions {
348 options: opt,
349 rw_options: ReadWriteOptions::default(),
350 }
351}
352
353fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
354 let mut block_options = BlockBasedOptions::default();
359 block_options.set_block_size(block_size_bytes);
361 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
363 block_options.set_bloom_filter(10.0, false);
365 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
367 block_options
368}
369
370pub fn read_size_from_env(var_name: &str) -> Option<usize> {
371 env::var(var_name)
372 .ok()?
373 .parse::<usize>()
374 .tap_err(|e| {
375 warn!(
376 "Env var {} does not contain valid usize integer: {}",
377 var_name, e
378 )
379 })
380 .ok()
381}