typed_store/rocks/
options.rs1use rocksdb::{BlockBasedOptions, Cache, MergeOperands, ReadOptions, compaction_filter::Decision};
5use std::collections::BTreeMap;
6use std::env;
7use tap::TapFallible;
8use tracing::{info, warn};
9
// DB-wide write buffer budget. `default_db_options` reads the env var below as
// a size in MiB and falls back to the default (also MiB) when it is unset or
// not a valid integer.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Total write-ahead-log size limit. Same convention as above: the env var is
// read as MiB by `default_db_options`, with the default used as a fallback.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Knobs consumed by the write-throughput / large-value tuning profiles on
// `DBOptions`. Each env var overrides the `DEFAULT_*` constant next to it.
//
// Number of level-0 files that triggers compaction; the universal-compaction
// profile uses its own, larger default.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
// Size of a single write buffer, in MiB.
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
// Maximum number of write buffers.
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
// Target file size base, in MiB.
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// When this env var is set (its value is not inspected),
// `DBOptions::optimize_for_large_values_no_scan` leaves the options untouched.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
// Overrides the parallelism passed to RocksDB by `default_db_options`.
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
34
/// Per-table read/write behaviour flags.
#[derive(Debug, Clone)]
pub struct ReadWriteOptions {
    /// Forwarded to RocksDB's `ReadOptions::set_ignore_range_deletions` when
    /// read options are built from this struct.
    pub ignore_range_deletions: bool,
    /// NOTE(review): presumably enables logging of a hash of written values;
    /// the consumer of this flag is not in this file — confirm at the use site.
    pub log_value_hash: bool,
}
42
43impl ReadWriteOptions {
44 pub fn readopts(&self) -> ReadOptions {
45 let mut readopts = ReadOptions::default();
46 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
47 readopts
48 }
49
50 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
51 self.ignore_range_deletions = ignore;
52 self
53 }
54
55 pub fn set_log_value_hash(mut self, log_value_hash: bool) -> Self {
56 self.log_value_hash = log_value_hash;
57 self
58 }
59}
60
61impl Default for ReadWriteOptions {
62 fn default() -> Self {
63 Self {
64 ignore_range_deletions: true,
65 log_value_hash: false,
66 }
67 }
68}
69
70#[derive(Default, Clone)]
71pub struct DBOptions {
72 pub options: rocksdb::Options,
73 pub rw_options: ReadWriteOptions,
74}
75
76#[derive(Clone)]
77pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
78impl DBMapTableConfigMap {
79 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
80 Self(map)
81 }
82
83 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
84 self.0.clone()
85 }
86}
87
88impl DBOptions {
89 pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
93 self.options
95 .optimize_for_point_lookup(block_cache_size_mb as u64);
96 self
97 }
98
99 pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
102 if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
103 info!("Large value blob storage optimization is disabled via env var.");
104 return self;
105 }
106
107 self.options.set_enable_blob_files(true);
109 self.options
110 .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
111 self.options.set_enable_blob_gc(true);
112 self.options.set_min_blob_size(min_blob_size);
115
116 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
118 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
119 * 1024
120 * 1024;
121 self.options.set_write_buffer_size(write_buffer_size);
122 let target_file_size_base = 64 << 20;
125 self.options
126 .set_target_file_size_base(target_file_size_base);
127 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
129 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
130 self.options
131 .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
132
133 self
134 }
135
136 pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
138 self.options
139 .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
140 self
141 }
142
143 pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
145 self.options
146 .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
147 self.options
148 .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
149 self
150 }
151
152 pub fn optimize_for_write_throughput(mut self) -> DBOptions {
154 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
156 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
157 * 1024
158 * 1024;
159 self.options.set_write_buffer_size(write_buffer_size);
160 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
162 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
163 self.options
164 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
165 self.options
167 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
168
169 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
171 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
172 self.options.set_level_zero_file_num_compaction_trigger(
173 max_level_zero_file_num.try_into().unwrap(),
174 );
175 self.options.set_level_zero_slowdown_writes_trigger(
176 (max_level_zero_file_num * 12).try_into().unwrap(),
177 );
178 self.options
179 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
180
181 self.options.set_target_file_size_base(
183 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
184 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
185 * 1024
186 * 1024,
187 );
188
189 self.options
191 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
192
193 self
194 }
195
196 pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
200 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
202 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
203 * 1024
204 * 1024;
205 self.options.set_write_buffer_size(write_buffer_size);
206 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
208 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
209 self.options
210 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
211 self.options
213 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
214
215 self.options
217 .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
218 let mut compaction_options = rocksdb::UniversalCompactOptions::default();
219 compaction_options.set_max_size_amplification_percent(10000);
220 compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
221 self.options
222 .set_universal_compaction_options(&compaction_options);
223
224 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
225 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
226 self.options.set_level_zero_file_num_compaction_trigger(
227 max_level_zero_file_num.try_into().unwrap(),
228 );
229 self.options.set_level_zero_slowdown_writes_trigger(
230 (max_level_zero_file_num * 12).try_into().unwrap(),
231 );
232 self.options
233 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
234
235 self.options.set_target_file_size_base(
237 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
238 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
239 * 1024
240 * 1024,
241 );
242
243 self.options
245 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
246
247 self
248 }
249
250 pub fn set_block_options(
252 mut self,
253 block_cache_size_mb: usize,
254 block_size_bytes: usize,
255 ) -> DBOptions {
256 self.options
257 .set_block_based_table_factory(&get_block_options(
258 block_cache_size_mb,
259 block_size_bytes,
260 ));
261 self
262 }
263
264 pub fn disable_write_throttling(mut self) -> DBOptions {
266 self.options.set_soft_pending_compaction_bytes_limit(0);
267 self.options.set_hard_pending_compaction_bytes_limit(0);
268 self.options.set_level_zero_slowdown_writes_trigger(512);
269 self.options.set_level_zero_stop_writes_trigger(1024);
270 self
271 }
272
273 pub fn set_merge_operator_associative<F>(mut self, name: &str, merge_fn: F) -> DBOptions
274 where
275 F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>>
276 + Send
277 + Sync
278 + Clone
279 + 'static,
280 {
281 self.options.set_merge_operator_associative(name, merge_fn);
282 self
283 }
284
285 pub fn set_compaction_filter<F>(mut self, name: &str, filter_fn: F) -> DBOptions
286 where
287 F: FnMut(u32, &[u8], &[u8]) -> Decision + Send + 'static,
288 {
289 self.options.set_compaction_filter(name, filter_fn);
290 self
291 }
292}
293
294pub fn default_db_options() -> DBOptions {
296 let mut opt = rocksdb::Options::default();
297
298 if let Some(limit) = fdlimit::raise_fd_limit() {
301 opt.set_max_open_files((limit / 8) as i32);
303 }
304
305 opt.set_table_cache_num_shard_bits(10);
308
309 opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
311 opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
312 opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
313
314 opt.set_db_write_buffer_size(
324 read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
325 * 1024
326 * 1024,
327 );
328 opt.set_max_total_wal_size(
329 read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
330 );
331
332 opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
334
335 opt.set_enable_pipelined_write(true);
336
337 opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
340
341 opt.set_memtable_prefix_bloom_ratio(0.02);
343
344 DBOptions {
345 options: opt,
346 rw_options: ReadWriteOptions::default(),
347 }
348}
349
350fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
351 let mut block_options = BlockBasedOptions::default();
356 block_options.set_block_size(block_size_bytes);
358 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
360 block_options.set_bloom_filter(10.0, false);
362 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
364 block_options
365}
366
367pub fn read_size_from_env(var_name: &str) -> Option<usize> {
368 env::var(var_name)
369 .ok()?
370 .parse::<usize>()
371 .tap_err(|e| {
372 warn!(
373 "Env var {} does not contain valid usize integer: {}",
374 var_name, e
375 )
376 })
377 .ok()
378}