1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline;
8use sui_indexer_alt_framework::pipeline::CommitterConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
10use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
11use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
12use tracing::warn;
13
14pub trait Merge: Sized {
16 fn merge(self, other: Self) -> anyhow::Result<Self>;
17}
18
19#[DefaultConfig]
20#[derive(Clone, Default, Debug)]
21#[serde(deny_unknown_fields)]
22pub struct IndexerConfig {
23 pub ingestion: IngestionLayer,
25
26 pub committer: CommitterLayer,
29
30 pub pruner: PrunerLayer,
36
37 pub pipeline: PipelineLayer,
39}
40
41#[DefaultConfig]
50#[derive(Clone, Default, Debug)]
51#[serde(deny_unknown_fields)]
52pub struct IngestionLayer {
53 pub ingest_concurrency: Option<ConcurrencyConfig>,
54 pub retry_interval_ms: Option<u64>,
55 pub streaming_backoff_initial_batch_size: Option<usize>,
56 pub streaming_backoff_max_batch_size: Option<usize>,
57 pub streaming_connection_timeout_ms: Option<u64>,
58 pub streaming_statement_timeout_ms: Option<u64>,
59
60 pub checkpoint_buffer_size: Option<usize>,
63}
64
65#[DefaultConfig]
66#[derive(Clone, Default, Debug)]
67#[serde(deny_unknown_fields)]
68pub struct SequentialLayer {
69 pub committer: Option<CommitterLayer>,
70 pub ingestion: Option<PipelineIngestionLayer>,
71 pub fanout: Option<ConcurrencyConfig>,
72 pub min_eager_rows: Option<usize>,
73 pub max_batch_checkpoints: Option<usize>,
74 pub processor_channel_size: Option<usize>,
75}
76
77#[DefaultConfig]
78#[derive(Clone, Default, Debug)]
79#[serde(deny_unknown_fields)]
80pub struct ConcurrentLayer {
81 pub committer: Option<CommitterLayer>,
82 pub ingestion: Option<PipelineIngestionLayer>,
83 pub pruner: Option<PrunerLayer>,
84 pub fanout: Option<ConcurrencyConfig>,
85 pub min_eager_rows: Option<usize>,
86 pub max_pending_rows: Option<usize>,
87 pub max_watermark_updates: Option<usize>,
88 pub processor_channel_size: Option<usize>,
89 pub collector_channel_size: Option<usize>,
90 pub committer_channel_size: Option<usize>,
91}
92
93#[DefaultConfig]
94#[derive(Clone, Default, Debug)]
95#[serde(deny_unknown_fields)]
96pub struct PipelineIngestionLayer {
97 pub subscriber_channel_size: Option<usize>,
98}
99
100#[DefaultConfig]
101#[derive(Clone, Default, Debug)]
102#[serde(deny_unknown_fields)]
103pub struct CommitterLayer {
104 pub write_concurrency: Option<usize>,
105 pub collect_interval_ms: Option<u64>,
106 pub watermark_interval_ms: Option<u64>,
107}
108
109#[DefaultConfig]
110#[derive(Clone, Default, Debug)]
111#[serde(deny_unknown_fields)]
112pub struct PrunerLayer {
113 pub interval_ms: Option<u64>,
114 pub delay_ms: Option<u64>,
115 pub retention: Option<u64>,
116 pub max_chunk_size: Option<u64>,
117 pub prune_concurrency: Option<u64>,
118}
119
120#[DefaultConfig]
121#[derive(Clone, Default, Debug)]
122#[serde(rename_all = "snake_case", deny_unknown_fields)]
123pub struct PipelineLayer {
124 pub sum_displays: Option<SequentialLayer>,
126
127 pub cp_bloom_blocks: Option<ConcurrentLayer>,
129 pub cp_blooms: Option<ConcurrentLayer>,
130 pub cp_sequence_numbers: Option<ConcurrentLayer>,
131 pub ev_emit_mod: Option<ConcurrentLayer>,
132 pub ev_struct_inst: Option<ConcurrentLayer>,
133 pub kv_checkpoints: Option<ConcurrentLayer>,
134 pub kv_epoch_ends: Option<ConcurrentLayer>,
135 pub kv_epoch_starts: Option<ConcurrentLayer>,
136 pub kv_feature_flags: Option<ConcurrentLayer>,
137 pub kv_objects: Option<ConcurrentLayer>,
138 pub kv_packages: Option<ConcurrentLayer>,
139 pub kv_protocol_configs: Option<ConcurrentLayer>,
140 pub kv_transactions: Option<ConcurrentLayer>,
141 pub obj_versions: Option<ConcurrentLayer>,
142 pub tx_affected_addresses: Option<ConcurrentLayer>,
143 pub tx_affected_objects: Option<ConcurrentLayer>,
144 pub tx_balance_changes: Option<ConcurrentLayer>,
145 pub tx_calls: Option<ConcurrentLayer>,
146 pub tx_digests: Option<ConcurrentLayer>,
147 pub tx_kinds: Option<ConcurrentLayer>,
148}
149
150impl IndexerConfig {
151 pub fn example() -> Self {
154 let mut example: Self = Default::default();
155
156 example.ingestion = IngestionConfig::default().into();
157 example.committer = CommitterConfig::default().into();
158 example.pruner = PrunerConfig::default().into();
159 example.pipeline = PipelineLayer::example();
160
161 example
162 }
163
164 pub fn for_test() -> Self {
168 Self::example()
169 .merge(IndexerConfig {
170 ingestion: IngestionLayer {
171 retry_interval_ms: Some(10),
172 ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
173 ..Default::default()
174 },
175 committer: CommitterLayer {
176 collect_interval_ms: Some(50),
177 watermark_interval_ms: Some(50),
178 write_concurrency: Some(1),
179 },
180 pruner: PrunerLayer {
181 interval_ms: Some(50),
182 delay_ms: Some(0),
183 ..Default::default()
184 },
185 ..Default::default()
186 })
187 .expect("Merge failed for test configuration")
188 }
189}
190
191impl IngestionLayer {
192 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
193 if self.checkpoint_buffer_size.is_some() {
194 warn!(
195 "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
196 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
197 section if you need to override the default."
198 );
199 }
200
201 Ok(IngestionConfig {
202 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
203 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
204 streaming_backoff_initial_batch_size: self
205 .streaming_backoff_initial_batch_size
206 .unwrap_or(base.streaming_backoff_initial_batch_size),
207 streaming_backoff_max_batch_size: self
208 .streaming_backoff_max_batch_size
209 .unwrap_or(base.streaming_backoff_max_batch_size),
210 streaming_connection_timeout_ms: self
211 .streaming_connection_timeout_ms
212 .unwrap_or(base.streaming_connection_timeout_ms),
213 streaming_statement_timeout_ms: self
214 .streaming_statement_timeout_ms
215 .unwrap_or(base.streaming_statement_timeout_ms),
216 })
217 }
218}
219
220impl SequentialLayer {
221 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
222 Ok(SequentialConfig {
223 committer: if let Some(committer) = self.committer {
224 committer.finish(base.committer)?
225 } else {
226 base.committer
227 },
228 ingestion: if let Some(ingestion) = self.ingestion {
229 ingestion.finish(base.ingestion)
230 } else {
231 base.ingestion
232 },
233 fanout: self.fanout.or(base.fanout),
234 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
235 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
236 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
237 })
238 }
239}
240
241impl ConcurrentLayer {
242 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
245 Ok(ConcurrentConfig {
246 committer: if let Some(committer) = self.committer {
247 committer.finish(base.committer)?
248 } else {
249 base.committer
250 },
251 ingestion: if let Some(ingestion) = self.ingestion {
252 ingestion.finish(base.ingestion)
253 } else {
254 base.ingestion
255 },
256 pruner: match (self.pruner, base.pruner) {
257 (None, _) | (_, None) => None,
258 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
259 },
260 fanout: self.fanout.or(base.fanout),
261 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
262 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
263 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
264 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
265 collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
266 committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
267 })
268 }
269}
270
271impl PipelineIngestionLayer {
272 pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
273 pipeline::IngestionConfig {
274 subscriber_channel_size: self
275 .subscriber_channel_size
276 .or(base.subscriber_channel_size),
277 }
278 }
279}
280
281impl CommitterLayer {
282 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
283 Ok(CommitterConfig {
284 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
285 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
286 watermark_interval_ms: self
287 .watermark_interval_ms
288 .unwrap_or(base.watermark_interval_ms),
289 watermark_interval_jitter_ms: 0,
290 })
291 }
292}
293
294impl PrunerLayer {
295 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
296 Ok(PrunerConfig {
297 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
298 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
299 retention: self.retention.unwrap_or(base.retention),
300 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
301 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
302 })
303 }
304}
305
306impl PipelineLayer {
307 pub fn example() -> Self {
310 PipelineLayer {
311 cp_blooms: Some(Default::default()),
312 cp_bloom_blocks: Some(Default::default()),
313 sum_displays: Some(Default::default()),
314 cp_sequence_numbers: Some(Default::default()),
315 ev_emit_mod: Some(Default::default()),
316 ev_struct_inst: Some(Default::default()),
317 kv_checkpoints: Some(Default::default()),
318 kv_epoch_ends: Some(Default::default()),
319 kv_epoch_starts: Some(Default::default()),
320 kv_feature_flags: Some(Default::default()),
321 kv_objects: Some(Default::default()),
322 kv_packages: Some(Default::default()),
323 kv_protocol_configs: Some(Default::default()),
324 kv_transactions: Some(Default::default()),
325 obj_versions: Some(Default::default()),
326 tx_affected_addresses: Some(Default::default()),
327 tx_affected_objects: Some(Default::default()),
328 tx_balance_changes: Some(Default::default()),
329 tx_calls: Some(Default::default()),
330 tx_digests: Some(Default::default()),
331 tx_kinds: Some(Default::default()),
332 }
333 }
334}
335
336impl Merge for IndexerConfig {
337 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
338 Ok(IndexerConfig {
339 ingestion: self.ingestion.merge(other.ingestion)?,
340 committer: self.committer.merge(other.committer)?,
341 pruner: self.pruner.merge(other.pruner)?,
342 pipeline: self.pipeline.merge(other.pipeline)?,
343 })
344 }
345}
346
347impl Merge for IngestionLayer {
348 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
349 Ok(IngestionLayer {
350 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
351 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
352 streaming_backoff_initial_batch_size: other
353 .streaming_backoff_initial_batch_size
354 .or(self.streaming_backoff_initial_batch_size),
355 streaming_backoff_max_batch_size: other
356 .streaming_backoff_max_batch_size
357 .or(self.streaming_backoff_max_batch_size),
358 streaming_connection_timeout_ms: other
359 .streaming_connection_timeout_ms
360 .or(self.streaming_connection_timeout_ms),
361 streaming_statement_timeout_ms: other
362 .streaming_statement_timeout_ms
363 .or(self.streaming_statement_timeout_ms),
364 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
365 })
366 }
367}
368
369impl Merge for SequentialLayer {
370 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
371 Ok(SequentialLayer {
372 committer: self.committer.merge(other.committer)?,
373 ingestion: self.ingestion.merge(other.ingestion)?,
374 fanout: other.fanout.or(self.fanout),
375 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
376 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
377 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
378 })
379 }
380}
381
382impl Merge for ConcurrentLayer {
383 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
384 Ok(ConcurrentLayer {
385 committer: self.committer.merge(other.committer)?,
386 ingestion: self.ingestion.merge(other.ingestion)?,
387 pruner: self.pruner.merge(other.pruner)?,
388 fanout: other.fanout.or(self.fanout),
389 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
390 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
391 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
392 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
393 collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
394 committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
395 })
396 }
397}
398
399impl Merge for PipelineIngestionLayer {
400 fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
401 Ok(PipelineIngestionLayer {
402 subscriber_channel_size: other
403 .subscriber_channel_size
404 .or(self.subscriber_channel_size),
405 })
406 }
407}
408
409impl Merge for CommitterLayer {
410 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
411 Ok(CommitterLayer {
412 write_concurrency: other.write_concurrency.or(self.write_concurrency),
413 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
414 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
415 })
416 }
417}
418
419impl Merge for PrunerLayer {
420 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
423 Ok(PrunerLayer {
424 interval_ms: other.interval_ms.or(self.interval_ms),
425 delay_ms: other.delay_ms.or(self.delay_ms),
426 retention: match (other.retention, self.retention) {
427 (Some(a), Some(b)) => Some(a.max(b)),
428 (Some(a), _) | (_, Some(a)) => Some(a),
429 (None, None) => None,
430 },
431 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
432 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
433 })
434 }
435}
436
437impl Merge for PipelineLayer {
438 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
439 Ok(PipelineLayer {
440 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
441 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
442 sum_displays: self.sum_displays.merge(other.sum_displays)?,
443 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
444 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
445 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
446 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
447 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
448 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
449 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
450 kv_objects: self.kv_objects.merge(other.kv_objects)?,
451 kv_packages: self.kv_packages.merge(other.kv_packages)?,
452 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
453 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
454 obj_versions: self.obj_versions.merge(other.obj_versions)?,
455 tx_affected_addresses: self
456 .tx_affected_addresses
457 .merge(other.tx_affected_addresses)?,
458 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
459 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
460 tx_calls: self.tx_calls.merge(other.tx_calls)?,
461 tx_digests: self.tx_digests.merge(other.tx_digests)?,
462 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
463 })
464 }
465}
466
467impl<T: Merge> Merge for Option<T> {
468 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
469 Ok(match (self, other) {
470 (Some(a), Some(b)) => Some(a.merge(b)?),
471 (Some(a), _) | (_, Some(a)) => Some(a),
472 (None, None) => None,
473 })
474 }
475}
476
477impl From<IngestionConfig> for IngestionLayer {
478 fn from(config: IngestionConfig) -> Self {
479 Self {
480 ingest_concurrency: Some(config.ingest_concurrency),
481 retry_interval_ms: Some(config.retry_interval_ms),
482 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
483 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
484 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
485 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
486 checkpoint_buffer_size: None,
487 }
488 }
489}
490
491impl From<SequentialConfig> for SequentialLayer {
492 fn from(config: SequentialConfig) -> Self {
493 Self {
494 committer: Some(config.committer.into()),
495 ingestion: Some(config.ingestion.into()),
496 fanout: config.fanout,
497 min_eager_rows: config.min_eager_rows,
498 max_batch_checkpoints: config.max_batch_checkpoints,
499 processor_channel_size: config.processor_channel_size,
500 }
501 }
502}
503
504impl From<ConcurrentConfig> for ConcurrentLayer {
505 fn from(config: ConcurrentConfig) -> Self {
506 Self {
507 committer: Some(config.committer.into()),
508 ingestion: Some(config.ingestion.into()),
509 pruner: config.pruner.map(Into::into),
510 fanout: config.fanout,
511 min_eager_rows: config.min_eager_rows,
512 max_pending_rows: config.max_pending_rows,
513 max_watermark_updates: config.max_watermark_updates,
514 processor_channel_size: config.processor_channel_size,
515 collector_channel_size: config.collector_channel_size,
516 committer_channel_size: config.committer_channel_size,
517 }
518 }
519}
520
521impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
522 fn from(config: pipeline::IngestionConfig) -> Self {
523 Self {
524 subscriber_channel_size: config.subscriber_channel_size,
525 }
526 }
527}
528
529impl From<CommitterConfig> for CommitterLayer {
530 fn from(config: CommitterConfig) -> Self {
531 Self {
532 write_concurrency: Some(config.write_concurrency),
533 collect_interval_ms: Some(config.collect_interval_ms),
534 watermark_interval_ms: Some(config.watermark_interval_ms),
535 }
536 }
537}
538
539impl From<PrunerConfig> for PrunerLayer {
540 fn from(config: PrunerConfig) -> Self {
541 Self {
542 interval_ms: Some(config.interval_ms),
543 delay_ms: Some(config.delay_ms),
544 retention: Some(config.retention),
545 max_chunk_size: Some(config.max_chunk_size),
546 prune_concurrency: Some(config.prune_concurrency),
547 }
548 }
549}
550
551#[cfg(test)]
552mod tests {
553 use super::*;
554
555 macro_rules! assert_matches {
556 ($value:expr, $pattern:pat $(,)?) => {
557 let value = $value;
558 assert!(
559 matches!(value, $pattern),
560 "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
561 stringify!($pattern)
562 );
563 };
564 }
565
566 #[test]
567 fn merge_recursive() {
568 let this = PipelineLayer {
569 sum_displays: Some(SequentialLayer {
570 committer: Some(CommitterLayer {
571 write_concurrency: Some(10),
572 collect_interval_ms: Some(1000),
573 watermark_interval_ms: None,
574 }),
575 min_eager_rows: Some(100),
576 ..Default::default()
577 }),
578 ev_emit_mod: Some(ConcurrentLayer {
579 committer: Some(CommitterLayer {
580 write_concurrency: Some(5),
581 collect_interval_ms: Some(500),
582 watermark_interval_ms: None,
583 }),
584 ..Default::default()
585 }),
586 ..Default::default()
587 };
588
589 let that = PipelineLayer {
590 sum_displays: Some(SequentialLayer {
591 committer: Some(CommitterLayer {
592 write_concurrency: Some(5),
593 collect_interval_ms: None,
594 watermark_interval_ms: Some(500),
595 }),
596 min_eager_rows: Some(200),
597 ..Default::default()
598 }),
599 ev_emit_mod: None,
600 ..Default::default()
601 };
602
603 let this_then_that = this.clone().merge(that.clone()).unwrap();
604 let that_then_this = that.clone().merge(this.clone()).unwrap();
605
606 assert_matches!(
607 this_then_that,
608 PipelineLayer {
609 sum_displays: Some(SequentialLayer {
610 committer: Some(CommitterLayer {
611 write_concurrency: Some(5),
612 collect_interval_ms: Some(1000),
613 watermark_interval_ms: Some(500),
614 ..
615 }),
616 min_eager_rows: Some(200),
617 ..
618 }),
619 ev_emit_mod: Some(ConcurrentLayer {
620 committer: Some(CommitterLayer {
621 write_concurrency: Some(5),
622 collect_interval_ms: Some(500),
623 watermark_interval_ms: None,
624 ..
625 }),
626 pruner: None,
627 ..
628 }),
629 ..
630 },
631 );
632
633 assert_matches!(
634 that_then_this,
635 PipelineLayer {
636 sum_displays: Some(SequentialLayer {
637 committer: Some(CommitterLayer {
638 write_concurrency: Some(10),
639 collect_interval_ms: Some(1000),
640 watermark_interval_ms: Some(500),
641 ..
642 }),
643 min_eager_rows: Some(100),
644 ..
645 }),
646 ev_emit_mod: Some(ConcurrentLayer {
647 committer: Some(CommitterLayer {
648 write_concurrency: Some(5),
649 collect_interval_ms: Some(500),
650 watermark_interval_ms: None,
651 ..
652 }),
653 pruner: None,
654 ..
655 }),
656 ..
657 },
658 );
659 }
660
661 #[test]
662 fn merge_pruner() {
663 let this = PrunerLayer {
664 interval_ms: None,
665 delay_ms: Some(100),
666 retention: Some(200),
667 max_chunk_size: Some(300),
668 prune_concurrency: Some(1),
669 };
670
671 let that = PrunerLayer {
672 interval_ms: Some(400),
673 delay_ms: None,
674 retention: Some(500),
675 max_chunk_size: Some(600),
676 prune_concurrency: Some(2),
677 };
678
679 let this_then_that = this.clone().merge(that.clone()).unwrap();
680 let that_then_this = that.clone().merge(this.clone()).unwrap();
681
682 assert_matches!(
683 this_then_that,
684 PrunerLayer {
685 interval_ms: Some(400),
686 delay_ms: Some(100),
687 retention: Some(500),
688 max_chunk_size: Some(600),
689 prune_concurrency: Some(2),
690 },
691 );
692
693 assert_matches!(
694 that_then_this,
695 PrunerLayer {
696 interval_ms: Some(400),
697 delay_ms: Some(100),
698 retention: Some(500),
699 max_chunk_size: Some(300),
700 prune_concurrency: Some(1),
701 },
702 );
703 }
704
705 #[test]
706 fn finish_concurrent_unpruned_override() {
707 let layer = ConcurrentLayer {
708 committer: None,
709 pruner: None,
710 ..Default::default()
711 };
712
713 let base = ConcurrentConfig {
714 committer: CommitterConfig {
715 write_concurrency: 5,
716 collect_interval_ms: 50,
717 watermark_interval_ms: 500,
718 ..Default::default()
719 },
720 pruner: Some(PrunerConfig::default()),
721 ..Default::default()
722 };
723
724 assert_matches!(
725 layer.finish(base).unwrap(),
726 ConcurrentConfig {
727 committer: CommitterConfig {
728 write_concurrency: 5,
729 collect_interval_ms: 50,
730 watermark_interval_ms: 500,
731 ..
732 },
733 pruner: None,
734 ..
735 },
736 );
737 }
738
739 #[test]
740 fn finish_concurrent_no_pruner() {
741 let layer = ConcurrentLayer {
742 committer: None,
743 pruner: None,
744 ..Default::default()
745 };
746
747 let base = ConcurrentConfig {
748 committer: CommitterConfig {
749 write_concurrency: 5,
750 collect_interval_ms: 50,
751 watermark_interval_ms: 500,
752 ..Default::default()
753 },
754 pruner: None,
755 ..Default::default()
756 };
757
758 assert_matches!(
759 layer.finish(base).unwrap(),
760 ConcurrentConfig {
761 committer: CommitterConfig {
762 write_concurrency: 5,
763 collect_interval_ms: 50,
764 watermark_interval_ms: 500,
765 ..
766 },
767 pruner: None,
768 ..
769 },
770 );
771 }
772
773 #[test]
774 fn finish_concurrent_pruner() {
775 let layer = ConcurrentLayer {
776 committer: None,
777 pruner: Some(PrunerLayer {
778 interval_ms: Some(1000),
779 ..Default::default()
780 }),
781 ..Default::default()
782 };
783
784 let base = ConcurrentConfig {
785 committer: CommitterConfig {
786 write_concurrency: 5,
787 collect_interval_ms: 50,
788 watermark_interval_ms: 500,
789 ..Default::default()
790 },
791 pruner: Some(PrunerConfig {
792 interval_ms: 100,
793 delay_ms: 200,
794 retention: 300,
795 max_chunk_size: 400,
796 prune_concurrency: 1,
797 }),
798 ..Default::default()
799 };
800
801 assert_matches!(
802 layer.finish(base).unwrap(),
803 ConcurrentConfig {
804 committer: CommitterConfig {
805 write_concurrency: 5,
806 collect_interval_ms: 50,
807 watermark_interval_ms: 500,
808 ..
809 },
810 pruner: Some(PrunerConfig {
811 interval_ms: 1000,
812 delay_ms: 200,
813 retention: 300,
814 max_chunk_size: 400,
815 prune_concurrency: 1,
816 }),
817 ..
818 },
819 );
820 }
821
822 #[test]
823 fn detect_unrecognized_fields() {
824 let err = toml::from_str::<IndexerConfig>(
825 r#"
826 i_dont_exist = "foo"
827 "#,
828 )
829 .unwrap_err();
830
831 assert!(
832 err.to_string().contains("i_dont_exist"),
833 "Unexpected error: {err}"
834 );
835 }
836
837 #[test]
838 fn deprecated_checkpoint_buffer_size_parses() {
839 let config: IndexerConfig = toml::from_str(
840 r#"
841 [ingestion]
842 checkpoint-buffer-size = 5000
843 "#,
844 )
845 .expect("deprecated `checkpoint-buffer-size` must deserialize without error");
846
847 assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
848 }
849}