1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline::CommitterConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
10use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
11
12pub trait Merge: Sized {
14 fn merge(self, other: Self) -> anyhow::Result<Self>;
15}
16
17#[DefaultConfig]
18#[derive(Clone, Default, Debug)]
19#[serde(deny_unknown_fields)]
20pub struct IndexerConfig {
21 pub ingestion: IngestionLayer,
23
24 pub committer: CommitterLayer,
27
28 pub pruner: PrunerLayer,
34
35 pub pipeline: PipelineLayer,
37}
38
39#[DefaultConfig]
48#[derive(Clone, Default, Debug)]
49#[serde(deny_unknown_fields)]
50pub struct IngestionLayer {
51 pub checkpoint_buffer_size: Option<usize>,
52 pub ingest_concurrency: Option<ConcurrencyConfig>,
53 pub retry_interval_ms: Option<u64>,
54 pub streaming_backoff_initial_batch_size: Option<usize>,
55 pub streaming_backoff_max_batch_size: Option<usize>,
56 pub streaming_connection_timeout_ms: Option<u64>,
57 pub streaming_statement_timeout_ms: Option<u64>,
58}
59
60#[DefaultConfig]
61#[derive(Clone, Default, Debug)]
62#[serde(deny_unknown_fields)]
63pub struct SequentialLayer {
64 pub committer: Option<CommitterLayer>,
65 pub checkpoint_lag: Option<u64>,
66 pub fanout: Option<ConcurrencyConfig>,
67 pub min_eager_rows: Option<usize>,
68 pub max_batch_checkpoints: Option<usize>,
69 pub processor_channel_size: Option<usize>,
70}
71
72#[DefaultConfig]
73#[derive(Clone, Default, Debug)]
74#[serde(deny_unknown_fields)]
75pub struct ConcurrentLayer {
76 pub committer: Option<CommitterLayer>,
77 pub pruner: Option<PrunerLayer>,
78 pub fanout: Option<ConcurrencyConfig>,
79 pub min_eager_rows: Option<usize>,
80 pub max_pending_rows: Option<usize>,
81 pub max_watermark_updates: Option<usize>,
82 pub processor_channel_size: Option<usize>,
83 pub collector_channel_size: Option<usize>,
84 pub committer_channel_size: Option<usize>,
85}
86
87#[DefaultConfig]
88#[derive(Clone, Default, Debug)]
89#[serde(deny_unknown_fields)]
90pub struct CommitterLayer {
91 pub write_concurrency: Option<usize>,
92 pub collect_interval_ms: Option<u64>,
93 pub watermark_interval_ms: Option<u64>,
94}
95
96#[DefaultConfig]
97#[derive(Clone, Default, Debug)]
98#[serde(deny_unknown_fields)]
99pub struct PrunerLayer {
100 pub interval_ms: Option<u64>,
101 pub delay_ms: Option<u64>,
102 pub retention: Option<u64>,
103 pub max_chunk_size: Option<u64>,
104 pub prune_concurrency: Option<u64>,
105}
106
107#[DefaultConfig]
108#[derive(Clone, Default, Debug)]
109#[serde(rename_all = "snake_case", deny_unknown_fields)]
110pub struct PipelineLayer {
111 pub sum_displays: Option<SequentialLayer>,
113
114 pub cp_bloom_blocks: Option<ConcurrentLayer>,
116 pub cp_blooms: Option<ConcurrentLayer>,
117 pub coin_balance_buckets: Option<ConcurrentLayer>,
118 pub obj_info: Option<ConcurrentLayer>,
119 pub cp_sequence_numbers: Option<ConcurrentLayer>,
120 pub ev_emit_mod: Option<ConcurrentLayer>,
121 pub ev_struct_inst: Option<ConcurrentLayer>,
122 pub kv_checkpoints: Option<ConcurrentLayer>,
123 pub kv_epoch_ends: Option<ConcurrentLayer>,
124 pub kv_epoch_starts: Option<ConcurrentLayer>,
125 pub kv_feature_flags: Option<ConcurrentLayer>,
126 pub kv_objects: Option<ConcurrentLayer>,
127 pub kv_packages: Option<ConcurrentLayer>,
128 pub kv_protocol_configs: Option<ConcurrentLayer>,
129 pub kv_transactions: Option<ConcurrentLayer>,
130 pub obj_versions: Option<ConcurrentLayer>,
131 pub tx_affected_addresses: Option<ConcurrentLayer>,
132 pub tx_affected_objects: Option<ConcurrentLayer>,
133 pub tx_balance_changes: Option<ConcurrentLayer>,
134 pub tx_calls: Option<ConcurrentLayer>,
135 pub tx_digests: Option<ConcurrentLayer>,
136 pub tx_kinds: Option<ConcurrentLayer>,
137}
138
139impl IndexerConfig {
140 pub fn example() -> Self {
143 let mut example: Self = Default::default();
144
145 example.ingestion = IngestionConfig::default().into();
146 example.committer = CommitterConfig::default().into();
147 example.pruner = PrunerConfig::default().into();
148 example.pipeline = PipelineLayer::example();
149
150 example
151 }
152
153 pub fn for_test() -> Self {
157 Self::example()
158 .merge(IndexerConfig {
159 ingestion: IngestionLayer {
160 retry_interval_ms: Some(10),
161 ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
162 ..Default::default()
163 },
164 committer: CommitterLayer {
165 collect_interval_ms: Some(50),
166 watermark_interval_ms: Some(50),
167 write_concurrency: Some(1),
168 },
169 pruner: PrunerLayer {
170 interval_ms: Some(50),
171 delay_ms: Some(0),
172 ..Default::default()
173 },
174 ..Default::default()
175 })
176 .expect("Merge failed for test configuration")
177 }
178}
179
180impl IngestionLayer {
181 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
182 Ok(IngestionConfig {
183 checkpoint_buffer_size: self
184 .checkpoint_buffer_size
185 .unwrap_or(base.checkpoint_buffer_size),
186 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
187 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
188 streaming_backoff_initial_batch_size: self
189 .streaming_backoff_initial_batch_size
190 .unwrap_or(base.streaming_backoff_initial_batch_size),
191 streaming_backoff_max_batch_size: self
192 .streaming_backoff_max_batch_size
193 .unwrap_or(base.streaming_backoff_max_batch_size),
194 streaming_connection_timeout_ms: self
195 .streaming_connection_timeout_ms
196 .unwrap_or(base.streaming_connection_timeout_ms),
197 streaming_statement_timeout_ms: self
198 .streaming_statement_timeout_ms
199 .unwrap_or(base.streaming_statement_timeout_ms),
200 })
201 }
202}
203
204impl SequentialLayer {
205 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
206 Ok(SequentialConfig {
207 committer: if let Some(committer) = self.committer {
208 committer.finish(base.committer)?
209 } else {
210 base.committer
211 },
212 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
213 fanout: self.fanout.or(base.fanout),
214 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
215 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
216 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
217 })
218 }
219}
220
221impl ConcurrentLayer {
222 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
225 Ok(ConcurrentConfig {
226 committer: if let Some(committer) = self.committer {
227 committer.finish(base.committer)?
228 } else {
229 base.committer
230 },
231 pruner: match (self.pruner, base.pruner) {
232 (None, _) | (_, None) => None,
233 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
234 },
235 fanout: self.fanout.or(base.fanout),
236 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
237 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
238 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
239 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
240 collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
241 committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
242 })
243 }
244}
245
246impl CommitterLayer {
247 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
248 Ok(CommitterConfig {
249 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
250 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
251 watermark_interval_ms: self
252 .watermark_interval_ms
253 .unwrap_or(base.watermark_interval_ms),
254 watermark_interval_jitter_ms: 0,
255 })
256 }
257}
258
259impl PrunerLayer {
260 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
261 Ok(PrunerConfig {
262 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
263 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
264 retention: self.retention.unwrap_or(base.retention),
265 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
266 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
267 })
268 }
269}
270
271impl PipelineLayer {
272 pub fn example() -> Self {
275 PipelineLayer {
276 cp_blooms: Some(Default::default()),
277 cp_bloom_blocks: Some(Default::default()),
278 coin_balance_buckets: Some(Default::default()),
279 obj_info: Some(Default::default()),
280 sum_displays: Some(Default::default()),
281 cp_sequence_numbers: Some(Default::default()),
282 ev_emit_mod: Some(Default::default()),
283 ev_struct_inst: Some(Default::default()),
284 kv_checkpoints: Some(Default::default()),
285 kv_epoch_ends: Some(Default::default()),
286 kv_epoch_starts: Some(Default::default()),
287 kv_feature_flags: Some(Default::default()),
288 kv_objects: Some(Default::default()),
289 kv_packages: Some(Default::default()),
290 kv_protocol_configs: Some(Default::default()),
291 kv_transactions: Some(Default::default()),
292 obj_versions: Some(Default::default()),
293 tx_affected_addresses: Some(Default::default()),
294 tx_affected_objects: Some(Default::default()),
295 tx_balance_changes: Some(Default::default()),
296 tx_calls: Some(Default::default()),
297 tx_digests: Some(Default::default()),
298 tx_kinds: Some(Default::default()),
299 }
300 }
301}
302
303impl Merge for IndexerConfig {
304 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
305 Ok(IndexerConfig {
306 ingestion: self.ingestion.merge(other.ingestion)?,
307 committer: self.committer.merge(other.committer)?,
308 pruner: self.pruner.merge(other.pruner)?,
309 pipeline: self.pipeline.merge(other.pipeline)?,
310 })
311 }
312}
313
314impl Merge for IngestionLayer {
315 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
316 Ok(IngestionLayer {
317 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
318 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
319 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
320 streaming_backoff_initial_batch_size: other
321 .streaming_backoff_initial_batch_size
322 .or(self.streaming_backoff_initial_batch_size),
323 streaming_backoff_max_batch_size: other
324 .streaming_backoff_max_batch_size
325 .or(self.streaming_backoff_max_batch_size),
326 streaming_connection_timeout_ms: other
327 .streaming_connection_timeout_ms
328 .or(self.streaming_connection_timeout_ms),
329 streaming_statement_timeout_ms: other
330 .streaming_statement_timeout_ms
331 .or(self.streaming_statement_timeout_ms),
332 })
333 }
334}
335
336impl Merge for SequentialLayer {
337 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
338 Ok(SequentialLayer {
339 committer: self.committer.merge(other.committer)?,
340 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
341 fanout: other.fanout.or(self.fanout),
342 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
343 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
344 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
345 })
346 }
347}
348
349impl Merge for ConcurrentLayer {
350 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
351 Ok(ConcurrentLayer {
352 committer: self.committer.merge(other.committer)?,
353 pruner: self.pruner.merge(other.pruner)?,
354 fanout: other.fanout.or(self.fanout),
355 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
356 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
357 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
358 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
359 collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
360 committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
361 })
362 }
363}
364
365impl Merge for CommitterLayer {
366 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
367 Ok(CommitterLayer {
368 write_concurrency: other.write_concurrency.or(self.write_concurrency),
369 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
370 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
371 })
372 }
373}
374
375impl Merge for PrunerLayer {
376 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
379 Ok(PrunerLayer {
380 interval_ms: other.interval_ms.or(self.interval_ms),
381 delay_ms: other.delay_ms.or(self.delay_ms),
382 retention: match (other.retention, self.retention) {
383 (Some(a), Some(b)) => Some(a.max(b)),
384 (Some(a), _) | (_, Some(a)) => Some(a),
385 (None, None) => None,
386 },
387 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
388 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
389 })
390 }
391}
392
393impl Merge for PipelineLayer {
394 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
395 Ok(PipelineLayer {
396 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
397 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
398 coin_balance_buckets: self
399 .coin_balance_buckets
400 .merge(other.coin_balance_buckets)?,
401 obj_info: self.obj_info.merge(other.obj_info)?,
402 sum_displays: self.sum_displays.merge(other.sum_displays)?,
403 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
404 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
405 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
406 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
407 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
408 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
409 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
410 kv_objects: self.kv_objects.merge(other.kv_objects)?,
411 kv_packages: self.kv_packages.merge(other.kv_packages)?,
412 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
413 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
414 obj_versions: self.obj_versions.merge(other.obj_versions)?,
415 tx_affected_addresses: self
416 .tx_affected_addresses
417 .merge(other.tx_affected_addresses)?,
418 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
419 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
420 tx_calls: self.tx_calls.merge(other.tx_calls)?,
421 tx_digests: self.tx_digests.merge(other.tx_digests)?,
422 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
423 })
424 }
425}
426
427impl<T: Merge> Merge for Option<T> {
428 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
429 Ok(match (self, other) {
430 (Some(a), Some(b)) => Some(a.merge(b)?),
431 (Some(a), _) | (_, Some(a)) => Some(a),
432 (None, None) => None,
433 })
434 }
435}
436
437impl From<IngestionConfig> for IngestionLayer {
438 fn from(config: IngestionConfig) -> Self {
439 Self {
440 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
441 ingest_concurrency: Some(config.ingest_concurrency),
442 retry_interval_ms: Some(config.retry_interval_ms),
443 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
444 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
445 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
446 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
447 }
448 }
449}
450
451impl From<SequentialConfig> for SequentialLayer {
452 fn from(config: SequentialConfig) -> Self {
453 Self {
454 committer: Some(config.committer.into()),
455 checkpoint_lag: Some(config.checkpoint_lag),
456 fanout: config.fanout,
457 min_eager_rows: config.min_eager_rows,
458 max_batch_checkpoints: config.max_batch_checkpoints,
459 processor_channel_size: config.processor_channel_size,
460 }
461 }
462}
463
464impl From<ConcurrentConfig> for ConcurrentLayer {
465 fn from(config: ConcurrentConfig) -> Self {
466 Self {
467 committer: Some(config.committer.into()),
468 pruner: config.pruner.map(Into::into),
469 fanout: config.fanout,
470 min_eager_rows: config.min_eager_rows,
471 max_pending_rows: config.max_pending_rows,
472 max_watermark_updates: config.max_watermark_updates,
473 processor_channel_size: config.processor_channel_size,
474 collector_channel_size: config.collector_channel_size,
475 committer_channel_size: config.committer_channel_size,
476 }
477 }
478}
479
480impl From<CommitterConfig> for CommitterLayer {
481 fn from(config: CommitterConfig) -> Self {
482 Self {
483 write_concurrency: Some(config.write_concurrency),
484 collect_interval_ms: Some(config.collect_interval_ms),
485 watermark_interval_ms: Some(config.watermark_interval_ms),
486 }
487 }
488}
489
490impl From<PrunerConfig> for PrunerLayer {
491 fn from(config: PrunerConfig) -> Self {
492 Self {
493 interval_ms: Some(config.interval_ms),
494 delay_ms: Some(config.delay_ms),
495 retention: Some(config.retention),
496 max_chunk_size: Some(config.max_chunk_size),
497 prune_concurrency: Some(config.prune_concurrency),
498 }
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that an expression matches a pattern; on failure, print the pattern alongside the
    /// pretty-printed value.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Shorthand for building a committer layer from its three settings.
    fn committer_layer(
        write_concurrency: Option<usize>,
        collect_interval_ms: Option<u64>,
        watermark_interval_ms: Option<u64>,
    ) -> CommitterLayer {
        CommitterLayer {
            write_concurrency,
            collect_interval_ms,
            watermark_interval_ms,
        }
    }

    /// Base concurrent config shared by the `finish_concurrent_*` tests; only the pruner varies.
    fn base_concurrent(pruner: Option<PrunerConfig>) -> ConcurrentConfig {
        ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner,
            ..Default::default()
        }
    }

    /// Nested layers are merged field by field, with the right-hand side taking precedence, and
    /// an entry present on only one side survives in either merge order.
    #[test]
    fn merge_recursive() {
        let first = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(committer_layer(Some(10), Some(1000), None)),
                checkpoint_lag: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(committer_layer(Some(5), Some(500), None)),
                ..Default::default()
            }),
            ..Default::default()
        };

        let second = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(committer_layer(Some(5), None, Some(500))),
                checkpoint_lag: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let first_then_second = first.clone().merge(second.clone()).unwrap();
        let second_then_first = second.merge(first).unwrap();

        assert_matches!(
            first_then_second,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    checkpoint_lag: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        assert_matches!(
            second_then_first,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    checkpoint_lag: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    /// Pruner layers merge with right-hand precedence, except for `retention`, which keeps the
    /// maximum regardless of merge order.
    #[test]
    fn merge_pruner() {
        let first = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let second = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let first_then_second = first.clone().merge(second.clone()).unwrap();
        let second_then_first = second.merge(first).unwrap();

        assert_matches!(
            first_then_second,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            second_then_first,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner removes the pruner supplied by the base.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let overrides = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let finished = overrides
            .finish(base_concurrent(Some(PrunerConfig::default())))
            .unwrap();

        assert_matches!(
            finished,
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With no pruner on either side, the finished config has no pruner.
    #[test]
    fn finish_concurrent_no_pruner() {
        let overrides = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let finished = overrides.finish(base_concurrent(None)).unwrap();

        assert_matches!(
            finished,
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With a pruner on both sides, the layer's settings override the base's and the rest are
    /// inherited.
    #[test]
    fn finish_concurrent_pruner() {
        let overrides = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = base_concurrent(Some(PrunerConfig {
            interval_ms: 100,
            delay_ms: 200,
            retention: 300,
            max_chunk_size: 400,
            prune_concurrency: 1,
        }));

        assert_matches!(
            overrides.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    /// Deserialization rejects keys that do not correspond to a field, and the error names the
    /// offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let parsed = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        );

        let err = parsed.unwrap_err();
        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}