1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline::CommitterConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
10use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
11
/// Combines two configuration values of the same type into one.
///
/// The implementations in this file treat `self` as the lower-priority value and `other` as the
/// higher-priority one: where both set a field, the value from `other` is generally kept (see
/// the individual impls for exceptions).
pub trait Merge: Sized {
    /// Merges `other` into `self`, consuming both, and returns the combined value or an error.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
16
#[DefaultConfig]
/// Top-level indexer configuration, composed of layers whose fields are optional overrides.
///
/// Unknown fields are rejected when deserializing (`deny_unknown_fields`).
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// Overrides for ingestion settings.
    pub ingestion: IngestionLayer,

    /// Overrides for committer settings.
    // NOTE(review): presumably the defaults shared by all pipelines, which the per-pipeline
    // `committer` sections refine -- confirm against the code that consumes this config.
    pub committer: CommitterLayer,

    /// Overrides for pruner settings.
    // NOTE(review): presumably the shared base for per-pipeline `pruner` sections -- confirm.
    pub pruner: PrunerLayer,

    /// Per-pipeline overrides, one optional section per pipeline.
    pub pipeline: PipelineLayer,
}
38
#[DefaultConfig]
/// Optional overrides for the fields of `IngestionConfig`.
///
/// Each field mirrors the field of the same name on `IngestionConfig`. A `Some` value replaces
/// the base value when the layer is resolved with `finish`; `None` leaves the base value as is.
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub checkpoint_buffer_size: Option<usize>,
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,
}
59
#[DefaultConfig]
/// Optional overrides for the fields of `SequentialConfig`.
///
/// A `Some` value takes precedence over the base value when the layer is resolved with
/// `finish`; `None` defers to the base.
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    /// Nested committer overrides, resolved against the base's committer config.
    pub committer: Option<CommitterLayer>,
    pub checkpoint_lag: Option<u64>,
    // The remaining fields are optional in `SequentialConfig` as well, so they can still be
    // `None` after `finish` if neither the layer nor the base sets them.
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
}
71
#[DefaultConfig]
/// Optional overrides for the fields of `ConcurrentConfig`.
///
/// A `Some` value takes precedence over the base value when the layer is resolved with
/// `finish`; `None` defers to the base, except for `pruner` (see below).
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Nested committer overrides, resolved against the base's committer config.
    pub committer: Option<CommitterLayer>,
    /// Nested pruner overrides. `finish` only produces a pruner config when this is `Some` AND
    /// the base has a pruner; leaving it `None` disables pruning in the finished config.
    pub pruner: Option<PrunerLayer>,
    // The remaining fields are optional in `ConcurrentConfig` as well, so they can still be
    // `None` after `finish` if neither the layer nor the base sets them.
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
86
#[DefaultConfig]
/// Optional overrides for the fields of `CommitterConfig`.
///
/// A `Some` value replaces the base value when the layer is resolved with `finish`; `None`
/// leaves the base value as is. There is no override for `watermark_interval_jitter_ms`.
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
95
#[DefaultConfig]
/// Optional overrides for the fields of `PrunerConfig`.
///
/// A `Some` value replaces the base value when the layer is resolved with `finish`; `None`
/// leaves the base value as is.
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    /// Unlike the other fields, merging two layers keeps the larger of the two retentions
    /// rather than the one from the higher-priority layer.
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
106
#[DefaultConfig]
/// Per-pipeline configuration sections, keyed by pipeline name.
///
/// Every section is optional. Field names are (de)serialized in `snake_case`, and unknown
/// fields are rejected.
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // The only section configured with a sequential pipeline layer.
    pub sum_displays: Option<SequentialLayer>,

    // All remaining sections are configured with concurrent pipeline layers.
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
136
137impl IndexerConfig {
138 pub fn example() -> Self {
141 let mut example: Self = Default::default();
142
143 example.ingestion = IngestionConfig::default().into();
144 example.committer = CommitterConfig::default().into();
145 example.pruner = PrunerConfig::default().into();
146 example.pipeline = PipelineLayer::example();
147
148 example
149 }
150
151 pub fn for_test() -> Self {
155 Self::example()
156 .merge(IndexerConfig {
157 ingestion: IngestionLayer {
158 retry_interval_ms: Some(10),
159 ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
160 ..Default::default()
161 },
162 committer: CommitterLayer {
163 collect_interval_ms: Some(50),
164 watermark_interval_ms: Some(50),
165 write_concurrency: Some(1),
166 },
167 pruner: PrunerLayer {
168 interval_ms: Some(50),
169 delay_ms: Some(0),
170 ..Default::default()
171 },
172 ..Default::default()
173 })
174 .expect("Merge failed for test configuration")
175 }
176}
177
178impl IngestionLayer {
179 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
180 Ok(IngestionConfig {
181 checkpoint_buffer_size: self
182 .checkpoint_buffer_size
183 .unwrap_or(base.checkpoint_buffer_size),
184 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
185 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
186 streaming_backoff_initial_batch_size: self
187 .streaming_backoff_initial_batch_size
188 .unwrap_or(base.streaming_backoff_initial_batch_size),
189 streaming_backoff_max_batch_size: self
190 .streaming_backoff_max_batch_size
191 .unwrap_or(base.streaming_backoff_max_batch_size),
192 streaming_connection_timeout_ms: self
193 .streaming_connection_timeout_ms
194 .unwrap_or(base.streaming_connection_timeout_ms),
195 streaming_statement_timeout_ms: self
196 .streaming_statement_timeout_ms
197 .unwrap_or(base.streaming_statement_timeout_ms),
198 })
199 }
200}
201
202impl SequentialLayer {
203 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
204 Ok(SequentialConfig {
205 committer: if let Some(committer) = self.committer {
206 committer.finish(base.committer)?
207 } else {
208 base.committer
209 },
210 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
211 fanout: self.fanout.or(base.fanout),
212 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
213 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
214 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
215 })
216 }
217}
218
219impl ConcurrentLayer {
220 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
223 Ok(ConcurrentConfig {
224 committer: if let Some(committer) = self.committer {
225 committer.finish(base.committer)?
226 } else {
227 base.committer
228 },
229 pruner: match (self.pruner, base.pruner) {
230 (None, _) | (_, None) => None,
231 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
232 },
233 fanout: self.fanout.or(base.fanout),
234 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
235 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
236 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
237 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
238 collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
239 committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
240 })
241 }
242}
243
244impl CommitterLayer {
245 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
246 Ok(CommitterConfig {
247 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
248 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
249 watermark_interval_ms: self
250 .watermark_interval_ms
251 .unwrap_or(base.watermark_interval_ms),
252 watermark_interval_jitter_ms: 0,
253 })
254 }
255}
256
257impl PrunerLayer {
258 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
259 Ok(PrunerConfig {
260 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
261 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
262 retention: self.retention.unwrap_or(base.retention),
263 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
264 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
265 })
266 }
267}
268
269impl PipelineLayer {
270 pub fn example() -> Self {
273 PipelineLayer {
274 cp_blooms: Some(Default::default()),
275 cp_bloom_blocks: Some(Default::default()),
276 sum_displays: Some(Default::default()),
277 cp_sequence_numbers: Some(Default::default()),
278 ev_emit_mod: Some(Default::default()),
279 ev_struct_inst: Some(Default::default()),
280 kv_checkpoints: Some(Default::default()),
281 kv_epoch_ends: Some(Default::default()),
282 kv_epoch_starts: Some(Default::default()),
283 kv_feature_flags: Some(Default::default()),
284 kv_objects: Some(Default::default()),
285 kv_packages: Some(Default::default()),
286 kv_protocol_configs: Some(Default::default()),
287 kv_transactions: Some(Default::default()),
288 obj_versions: Some(Default::default()),
289 tx_affected_addresses: Some(Default::default()),
290 tx_affected_objects: Some(Default::default()),
291 tx_balance_changes: Some(Default::default()),
292 tx_calls: Some(Default::default()),
293 tx_digests: Some(Default::default()),
294 tx_kinds: Some(Default::default()),
295 }
296 }
297}
298
299impl Merge for IndexerConfig {
300 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
301 Ok(IndexerConfig {
302 ingestion: self.ingestion.merge(other.ingestion)?,
303 committer: self.committer.merge(other.committer)?,
304 pruner: self.pruner.merge(other.pruner)?,
305 pipeline: self.pipeline.merge(other.pipeline)?,
306 })
307 }
308}
309
310impl Merge for IngestionLayer {
311 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
312 Ok(IngestionLayer {
313 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
314 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
315 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
316 streaming_backoff_initial_batch_size: other
317 .streaming_backoff_initial_batch_size
318 .or(self.streaming_backoff_initial_batch_size),
319 streaming_backoff_max_batch_size: other
320 .streaming_backoff_max_batch_size
321 .or(self.streaming_backoff_max_batch_size),
322 streaming_connection_timeout_ms: other
323 .streaming_connection_timeout_ms
324 .or(self.streaming_connection_timeout_ms),
325 streaming_statement_timeout_ms: other
326 .streaming_statement_timeout_ms
327 .or(self.streaming_statement_timeout_ms),
328 })
329 }
330}
331
332impl Merge for SequentialLayer {
333 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
334 Ok(SequentialLayer {
335 committer: self.committer.merge(other.committer)?,
336 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
337 fanout: other.fanout.or(self.fanout),
338 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
339 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
340 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
341 })
342 }
343}
344
345impl Merge for ConcurrentLayer {
346 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
347 Ok(ConcurrentLayer {
348 committer: self.committer.merge(other.committer)?,
349 pruner: self.pruner.merge(other.pruner)?,
350 fanout: other.fanout.or(self.fanout),
351 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
352 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
353 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
354 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
355 collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
356 committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
357 })
358 }
359}
360
361impl Merge for CommitterLayer {
362 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
363 Ok(CommitterLayer {
364 write_concurrency: other.write_concurrency.or(self.write_concurrency),
365 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
366 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
367 })
368 }
369}
370
371impl Merge for PrunerLayer {
372 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
375 Ok(PrunerLayer {
376 interval_ms: other.interval_ms.or(self.interval_ms),
377 delay_ms: other.delay_ms.or(self.delay_ms),
378 retention: match (other.retention, self.retention) {
379 (Some(a), Some(b)) => Some(a.max(b)),
380 (Some(a), _) | (_, Some(a)) => Some(a),
381 (None, None) => None,
382 },
383 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
384 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
385 })
386 }
387}
388
389impl Merge for PipelineLayer {
390 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
391 Ok(PipelineLayer {
392 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
393 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
394 sum_displays: self.sum_displays.merge(other.sum_displays)?,
395 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
396 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
397 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
398 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
399 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
400 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
401 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
402 kv_objects: self.kv_objects.merge(other.kv_objects)?,
403 kv_packages: self.kv_packages.merge(other.kv_packages)?,
404 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
405 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
406 obj_versions: self.obj_versions.merge(other.obj_versions)?,
407 tx_affected_addresses: self
408 .tx_affected_addresses
409 .merge(other.tx_affected_addresses)?,
410 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
411 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
412 tx_calls: self.tx_calls.merge(other.tx_calls)?,
413 tx_digests: self.tx_digests.merge(other.tx_digests)?,
414 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
415 })
416 }
417}
418
419impl<T: Merge> Merge for Option<T> {
420 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
421 Ok(match (self, other) {
422 (Some(a), Some(b)) => Some(a.merge(b)?),
423 (Some(a), _) | (_, Some(a)) => Some(a),
424 (None, None) => None,
425 })
426 }
427}
428
429impl From<IngestionConfig> for IngestionLayer {
430 fn from(config: IngestionConfig) -> Self {
431 Self {
432 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
433 ingest_concurrency: Some(config.ingest_concurrency),
434 retry_interval_ms: Some(config.retry_interval_ms),
435 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
436 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
437 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
438 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
439 }
440 }
441}
442
443impl From<SequentialConfig> for SequentialLayer {
444 fn from(config: SequentialConfig) -> Self {
445 Self {
446 committer: Some(config.committer.into()),
447 checkpoint_lag: Some(config.checkpoint_lag),
448 fanout: config.fanout,
449 min_eager_rows: config.min_eager_rows,
450 max_batch_checkpoints: config.max_batch_checkpoints,
451 processor_channel_size: config.processor_channel_size,
452 }
453 }
454}
455
456impl From<ConcurrentConfig> for ConcurrentLayer {
457 fn from(config: ConcurrentConfig) -> Self {
458 Self {
459 committer: Some(config.committer.into()),
460 pruner: config.pruner.map(Into::into),
461 fanout: config.fanout,
462 min_eager_rows: config.min_eager_rows,
463 max_pending_rows: config.max_pending_rows,
464 max_watermark_updates: config.max_watermark_updates,
465 processor_channel_size: config.processor_channel_size,
466 collector_channel_size: config.collector_channel_size,
467 committer_channel_size: config.committer_channel_size,
468 }
469 }
470}
471
472impl From<CommitterConfig> for CommitterLayer {
473 fn from(config: CommitterConfig) -> Self {
474 Self {
475 write_concurrency: Some(config.write_concurrency),
476 collect_interval_ms: Some(config.collect_interval_ms),
477 watermark_interval_ms: Some(config.watermark_interval_ms),
478 }
479 }
480}
481
482impl From<PrunerConfig> for PrunerLayer {
483 fn from(config: PrunerConfig) -> Self {
484 Self {
485 interval_ms: Some(config.interval_ms),
486 delay_ms: Some(config.delay_ms),
487 retention: Some(config.retention),
488 max_chunk_size: Some(config.max_chunk_size),
489 prune_concurrency: Some(config.prune_concurrency),
490 }
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that a value matches a pattern, printing the pattern and the pretty-printed
    /// value when it does not.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Committer config shared by the `finish_concurrent_*` tests as the base's committer.
    fn base_committer() -> CommitterConfig {
        CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
            ..Default::default()
        }
    }

    /// Nested layers are merged field by field, and a section present on only one side
    /// survives the merge untouched in either direction.
    #[test]
    fn merge_recursive() {
        let lhs = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let rhs = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let lhs_then_rhs = lhs.clone().merge(rhs.clone()).unwrap();
        let rhs_then_lhs = rhs.merge(lhs).unwrap();

        // `rhs` is applied last, so its values win wherever it sets them.
        assert_matches!(
            lhs_then_rhs,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    checkpoint_lag: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        // `lhs` is applied last, so its values win wherever it sets them.
        assert_matches!(
            rhs_then_lhs,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    checkpoint_lag: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    /// Pruner layers merge last-write-wins, except that `retention` keeps the maximum.
    #[test]
    fn merge_pruner() {
        let lhs = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let rhs = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let lhs_then_rhs = lhs.clone().merge(rhs.clone()).unwrap();
        let rhs_then_lhs = rhs.merge(lhs).unwrap();

        assert_matches!(
            lhs_then_rhs,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            rhs_then_lhs,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner section removes the pruner even when the base has one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With no pruner on either side, the finished config has no pruner.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With a pruner on both sides, the layer's values override the base's field by field.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    /// Unknown top-level keys are rejected, and the error names the offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let parsed = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        );

        let err = parsed.unwrap_err();
        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}