// typed_store/rocks/options.rs
use std::{collections::BTreeMap, env};
5
6use rocksdb::{BlockBasedOptions, Cache, MergeOperands, ReadOptions, compaction_filter::Decision};
7use tap::TapFallible;
8use tracing::{info, warn};
9
// Environment-variable overrides for RocksDB tuning. Names ending in `_MB`
// take values in mebibytes; unset or unparsable values fall back to the
// paired `DEFAULT_*` constant (see `read_size_from_env`).

// Total write buffer budget shared across the whole DB, in MB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Cap on total WAL size, in MB.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Number of L0 files that triggers compaction (plain count, not MB).
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
// Higher L0 trigger used for FIFO/no-deletion tables (see `optimize_for_no_deletion`).
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
// Per-memtable write buffer size, in MB.
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
// Maximum number of memtables per column family (plain count).
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
// Base SST target file size, in MB.
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// Presence of this var (any value) disables blob-file storage for large values.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
// Background parallelism passed to `increase_parallelism` (plain count).
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
34
/// Store-level read/write behavior applied on top of the raw RocksDB
/// column-family options (see [`DBOptions`]).
#[derive(Clone, Debug)]
pub struct ReadWriteOptions {
    // When true, reads skip range-tombstone checks
    // (applied to every `ReadOptions` built by `readopts`).
    pub ignore_range_deletions: bool,
    // Opt-in flag consumed by the wrapping store; presumably enables logging
    // a hash of written values — TODO confirm against callers.
    pub log_value_hash: bool,
}
42
43impl ReadWriteOptions {
44 pub fn readopts(&self) -> ReadOptions {
45 let mut readopts = ReadOptions::default();
46 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
47 readopts
48 }
49
50 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
51 self.ignore_range_deletions = ignore;
52 self
53 }
54
55 pub fn set_log_value_hash(mut self, log_value_hash: bool) -> Self {
56 self.log_value_hash = log_value_hash;
57 self
58 }
59}
60
61impl Default for ReadWriteOptions {
62 fn default() -> Self {
63 Self {
64 ignore_range_deletions: true,
65 log_value_hash: false,
66 }
67 }
68}
69
/// Bundle of raw RocksDB options plus store-level read/write options,
/// configured per column family (see [`DBMapTableConfigMap`]).
#[derive(Default, Clone)]
pub struct DBOptions {
    // Raw RocksDB options applied when opening the column family.
    pub options: rocksdb::Options,
    // Store-level read/write tweaks layered on top (see [`ReadWriteOptions`]).
    pub rw_options: ReadWriteOptions,
}
75
76#[derive(Clone)]
77pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
78impl DBMapTableConfigMap {
79 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
80 Self(map)
81 }
82
83 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
84 self.0.clone()
85 }
86}
87
impl DBOptions {
    /// Tunes the column family for point lookups, with `block_cache_size_mb`
    /// MB of block cache. Delegates to RocksDB's own
    /// `optimize_for_point_lookup` preset.
    pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .optimize_for_point_lookup(block_cache_size_mb as u64);
        self
    }

    /// Enables blob-file storage for values of at least `min_blob_size`
    /// bytes, for tables that hold large values and are not scanned.
    /// A no-op when `DISABLE_BLOB_STORAGE` is set in the environment.
    pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
        // Escape hatch: any value in this env var disables blob storage.
        if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
            info!("Large value blob storage optimization is disabled via env var.");
            return self;
        }

        // Store large values in separate blob files, compressed with LZ4,
        // and garbage-collect stale blobs.
        self.options.set_enable_blob_files(true);
        self.options
            .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
        self.options.set_enable_blob_gc(true);
        self.options.set_min_blob_size(min_blob_size);

        // Memtable size: env override in MB, converted to bytes.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        // 64 MiB SST target size (fixed here, unlike the env-driven value in
        // `optimize_for_write_throughput`).
        let target_file_size_base = 64 << 20;
        self.options
            .set_target_file_size_base(target_file_size_base);
        // Size L1 so that an L0 compaction's worth of files fits in it.
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options
            .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);

        self
    }

    /// Tunes the column family for read-heavy access with
    /// `block_cache_size_mb` MB of block cache and 16 KiB blocks.
    pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
        self
    }

    /// DB-wide write-throughput tuning: caps the shared write buffer and
    /// total WAL at `db_max_write_buffer_gb` GiB each, and optionally lifts
    /// the open-file limit entirely.
    pub fn optimize_db_for_write_throughput(
        mut self,
        db_max_write_buffer_gb: u64,
        unlimited_open_files: bool,
    ) -> DBOptions {
        self.options
            .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
        self.options
            .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
        if unlimited_open_files {
            // -1 means no limit on open files in RocksDB.
            self.options.set_max_open_files(-1);
        }
        self
    }

    /// Per-column-family write-throughput tuning: larger/more memtables,
    /// relaxed L0 triggers, and level sizing derived from the memtable size.
    /// All knobs are env-overridable (see the constants at the top of the file).
    pub fn optimize_for_write_throughput(mut self) -> DBOptions {
        // Memtable size: env override in MB, converted to bytes.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        // Keep up to one memtable's worth of flushed data in memory for
        // conflict checking / read-your-writes.
        self.options
            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());

        // L0 triggers: compact at N files, slow writes at 12N, stop at 16N.
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());

        // SST target size: env override in MB, converted to bytes.
        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        // Size L1 to hold one full L0 compaction (trigger-count memtables).
        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        self
    }

    /// Tuning for tables whose entries are never deleted: FIFO compaction
    /// with an effectively unlimited size cap, plus a much higher L0 file
    /// trigger than the leveled default.
    pub fn optimize_for_no_deletion(mut self) -> DBOptions {
        // Memtable size: env override in MB, converted to bytes.
        let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
            * 1024
            * 1024;
        self.options.set_write_buffer_size(write_buffer_size);
        let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
            .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
        self.options
            .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
        // Keep up to one memtable's worth of flushed data in memory.
        self.options
            .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());

        // FIFO compaction: files are only ever dropped by age/size, never
        // rewritten; u64::MAX cap means nothing is actually evicted.
        self.options
            .set_compaction_style(rocksdb::DBCompactionStyle::Fifo);
        let mut compaction_options = rocksdb::FifoCompactOptions::default();
        compaction_options.set_max_table_files_size(u64::MAX);
        self.options
            .set_fifo_compaction_options(&compaction_options);

        // L0 triggers use the larger no-deletion default (80 files).
        let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
            .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
        self.options.set_level_zero_file_num_compaction_trigger(
            max_level_zero_file_num.try_into().unwrap(),
        );
        self.options.set_level_zero_slowdown_writes_trigger(
            (max_level_zero_file_num * 12).try_into().unwrap(),
        );
        self.options
            .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
        self.options
            .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);

        // SST target size: env override in MB, converted to bytes.
        self.options.set_target_file_size_base(
            read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
                .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
                * 1024
                * 1024,
        );

        self
    }

    /// Overrides the block-based table options (cache size in MB, block size
    /// in bytes) for this column family.
    pub fn set_block_options(
        mut self,
        block_cache_size_mb: usize,
        block_size_bytes: usize,
    ) -> DBOptions {
        self.options
            .set_block_based_table_factory(&get_block_options(
                block_cache_size_mb,
                block_size_bytes,
            ));
        self
    }

    /// Disables RocksDB's write throttling/stalling for this column family
    /// by zeroing the pending-compaction limits and raising the L0
    /// slowdown/stop triggers far above normal operation.
    pub fn disable_write_throttling(mut self) -> DBOptions {
        self.options.set_soft_pending_compaction_bytes_limit(0);
        self.options.set_hard_pending_compaction_bytes_limit(0);
        self.options.set_level_zero_slowdown_writes_trigger(512);
        self.options.set_level_zero_stop_writes_trigger(1024);
        self
    }

    /// Registers an associative merge operator under `name`; `merge_fn`
    /// receives (key, existing value, pending operands) and returns the
    /// merged value, or `None` on failure.
    pub fn set_merge_operator_associative<F>(mut self, name: &str, merge_fn: F) -> DBOptions
    where
        F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>>
            + Send
            + Sync
            + Clone
            + 'static,
    {
        self.options.set_merge_operator_associative(name, merge_fn);
        self
    }

    /// Registers a compaction filter under `name`; `filter_fn` receives
    /// (level, key, value) and decides whether each entry is kept, removed,
    /// or changed during compaction.
    pub fn set_compaction_filter<F>(mut self, name: &str, filter_fn: F) -> DBOptions
    where
        F: FnMut(u32, &[u8], &[u8]) -> Decision + Send + 'static,
    {
        self.options.set_compaction_filter(name, filter_fn);
        self
    }
}
297
/// Baseline `DBOptions` used for all tables unless a more specific
/// `optimize_*` builder is applied on top.
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // Raise the process fd limit as far as allowed, then let RocksDB use an
    // eighth of it for open files (the rest is left for sockets etc.).
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    // 2^10 shards for the table cache to reduce mutex contention.
    opt.set_table_cache_num_shard_bits(10);

    // LZ4 for upper levels (fast), Zstd for the bottommost level (smaller),
    // with up to 1 MiB of Zstd dictionary training data.
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    // DB-wide write buffer and WAL caps: env overrides in MB, converted to
    // bytes (defaults: 1 GiB each).
    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    // Background job parallelism (default 8 threads).
    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    // Default table format: 128 MB block cache, 16 KiB blocks.
    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    // Small prefix-bloom in the memtable to speed up point reads on
    // recently written keys.
    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}
353
354fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
355 let mut block_options = BlockBasedOptions::default();
360 block_options.set_block_size(block_size_bytes);
362 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
364 block_options.set_bloom_filter(10.0, false);
366 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
368 block_options
369}
370
371pub fn read_size_from_env(var_name: &str) -> Option<usize> {
372 env::var(var_name)
373 .ok()?
374 .parse::<usize>()
375 .tap_err(|e| {
376 warn!(
377 "Env var {} does not contain valid usize integer: {}",
378 var_name, e
379 )
380 })
381 .ok()
382}