typed_store/rocks/
options.rs1use rocksdb::{BlockBasedOptions, Cache, MergeOperands, ReadOptions, compaction_filter::Decision};
5use std::collections::BTreeMap;
6use std::env;
7use tap::TapFallible;
8use tracing::{info, warn};
9
// Name of the env var overriding the DB-wide write buffer size, and the fallback
// used by `default_db_options` when it is unset or unparsable. The value is in
// MiB: it is multiplied by 1024 * 1024 before being handed to RocksDB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Name of the env var overriding the maximum total WAL size, and its fallback.
// Also expressed in MiB and converted to bytes in `default_db_options`.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Env var overriding the level-0 file count that triggers compaction. The
// slowdown and stop triggers are derived from it (x12 and x16) in the
// write-throughput presets.
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
// Fallback trigger for `optimize_for_large_values_no_scan` and
// `optimize_for_write_throughput`.
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
// Fallback trigger for `optimize_for_write_throughput_no_deletion`, which
// switches the table to universal compaction.
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
// Per-table write buffer size in MiB (env override and fallback).
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
// Maximum number of write buffers per table (env override and fallback).
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
// Target SST file size base in MiB (env override and fallback).
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// When this env var is set to any value, `optimize_for_large_values_no_scan`
// returns the options unchanged instead of enabling blob files.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
// Env var overriding the value passed to `increase_parallelism` in
// `default_db_options`.
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
34
35#[derive(Clone, Debug)]
36pub struct ReadWriteOptions {
37 pub ignore_range_deletions: bool,
38 pub log_value_hash: bool,
41 pub sync_writes: bool,
43}
44
45impl ReadWriteOptions {
46 pub fn readopts(&self) -> ReadOptions {
47 let mut readopts = ReadOptions::default();
48 readopts.set_ignore_range_deletions(self.ignore_range_deletions);
49 readopts
50 }
51
52 pub fn set_ignore_range_deletions(mut self, ignore: bool) -> Self {
53 self.ignore_range_deletions = ignore;
54 self
55 }
56
57 pub fn set_log_value_hash(mut self, log_value_hash: bool) -> Self {
58 self.log_value_hash = log_value_hash;
59 self
60 }
61
62 pub fn set_sync_writes(mut self, sync_writes: bool) -> Self {
63 self.sync_writes = sync_writes;
64 self
65 }
66}
67
68impl Default for ReadWriteOptions {
69 fn default() -> Self {
70 Self {
71 ignore_range_deletions: true,
72 log_value_hash: false,
73 sync_writes: false,
74 }
75 }
76}
77
/// Bundles the RocksDB options for a table with the read/write flags that
/// accompany it.
#[derive(Default, Clone)]
pub struct DBOptions {
    /// The underlying RocksDB options, mutated by the builder methods on
    /// `DBOptions`.
    pub options: rocksdb::Options,
    /// Read/write behaviour flags (see `ReadWriteOptions`).
    pub rw_options: ReadWriteOptions,
}
83
84#[derive(Clone)]
85pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
86impl DBMapTableConfigMap {
87 pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
88 Self(map)
89 }
90
91 pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
92 self.0.clone()
93 }
94}
95
96impl DBOptions {
97 pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
101 self.options
103 .optimize_for_point_lookup(block_cache_size_mb as u64);
104 self
105 }
106
107 pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
110 if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
111 info!("Large value blob storage optimization is disabled via env var.");
112 return self;
113 }
114
115 self.options.set_enable_blob_files(true);
117 self.options
118 .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
119 self.options.set_enable_blob_gc(true);
120 self.options.set_min_blob_size(min_blob_size);
123
124 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
126 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
127 * 1024
128 * 1024;
129 self.options.set_write_buffer_size(write_buffer_size);
130 let target_file_size_base = 64 << 20;
133 self.options
134 .set_target_file_size_base(target_file_size_base);
135 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
137 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
138 self.options
139 .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
140
141 self
142 }
143
144 pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
146 self.options
147 .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
148 self
149 }
150
151 pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
153 self.options
154 .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
155 self.options
156 .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
157 self
158 }
159
160 pub fn optimize_for_write_throughput(mut self) -> DBOptions {
162 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
164 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
165 * 1024
166 * 1024;
167 self.options.set_write_buffer_size(write_buffer_size);
168 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
170 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
171 self.options
172 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
173 self.options
175 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
176
177 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
179 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
180 self.options.set_level_zero_file_num_compaction_trigger(
181 max_level_zero_file_num.try_into().unwrap(),
182 );
183 self.options.set_level_zero_slowdown_writes_trigger(
184 (max_level_zero_file_num * 12).try_into().unwrap(),
185 );
186 self.options
187 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
188
189 self.options.set_target_file_size_base(
191 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
192 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
193 * 1024
194 * 1024,
195 );
196
197 self.options
199 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
200
201 self
202 }
203
204 pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
208 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
210 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
211 * 1024
212 * 1024;
213 self.options.set_write_buffer_size(write_buffer_size);
214 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
216 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
217 self.options
218 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
219 self.options
221 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
222
223 self.options
225 .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
226 let mut compaction_options = rocksdb::UniversalCompactOptions::default();
227 compaction_options.set_max_size_amplification_percent(10000);
228 compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
229 self.options
230 .set_universal_compaction_options(&compaction_options);
231
232 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
233 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
234 self.options.set_level_zero_file_num_compaction_trigger(
235 max_level_zero_file_num.try_into().unwrap(),
236 );
237 self.options.set_level_zero_slowdown_writes_trigger(
238 (max_level_zero_file_num * 12).try_into().unwrap(),
239 );
240 self.options
241 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
242
243 self.options.set_target_file_size_base(
245 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
246 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
247 * 1024
248 * 1024,
249 );
250
251 self.options
253 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
254
255 self
256 }
257
258 pub fn set_block_options(
260 mut self,
261 block_cache_size_mb: usize,
262 block_size_bytes: usize,
263 ) -> DBOptions {
264 self.options
265 .set_block_based_table_factory(&get_block_options(
266 block_cache_size_mb,
267 block_size_bytes,
268 ));
269 self
270 }
271
272 pub fn disable_write_throttling(mut self) -> DBOptions {
274 self.options.set_soft_pending_compaction_bytes_limit(0);
275 self.options.set_hard_pending_compaction_bytes_limit(0);
276 self.options.set_level_zero_slowdown_writes_trigger(512);
277 self.options.set_level_zero_stop_writes_trigger(1024);
278 self
279 }
280
281 pub fn set_sync_writes(mut self, sync_writes: bool) -> DBOptions {
282 self.rw_options.sync_writes = sync_writes;
283 self
284 }
285
286 pub fn set_merge_operator_associative<F>(mut self, name: &str, merge_fn: F) -> DBOptions
287 where
288 F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>>
289 + Send
290 + Sync
291 + Clone
292 + 'static,
293 {
294 self.options.set_merge_operator_associative(name, merge_fn);
295 self
296 }
297
298 pub fn set_compaction_filter<F>(mut self, name: &str, filter_fn: F) -> DBOptions
299 where
300 F: FnMut(u32, &[u8], &[u8]) -> Decision + Send + 'static,
301 {
302 self.options.set_compaction_filter(name, filter_fn);
303 self
304 }
305}
306
307pub fn default_db_options() -> DBOptions {
309 let mut opt = rocksdb::Options::default();
310
311 if let Some(limit) = fdlimit::raise_fd_limit() {
314 opt.set_max_open_files((limit / 8) as i32);
316 }
317
318 opt.set_table_cache_num_shard_bits(10);
321
322 opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
324 opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
325 opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);
326
327 opt.set_db_write_buffer_size(
337 read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
338 * 1024
339 * 1024,
340 );
341 opt.set_max_total_wal_size(
342 read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
343 );
344
345 opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);
347
348 opt.set_enable_pipelined_write(true);
349
350 opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));
353
354 opt.set_memtable_prefix_bloom_ratio(0.02);
356
357 DBOptions {
358 options: opt,
359 rw_options: ReadWriteOptions::default(),
360 }
361}
362
363fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
364 let mut block_options = BlockBasedOptions::default();
369 block_options.set_block_size(block_size_bytes);
371 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
373 block_options.set_bloom_filter(10.0, false);
375 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
377 block_options
378}
379
380pub fn read_size_from_env(var_name: &str) -> Option<usize> {
381 env::var(var_name)
382 .ok()?
383 .parse::<usize>()
384 .tap_err(|e| {
385 warn!(
386 "Env var {} does not contain valid usize integer: {}",
387 var_name, e
388 )
389 })
390 .ok()
391}