1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::ingestion::IngestionConfig;
6use sui_indexer_alt_framework::pipeline::CommitterConfig;
7use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
9use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
10
11pub trait Merge: Sized {
13 fn merge(self, other: Self) -> anyhow::Result<Self>;
14}
15
16#[DefaultConfig]
17#[derive(Clone, Default, Debug)]
18#[serde(deny_unknown_fields)]
19pub struct IndexerConfig {
20 pub ingestion: IngestionLayer,
22
23 pub committer: CommitterLayer,
26
27 pub pruner: PrunerLayer,
33
34 pub pipeline: PipelineLayer,
36}
37
38#[DefaultConfig]
47#[derive(Clone, Default, Debug)]
48#[serde(deny_unknown_fields)]
49pub struct IngestionLayer {
50 pub checkpoint_buffer_size: Option<usize>,
51 pub ingest_concurrency: Option<usize>,
52 pub retry_interval_ms: Option<u64>,
53 pub streaming_backoff_initial_batch_size: Option<usize>,
54 pub streaming_backoff_max_batch_size: Option<usize>,
55 pub streaming_connection_timeout_ms: Option<u64>,
56 pub streaming_statement_timeout_ms: Option<u64>,
57}
58
59#[DefaultConfig]
60#[derive(Clone, Default, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct SequentialLayer {
63 pub committer: Option<CommitterLayer>,
64 pub checkpoint_lag: Option<u64>,
65 pub fanout: Option<usize>,
66 pub min_eager_rows: Option<usize>,
67 pub max_batch_checkpoints: Option<usize>,
68}
69
70#[DefaultConfig]
71#[derive(Clone, Default, Debug)]
72#[serde(deny_unknown_fields)]
73pub struct ConcurrentLayer {
74 pub committer: Option<CommitterLayer>,
75 pub pruner: Option<PrunerLayer>,
76 pub fanout: Option<usize>,
77 pub min_eager_rows: Option<usize>,
78 pub max_pending_rows: Option<usize>,
79 pub max_watermark_updates: Option<usize>,
80}
81
82#[DefaultConfig]
83#[derive(Clone, Default, Debug)]
84#[serde(deny_unknown_fields)]
85pub struct CommitterLayer {
86 pub write_concurrency: Option<usize>,
87 pub collect_interval_ms: Option<u64>,
88 pub watermark_interval_ms: Option<u64>,
89}
90
91#[DefaultConfig]
92#[derive(Clone, Default, Debug)]
93#[serde(deny_unknown_fields)]
94pub struct PrunerLayer {
95 pub interval_ms: Option<u64>,
96 pub delay_ms: Option<u64>,
97 pub retention: Option<u64>,
98 pub max_chunk_size: Option<u64>,
99 pub prune_concurrency: Option<u64>,
100}
101
102#[DefaultConfig]
103#[derive(Clone, Default, Debug)]
104#[serde(rename_all = "snake_case", deny_unknown_fields)]
105pub struct PipelineLayer {
106 pub sum_displays: Option<SequentialLayer>,
108
109 pub cp_bloom_blocks: Option<ConcurrentLayer>,
111 pub cp_blooms: Option<ConcurrentLayer>,
112 pub coin_balance_buckets: Option<ConcurrentLayer>,
113 pub obj_info: Option<ConcurrentLayer>,
114 pub cp_sequence_numbers: Option<ConcurrentLayer>,
115 pub ev_emit_mod: Option<ConcurrentLayer>,
116 pub ev_struct_inst: Option<ConcurrentLayer>,
117 pub kv_checkpoints: Option<ConcurrentLayer>,
118 pub kv_epoch_ends: Option<ConcurrentLayer>,
119 pub kv_epoch_starts: Option<ConcurrentLayer>,
120 pub kv_feature_flags: Option<ConcurrentLayer>,
121 pub kv_objects: Option<ConcurrentLayer>,
122 pub kv_packages: Option<ConcurrentLayer>,
123 pub kv_protocol_configs: Option<ConcurrentLayer>,
124 pub kv_transactions: Option<ConcurrentLayer>,
125 pub obj_versions: Option<ConcurrentLayer>,
126 pub tx_affected_addresses: Option<ConcurrentLayer>,
127 pub tx_affected_objects: Option<ConcurrentLayer>,
128 pub tx_balance_changes: Option<ConcurrentLayer>,
129 pub tx_calls: Option<ConcurrentLayer>,
130 pub tx_digests: Option<ConcurrentLayer>,
131 pub tx_kinds: Option<ConcurrentLayer>,
132}
133
134impl IndexerConfig {
135 pub fn example() -> Self {
138 let mut example: Self = Default::default();
139
140 example.ingestion = IngestionConfig::default().into();
141 example.committer = CommitterConfig::default().into();
142 example.pruner = PrunerConfig::default().into();
143 example.pipeline = PipelineLayer::example();
144
145 example
146 }
147
148 pub fn for_test() -> Self {
152 Self::example()
153 .merge(IndexerConfig {
154 ingestion: IngestionLayer {
155 retry_interval_ms: Some(10),
156 ingest_concurrency: Some(1),
157 ..Default::default()
158 },
159 committer: CommitterLayer {
160 collect_interval_ms: Some(50),
161 watermark_interval_ms: Some(50),
162 write_concurrency: Some(1),
163 },
164 pruner: PrunerLayer {
165 interval_ms: Some(50),
166 delay_ms: Some(0),
167 ..Default::default()
168 },
169 ..Default::default()
170 })
171 .expect("Merge failed for test configuration")
172 }
173}
174
175impl IngestionLayer {
176 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
177 Ok(IngestionConfig {
178 checkpoint_buffer_size: self
179 .checkpoint_buffer_size
180 .unwrap_or(base.checkpoint_buffer_size),
181 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
182 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
183 streaming_backoff_initial_batch_size: self
184 .streaming_backoff_initial_batch_size
185 .unwrap_or(base.streaming_backoff_initial_batch_size),
186 streaming_backoff_max_batch_size: self
187 .streaming_backoff_max_batch_size
188 .unwrap_or(base.streaming_backoff_max_batch_size),
189 streaming_connection_timeout_ms: self
190 .streaming_connection_timeout_ms
191 .unwrap_or(base.streaming_connection_timeout_ms),
192 streaming_statement_timeout_ms: self
193 .streaming_statement_timeout_ms
194 .unwrap_or(base.streaming_statement_timeout_ms),
195 })
196 }
197}
198
199impl SequentialLayer {
200 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
201 Ok(SequentialConfig {
202 committer: if let Some(committer) = self.committer {
203 committer.finish(base.committer)?
204 } else {
205 base.committer
206 },
207 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
208 fanout: self.fanout.or(base.fanout),
209 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
210 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
211 })
212 }
213}
214
215impl ConcurrentLayer {
216 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
219 Ok(ConcurrentConfig {
220 committer: if let Some(committer) = self.committer {
221 committer.finish(base.committer)?
222 } else {
223 base.committer
224 },
225 pruner: match (self.pruner, base.pruner) {
226 (None, _) | (_, None) => None,
227 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
228 },
229 fanout: self.fanout.or(base.fanout),
230 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
231 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
232 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
233 })
234 }
235}
236
237impl CommitterLayer {
238 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
239 Ok(CommitterConfig {
240 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
241 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
242 watermark_interval_ms: self
243 .watermark_interval_ms
244 .unwrap_or(base.watermark_interval_ms),
245 watermark_interval_jitter_ms: 0,
246 })
247 }
248}
249
250impl PrunerLayer {
251 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
252 Ok(PrunerConfig {
253 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
254 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
255 retention: self.retention.unwrap_or(base.retention),
256 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
257 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
258 })
259 }
260}
261
262impl PipelineLayer {
263 pub fn example() -> Self {
266 PipelineLayer {
267 cp_blooms: Some(Default::default()),
268 cp_bloom_blocks: Some(Default::default()),
269 coin_balance_buckets: Some(Default::default()),
270 obj_info: Some(Default::default()),
271 sum_displays: Some(Default::default()),
272 cp_sequence_numbers: Some(Default::default()),
273 ev_emit_mod: Some(Default::default()),
274 ev_struct_inst: Some(Default::default()),
275 kv_checkpoints: Some(Default::default()),
276 kv_epoch_ends: Some(Default::default()),
277 kv_epoch_starts: Some(Default::default()),
278 kv_feature_flags: Some(Default::default()),
279 kv_objects: Some(Default::default()),
280 kv_packages: Some(Default::default()),
281 kv_protocol_configs: Some(Default::default()),
282 kv_transactions: Some(Default::default()),
283 obj_versions: Some(Default::default()),
284 tx_affected_addresses: Some(Default::default()),
285 tx_affected_objects: Some(Default::default()),
286 tx_balance_changes: Some(Default::default()),
287 tx_calls: Some(Default::default()),
288 tx_digests: Some(Default::default()),
289 tx_kinds: Some(Default::default()),
290 }
291 }
292}
293
294impl Merge for IndexerConfig {
295 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
296 Ok(IndexerConfig {
297 ingestion: self.ingestion.merge(other.ingestion)?,
298 committer: self.committer.merge(other.committer)?,
299 pruner: self.pruner.merge(other.pruner)?,
300 pipeline: self.pipeline.merge(other.pipeline)?,
301 })
302 }
303}
304
305impl Merge for IngestionLayer {
306 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
307 Ok(IngestionLayer {
308 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
309 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
310 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
311 streaming_backoff_initial_batch_size: other
312 .streaming_backoff_initial_batch_size
313 .or(self.streaming_backoff_initial_batch_size),
314 streaming_backoff_max_batch_size: other
315 .streaming_backoff_max_batch_size
316 .or(self.streaming_backoff_max_batch_size),
317 streaming_connection_timeout_ms: other
318 .streaming_connection_timeout_ms
319 .or(self.streaming_connection_timeout_ms),
320 streaming_statement_timeout_ms: other
321 .streaming_statement_timeout_ms
322 .or(self.streaming_statement_timeout_ms),
323 })
324 }
325}
326
327impl Merge for SequentialLayer {
328 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
329 Ok(SequentialLayer {
330 committer: self.committer.merge(other.committer)?,
331 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
332 fanout: other.fanout.or(self.fanout),
333 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
334 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
335 })
336 }
337}
338
339impl Merge for ConcurrentLayer {
340 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
341 Ok(ConcurrentLayer {
342 committer: self.committer.merge(other.committer)?,
343 pruner: self.pruner.merge(other.pruner)?,
344 fanout: other.fanout.or(self.fanout),
345 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
346 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
347 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
348 })
349 }
350}
351
352impl Merge for CommitterLayer {
353 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
354 Ok(CommitterLayer {
355 write_concurrency: other.write_concurrency.or(self.write_concurrency),
356 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
357 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
358 })
359 }
360}
361
362impl Merge for PrunerLayer {
363 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
366 Ok(PrunerLayer {
367 interval_ms: other.interval_ms.or(self.interval_ms),
368 delay_ms: other.delay_ms.or(self.delay_ms),
369 retention: match (other.retention, self.retention) {
370 (Some(a), Some(b)) => Some(a.max(b)),
371 (Some(a), _) | (_, Some(a)) => Some(a),
372 (None, None) => None,
373 },
374 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
375 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
376 })
377 }
378}
379
380impl Merge for PipelineLayer {
381 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
382 Ok(PipelineLayer {
383 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
384 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
385 coin_balance_buckets: self
386 .coin_balance_buckets
387 .merge(other.coin_balance_buckets)?,
388 obj_info: self.obj_info.merge(other.obj_info)?,
389 sum_displays: self.sum_displays.merge(other.sum_displays)?,
390 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
391 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
392 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
393 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
394 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
395 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
396 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
397 kv_objects: self.kv_objects.merge(other.kv_objects)?,
398 kv_packages: self.kv_packages.merge(other.kv_packages)?,
399 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
400 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
401 obj_versions: self.obj_versions.merge(other.obj_versions)?,
402 tx_affected_addresses: self
403 .tx_affected_addresses
404 .merge(other.tx_affected_addresses)?,
405 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
406 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
407 tx_calls: self.tx_calls.merge(other.tx_calls)?,
408 tx_digests: self.tx_digests.merge(other.tx_digests)?,
409 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
410 })
411 }
412}
413
414impl<T: Merge> Merge for Option<T> {
415 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
416 Ok(match (self, other) {
417 (Some(a), Some(b)) => Some(a.merge(b)?),
418 (Some(a), _) | (_, Some(a)) => Some(a),
419 (None, None) => None,
420 })
421 }
422}
423
424impl From<IngestionConfig> for IngestionLayer {
425 fn from(config: IngestionConfig) -> Self {
426 Self {
427 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
428 ingest_concurrency: Some(config.ingest_concurrency),
429 retry_interval_ms: Some(config.retry_interval_ms),
430 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
431 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
432 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
433 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
434 }
435 }
436}
437
438impl From<SequentialConfig> for SequentialLayer {
439 fn from(config: SequentialConfig) -> Self {
440 Self {
441 committer: Some(config.committer.into()),
442 checkpoint_lag: Some(config.checkpoint_lag),
443 fanout: config.fanout,
444 min_eager_rows: config.min_eager_rows,
445 max_batch_checkpoints: config.max_batch_checkpoints,
446 }
447 }
448}
449
450impl From<ConcurrentConfig> for ConcurrentLayer {
451 fn from(config: ConcurrentConfig) -> Self {
452 Self {
453 committer: Some(config.committer.into()),
454 pruner: config.pruner.map(Into::into),
455 fanout: config.fanout,
456 min_eager_rows: config.min_eager_rows,
457 max_pending_rows: config.max_pending_rows,
458 max_watermark_updates: config.max_watermark_updates,
459 }
460 }
461}
462
463impl From<CommitterConfig> for CommitterLayer {
464 fn from(config: CommitterConfig) -> Self {
465 Self {
466 write_concurrency: Some(config.write_concurrency),
467 collect_interval_ms: Some(config.collect_interval_ms),
468 watermark_interval_ms: Some(config.watermark_interval_ms),
469 }
470 }
471}
472
473impl From<PrunerConfig> for PrunerLayer {
474 fn from(config: PrunerConfig) -> Self {
475 Self {
476 interval_ms: Some(config.interval_ms),
477 delay_ms: Some(config.delay_ms),
478 retention: Some(config.retention),
479 max_chunk_size: Some(config.max_chunk_size),
480 prune_concurrency: Some(config.prune_concurrency),
481 }
482 }
483}
484
485#[cfg(test)]
486mod tests {
487 use super::*;
488
489 macro_rules! assert_matches {
490 ($value:expr, $pattern:pat $(,)?) => {
491 let value = $value;
492 assert!(
493 matches!(value, $pattern),
494 "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
495 stringify!($pattern)
496 );
497 };
498 }
499
500 #[test]
501 fn merge_recursive() {
502 let this = PipelineLayer {
503 sum_displays: Some(SequentialLayer {
504 committer: Some(CommitterLayer {
505 write_concurrency: Some(10),
506 collect_interval_ms: Some(1000),
507 watermark_interval_ms: None,
508 }),
509 checkpoint_lag: Some(100),
510 ..Default::default()
511 }),
512 ev_emit_mod: Some(ConcurrentLayer {
513 committer: Some(CommitterLayer {
514 write_concurrency: Some(5),
515 collect_interval_ms: Some(500),
516 watermark_interval_ms: None,
517 }),
518 ..Default::default()
519 }),
520 ..Default::default()
521 };
522
523 let that = PipelineLayer {
524 sum_displays: Some(SequentialLayer {
525 committer: Some(CommitterLayer {
526 write_concurrency: Some(5),
527 collect_interval_ms: None,
528 watermark_interval_ms: Some(500),
529 }),
530 checkpoint_lag: Some(200),
531 ..Default::default()
532 }),
533 ev_emit_mod: None,
534 ..Default::default()
535 };
536
537 let this_then_that = this.clone().merge(that.clone()).unwrap();
538 let that_then_this = that.clone().merge(this.clone()).unwrap();
539
540 assert_matches!(
541 this_then_that,
542 PipelineLayer {
543 sum_displays: Some(SequentialLayer {
544 committer: Some(CommitterLayer {
545 write_concurrency: Some(5),
546 collect_interval_ms: Some(1000),
547 watermark_interval_ms: Some(500),
548 ..
549 }),
550 checkpoint_lag: Some(200),
551 ..
552 }),
553 ev_emit_mod: Some(ConcurrentLayer {
554 committer: Some(CommitterLayer {
555 write_concurrency: Some(5),
556 collect_interval_ms: Some(500),
557 watermark_interval_ms: None,
558 ..
559 }),
560 pruner: None,
561 ..
562 }),
563 ..
564 },
565 );
566
567 assert_matches!(
568 that_then_this,
569 PipelineLayer {
570 sum_displays: Some(SequentialLayer {
571 committer: Some(CommitterLayer {
572 write_concurrency: Some(10),
573 collect_interval_ms: Some(1000),
574 watermark_interval_ms: Some(500),
575 ..
576 }),
577 checkpoint_lag: Some(100),
578 ..
579 }),
580 ev_emit_mod: Some(ConcurrentLayer {
581 committer: Some(CommitterLayer {
582 write_concurrency: Some(5),
583 collect_interval_ms: Some(500),
584 watermark_interval_ms: None,
585 ..
586 }),
587 pruner: None,
588 ..
589 }),
590 ..
591 },
592 );
593 }
594
595 #[test]
596 fn merge_pruner() {
597 let this = PrunerLayer {
598 interval_ms: None,
599 delay_ms: Some(100),
600 retention: Some(200),
601 max_chunk_size: Some(300),
602 prune_concurrency: Some(1),
603 };
604
605 let that = PrunerLayer {
606 interval_ms: Some(400),
607 delay_ms: None,
608 retention: Some(500),
609 max_chunk_size: Some(600),
610 prune_concurrency: Some(2),
611 };
612
613 let this_then_that = this.clone().merge(that.clone()).unwrap();
614 let that_then_this = that.clone().merge(this.clone()).unwrap();
615
616 assert_matches!(
617 this_then_that,
618 PrunerLayer {
619 interval_ms: Some(400),
620 delay_ms: Some(100),
621 retention: Some(500),
622 max_chunk_size: Some(600),
623 prune_concurrency: Some(2),
624 },
625 );
626
627 assert_matches!(
628 that_then_this,
629 PrunerLayer {
630 interval_ms: Some(400),
631 delay_ms: Some(100),
632 retention: Some(500),
633 max_chunk_size: Some(300),
634 prune_concurrency: Some(1),
635 },
636 );
637 }
638
639 #[test]
640 fn finish_concurrent_unpruned_override() {
641 let layer = ConcurrentLayer {
642 committer: None,
643 pruner: None,
644 ..Default::default()
645 };
646
647 let base = ConcurrentConfig {
648 committer: CommitterConfig {
649 write_concurrency: 5,
650 collect_interval_ms: 50,
651 watermark_interval_ms: 500,
652 ..Default::default()
653 },
654 pruner: Some(PrunerConfig::default()),
655 ..Default::default()
656 };
657
658 assert_matches!(
659 layer.finish(base).unwrap(),
660 ConcurrentConfig {
661 committer: CommitterConfig {
662 write_concurrency: 5,
663 collect_interval_ms: 50,
664 watermark_interval_ms: 500,
665 ..
666 },
667 pruner: None,
668 ..
669 },
670 );
671 }
672
673 #[test]
674 fn finish_concurrent_no_pruner() {
675 let layer = ConcurrentLayer {
676 committer: None,
677 pruner: None,
678 ..Default::default()
679 };
680
681 let base = ConcurrentConfig {
682 committer: CommitterConfig {
683 write_concurrency: 5,
684 collect_interval_ms: 50,
685 watermark_interval_ms: 500,
686 ..Default::default()
687 },
688 pruner: None,
689 ..Default::default()
690 };
691
692 assert_matches!(
693 layer.finish(base).unwrap(),
694 ConcurrentConfig {
695 committer: CommitterConfig {
696 write_concurrency: 5,
697 collect_interval_ms: 50,
698 watermark_interval_ms: 500,
699 ..
700 },
701 pruner: None,
702 ..
703 },
704 );
705 }
706
707 #[test]
708 fn finish_concurrent_pruner() {
709 let layer = ConcurrentLayer {
710 committer: None,
711 pruner: Some(PrunerLayer {
712 interval_ms: Some(1000),
713 ..Default::default()
714 }),
715 ..Default::default()
716 };
717
718 let base = ConcurrentConfig {
719 committer: CommitterConfig {
720 write_concurrency: 5,
721 collect_interval_ms: 50,
722 watermark_interval_ms: 500,
723 ..Default::default()
724 },
725 pruner: Some(PrunerConfig {
726 interval_ms: 100,
727 delay_ms: 200,
728 retention: 300,
729 max_chunk_size: 400,
730 prune_concurrency: 1,
731 }),
732 ..Default::default()
733 };
734
735 assert_matches!(
736 layer.finish(base).unwrap(),
737 ConcurrentConfig {
738 committer: CommitterConfig {
739 write_concurrency: 5,
740 collect_interval_ms: 50,
741 watermark_interval_ms: 500,
742 ..
743 },
744 pruner: Some(PrunerConfig {
745 interval_ms: 1000,
746 delay_ms: 200,
747 retention: 300,
748 max_chunk_size: 400,
749 prune_concurrency: 1,
750 }),
751 ..
752 },
753 );
754 }
755
756 #[test]
757 fn detect_unrecognized_fields() {
758 let err = toml::from_str::<IndexerConfig>(
759 r#"
760 i_dont_exist = "foo"
761 "#,
762 )
763 .unwrap_err();
764
765 assert!(
766 err.to_string().contains("i_dont_exist"),
767 "Unexpected error: {err}"
768 );
769 }
770}