typed_store/rocks/
options.rs1use std::{collections::BTreeMap, env};
5
6use rocksdb::{BlockBasedOptions, Cache, MergeOperands, ReadOptions, compaction_filter::Decision};
7use tap::TapFallible;
8use tracing::{info, warn};
9
10const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
13const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;
14
15const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
18const DEFAULT_DB_WAL_SIZE: usize = 1024;
19
20const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
22const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
23const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
24const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
25const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
26const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
27const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
28const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
29const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;
30
31const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
33const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
34
35#[derive(Clone, Debug)]
36pub struct ReadWriteOptions {
37 pub ignore_range_deletions: bool,
38 pub log_value_hash: bool,
41}
42
43impl ReadWriteOptions {
44 pub fn readopts(&self) -> ReadOptions {
45 let mut readopts = ReadOptions::default();
46 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
47 readopts
48 }
49
50 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
51 self.ignore_range_deletions = ignore;
52 self
53 }
54
55 pub fn set_log_value_hash(mut self, log_value_hash: bool) -> Self {
56 self.log_value_hash = log_value_hash;
57 self
58 }
59}
60
61impl Default for ReadWriteOptions {
62 fn default() -> Self {
63 Self {
64 ignore_range_deletions: true,
65 log_value_hash: false,
66 }
67 }
68}
69
70#[derive(Default, Clone)]
71pub struct DBOptions {
72 pub options: rocksdb::Options,
73 pub rw_options: ReadWriteOptions,
74}
75
76#[derive(Clone)]
77pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
78impl DBMapTableConfigMap {
79 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
80 Self(map)
81 }
82
83 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
84 self.0.clone()
85 }
86}
87
88impl DBOptions {
89 pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
93 self.options
95 .optimize_for_point_lookup(block_cache_size_mb as u64);
96 self
97 }
98
99 pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
102 if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
103 info!("Large value blob storage optimization is disabled via env var.");
104 return self;
105 }
106
107 self.options.set_enable_blob_files(true);
109 self.options
110 .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
111 self.options.set_enable_blob_gc(true);
112 self.options.set_min_blob_size(min_blob_size);
115
116 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
118 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
119 * 1024
120 * 1024;
121 self.options.set_write_buffer_size(write_buffer_size);
122 let target_file_size_base = 64 << 20;
125 self.options
126 .set_target_file_size_base(target_file_size_base);
127 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
129 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
130 self.options
131 .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
132
133 self
134 }
135
136 pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
138 self.options
139 .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
140 self
141 }
142
143 pub fn optimize_db_for_write_throughput(
145 mut self,
146 db_max_write_buffer_gb: u64,
147 unlimited_open_files: bool,
148 ) -> DBOptions {
149 self.options
150 .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
151 self.options
152 .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
153 if unlimited_open_files {
154 self.options.set_max_open_files(-1);
155 }
156 self
157 }
158
159 pub fn optimize_for_write_throughput(mut self) -> DBOptions {
161 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
163 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
164 * 1024
165 * 1024;
166 self.options.set_write_buffer_size(write_buffer_size);
167 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
169 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
170 self.options
171 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
172 self.options
174 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
175
176 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
178 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
179 self.options.set_level_zero_file_num_compaction_trigger(
180 max_level_zero_file_num.try_into().unwrap(),
181 );
182 self.options.set_level_zero_slowdown_writes_trigger(
183 (max_level_zero_file_num * 12).try_into().unwrap(),
184 );
185 self.options
186 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
187
188 self.options.set_target_file_size_base(
190 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
191 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
192 * 1024
193 * 1024,
194 );
195
196 self.options
198 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
199
200 self
201 }
202
203 pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
207 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
209 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
210 * 1024
211 * 1024;
212 self.options.set_write_buffer_size(write_buffer_size);
213 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
215 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
216 self.options
217 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
218 self.options
220 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
221
222 self.options
224 .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
225 let mut compaction_options = rocksdb::UniversalCompactOptions::default();
226 compaction_options.set_max_size_amplification_percent(10000);
227 compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
228 self.options
229 .set_universal_compaction_options(&compaction_options);
230
231 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
232 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
233 self.options.set_level_zero_file_num_compaction_trigger(
234 max_level_zero_file_num.try_into().unwrap(),
235 );
236 self.options.set_level_zero_slowdown_writes_trigger(
237 (max_level_zero_file_num * 12).try_into().unwrap(),
238 );
239 self.options
240 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
241
242 self.options.set_target_file_size_base(
244 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
245 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
246 * 1024
247 * 1024,
248 );
249
250 self.options
252 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
253
254 self
255 }
256
257 pub fn optimize_for_no_deletion(mut self) -> DBOptions {
260 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
262 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
263 * 1024
264 * 1024;
265 self.options.set_write_buffer_size(write_buffer_size);
266 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
268 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
269 self.options
270 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
271 self.options
273 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
274
275 self.options
277 .set_compaction_style(rocksdb::DBCompactionStyle::Fifo);
278 let mut compaction_options = rocksdb::FifoCompactOptions::default();
279 compaction_options.set_max_table_files_size(u64::MAX);
281 self.options
282 .set_fifo_compaction_options(&compaction_options);
283
284 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
285 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
286 self.options.set_level_zero_file_num_compaction_trigger(
287 max_level_zero_file_num.try_into().unwrap(),
288 );
289 self.options.set_level_zero_slowdown_writes_trigger(
290 (max_level_zero_file_num * 12).try_into().unwrap(),
291 );
292 self.options
293 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
294 self.options
295 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
296
297 self.options.set_target_file_size_base(
299 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
300 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
301 * 1024
302 * 1024,
303 );
304
305 self
306 }
307
308 pub fn set_block_options(
310 mut self,
311 block_cache_size_mb: usize,
312 block_size_bytes: usize,
313 ) -> DBOptions {
314 self.options
315 .set_block_based_table_factory(&get_block_options(
316 block_cache_size_mb,
317 block_size_bytes,
318 ));
319 self
320 }
321
322 pub fn disable_write_throttling(mut self) -> DBOptions {
324 self.options.set_soft_pending_compaction_bytes_limit(0);
325 self.options.set_hard_pending_compaction_bytes_limit(0);
326 self.options.set_level_zero_slowdown_writes_trigger(512);
327 self.options.set_level_zero_stop_writes_trigger(1024);
328 self
329 }
330
331 pub fn set_merge_operator_associative<F>(mut self, name: &str, merge_fn: F) -> DBOptions
332 where
333 F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>>
334 + Send
335 + Sync
336 + Clone
337 + 'static,
338 {
339 self.options.set_merge_operator_associative(name, merge_fn);
340 self
341 }
342
343 pub fn set_compaction_filter<F>(mut self, name: &str, filter_fn: F) -> DBOptions
344 where
345 F: FnMut(u32, &[u8], &[u8]) -> Decision + Send + 'static,
346 {
347 self.options.set_compaction_filter(name, filter_fn);
348 self
349 }
350}
351
352pub fn default_db_options() -> DBOptions {
354 let mut opt = rocksdb::Options::default();
355
356 if let Some(limit) = fdlimit::raise_fd_limit() {
359 opt.set_max_open_files((limit / 8) as i32);
361 }
362
363 opt.set_table_cache_num_shard_bits(10);
366
367 opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
369 opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
370 opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
371
372 opt.set_db_write_buffer_size(
382 read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
383 * 1024
384 * 1024,
385 );
386 opt.set_max_total_wal_size(
387 read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
388 );
389
390 opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
392
393 opt.set_enable_pipelined_write(true);
394
395 opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
398
399 opt.set_memtable_prefix_bloom_ratio(0.02);
401
402 DBOptions {
403 options: opt,
404 rw_options: ReadWriteOptions::default(),
405 }
406}
407
408fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
409 let mut block_options = BlockBasedOptions::default();
414 block_options.set_block_size(block_size_bytes);
416 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
418 block_options.set_bloom_filter(10.0, false);
420 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
422 block_options
423}
424
425pub fn read_size_from_env(var_name: &str) -> Option<usize> {
426 env::var(var_name)
427 .ok()?
428 .parse::<usize>()
429 .tap_err(|e| {
430 warn!(
431 "Env var {} does not contain valid usize integer: {}",
432 var_name, e
433 )
434 })
435 .ok()
436}