1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline;
8use sui_indexer_alt_framework::pipeline::CommitterConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
10use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
11use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
12use tracing::warn;
13
14pub trait Merge: Sized {
16 fn merge(self, other: Self) -> anyhow::Result<Self>;
17}
18
/// Top-level indexer configuration, expressed as layers of optional overrides.
///
/// Layers are combined with [`Merge`] and resolved into concrete framework configs by each
/// layer's `finish` method, which fills unset values from a base config. Unknown keys are
/// rejected at deserialization time (`deny_unknown_fields`).
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// Overrides for checkpoint ingestion, resolved via `IngestionLayer::finish`.
    pub ingestion: IngestionLayer,

    /// Committer settings shared by pipelines.
    /// NOTE(review): presumably the base that each pipeline's own `committer` section is
    /// finished against — confirm at the call site.
    pub committer: CommitterLayer,

    /// Pruner settings shared by concurrent pipelines.
    /// NOTE(review): how this feeds each pipeline's base is not visible here. Note that
    /// `ConcurrentLayer::finish` only yields a pruner when the pipeline's own layer has a
    /// `pruner` section, so setting values here does not by itself enable pruning.
    pub pruner: PrunerLayer,

    /// Per-pipeline configuration sections.
    pub pipeline: PipelineLayer,
}
40
/// Optional overrides for the framework's [`IngestionConfig`]. Unset fields are filled from a
/// base config by `IngestionLayer::finish`.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,

    /// Deprecated. It is still accepted so existing configs keep deserializing, but `finish`
    /// ignores it and logs a warning pointing at the per-pipeline `subscriber-channel-size`.
    pub checkpoint_buffer_size: Option<usize>,
}
64
/// Per-pipeline overrides for a sequential pipeline's [`SequentialConfig`], resolved by
/// `SequentialLayer::finish`.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    /// Nested committer overrides. When absent, the base committer config is used unchanged.
    pub committer: Option<CommitterLayer>,
    /// Nested ingestion overrides. When absent, the base ingestion config is used unchanged.
    pub ingestion: Option<PipelineIngestionLayer>,
    // The remaining settings are optional in `SequentialConfig` too; `finish` takes this
    // layer's value when set and the base's (possibly unset) value otherwise.
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub pipeline_depth: Option<usize>,
}
78
/// Per-pipeline overrides for a concurrent pipeline's [`ConcurrentConfig`], resolved by
/// `ConcurrentLayer::finish`.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Nested committer overrides. When absent, the base committer config is used unchanged.
    pub committer: Option<CommitterLayer>,
    /// Nested ingestion overrides. When absent, the base ingestion config is used unchanged.
    pub ingestion: Option<PipelineIngestionLayer>,
    /// Pruner overrides. This must be present (it may be empty) for the finished config to have
    /// a pruner at all: `finish` yields `None` whenever this is `None`.
    pub pruner: Option<PrunerLayer>,
    // The remaining settings are optional in `ConcurrentConfig` too; `finish` takes this
    // layer's value when set and the base's (possibly unset) value otherwise.
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
94
/// Per-pipeline ingestion overrides, resolved into a `pipeline::IngestionConfig` by
/// `PipelineIngestionLayer::finish`.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PipelineIngestionLayer {
    pub subscriber_channel_size: Option<usize>,
}
101
/// Optional overrides for [`CommitterConfig`], resolved by `CommitterLayer::finish`.
///
/// This layer has no field for `CommitterConfig::watermark_interval_jitter_ms`, so that setting
/// is not configurable through it.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
110
/// Optional overrides for [`PrunerConfig`], resolved by `PrunerLayer::finish`.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    /// When two layers are merged, this keeps the larger value instead of letting the
    /// higher-priority layer win.
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
121
/// Per-pipeline configuration sections, one optional layer per known pipeline.
///
/// Keys are the field names verbatim (`snake_case`). Other layers in this file use kebab-case
/// keys (e.g. `checkpoint-buffer-size`). NOTE(review): presumably this is so the section names
/// match the pipeline names — confirm.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines.
    pub sum_displays: Option<SequentialLayer>,

    // Concurrent pipelines.
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
151
152impl IndexerConfig {
153 pub fn example() -> Self {
156 let mut example: Self = Default::default();
157
158 example.ingestion = IngestionConfig::default().into();
159 example.committer = CommitterConfig::default().into();
160 example.pruner = PrunerConfig::default().into();
161 example.pipeline = PipelineLayer::example();
162
163 example
164 }
165
166 pub fn for_test() -> Self {
170 Self::example()
171 .merge(IndexerConfig {
172 ingestion: IngestionLayer {
173 retry_interval_ms: Some(10),
174 ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
175 ..Default::default()
176 },
177 committer: CommitterLayer {
178 collect_interval_ms: Some(50),
179 watermark_interval_ms: Some(50),
180 write_concurrency: Some(1),
181 },
182 pruner: PrunerLayer {
183 interval_ms: Some(50),
184 delay_ms: Some(0),
185 ..Default::default()
186 },
187 ..Default::default()
188 })
189 .expect("Merge failed for test configuration")
190 }
191}
192
193impl IngestionLayer {
194 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
195 if self.checkpoint_buffer_size.is_some() {
196 warn!(
197 "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
198 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
199 section if you need to override the default."
200 );
201 }
202
203 Ok(IngestionConfig {
204 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
205 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
206 streaming_backoff_initial_batch_size: self
207 .streaming_backoff_initial_batch_size
208 .unwrap_or(base.streaming_backoff_initial_batch_size),
209 streaming_backoff_max_batch_size: self
210 .streaming_backoff_max_batch_size
211 .unwrap_or(base.streaming_backoff_max_batch_size),
212 streaming_connection_timeout_ms: self
213 .streaming_connection_timeout_ms
214 .unwrap_or(base.streaming_connection_timeout_ms),
215 streaming_statement_timeout_ms: self
216 .streaming_statement_timeout_ms
217 .unwrap_or(base.streaming_statement_timeout_ms),
218 })
219 }
220}
221
222impl SequentialLayer {
223 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
224 Ok(SequentialConfig {
225 committer: if let Some(committer) = self.committer {
226 committer.finish(base.committer)?
227 } else {
228 base.committer
229 },
230 ingestion: if let Some(ingestion) = self.ingestion {
231 ingestion.finish(base.ingestion)
232 } else {
233 base.ingestion
234 },
235 fanout: self.fanout.or(base.fanout),
236 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
237 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
238 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
239 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
240 pipeline_depth: self.pipeline_depth.or(base.pipeline_depth),
241 })
242 }
243}
244
245impl ConcurrentLayer {
246 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
249 Ok(ConcurrentConfig {
250 committer: if let Some(committer) = self.committer {
251 committer.finish(base.committer)?
252 } else {
253 base.committer
254 },
255 ingestion: if let Some(ingestion) = self.ingestion {
256 ingestion.finish(base.ingestion)
257 } else {
258 base.ingestion
259 },
260 pruner: match (self.pruner, base.pruner) {
261 (None, _) | (_, None) => None,
262 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
263 },
264 fanout: self.fanout.or(base.fanout),
265 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
266 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
267 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
268 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
269 collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
270 committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
271 })
272 }
273}
274
275impl PipelineIngestionLayer {
276 pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
277 pipeline::IngestionConfig {
278 subscriber_channel_size: self
279 .subscriber_channel_size
280 .or(base.subscriber_channel_size),
281 }
282 }
283}
284
285impl CommitterLayer {
286 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
287 Ok(CommitterConfig {
288 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
289 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
290 watermark_interval_ms: self
291 .watermark_interval_ms
292 .unwrap_or(base.watermark_interval_ms),
293 watermark_interval_jitter_ms: 0,
294 })
295 }
296}
297
298impl PrunerLayer {
299 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
300 Ok(PrunerConfig {
301 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
302 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
303 retention: self.retention.unwrap_or(base.retention),
304 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
305 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
306 })
307 }
308}
309
310impl PipelineLayer {
311 pub fn example() -> Self {
314 PipelineLayer {
315 cp_blooms: Some(Default::default()),
316 cp_bloom_blocks: Some(Default::default()),
317 sum_displays: Some(Default::default()),
318 cp_sequence_numbers: Some(Default::default()),
319 ev_emit_mod: Some(Default::default()),
320 ev_struct_inst: Some(Default::default()),
321 kv_checkpoints: Some(Default::default()),
322 kv_epoch_ends: Some(Default::default()),
323 kv_epoch_starts: Some(Default::default()),
324 kv_feature_flags: Some(Default::default()),
325 kv_objects: Some(Default::default()),
326 kv_packages: Some(Default::default()),
327 kv_protocol_configs: Some(Default::default()),
328 kv_transactions: Some(Default::default()),
329 obj_versions: Some(Default::default()),
330 tx_affected_addresses: Some(Default::default()),
331 tx_affected_objects: Some(Default::default()),
332 tx_balance_changes: Some(Default::default()),
333 tx_calls: Some(Default::default()),
334 tx_digests: Some(Default::default()),
335 tx_kinds: Some(Default::default()),
336 }
337 }
338}
339
340impl Merge for IndexerConfig {
341 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
342 Ok(IndexerConfig {
343 ingestion: self.ingestion.merge(other.ingestion)?,
344 committer: self.committer.merge(other.committer)?,
345 pruner: self.pruner.merge(other.pruner)?,
346 pipeline: self.pipeline.merge(other.pipeline)?,
347 })
348 }
349}
350
351impl Merge for IngestionLayer {
352 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
353 Ok(IngestionLayer {
354 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
355 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
356 streaming_backoff_initial_batch_size: other
357 .streaming_backoff_initial_batch_size
358 .or(self.streaming_backoff_initial_batch_size),
359 streaming_backoff_max_batch_size: other
360 .streaming_backoff_max_batch_size
361 .or(self.streaming_backoff_max_batch_size),
362 streaming_connection_timeout_ms: other
363 .streaming_connection_timeout_ms
364 .or(self.streaming_connection_timeout_ms),
365 streaming_statement_timeout_ms: other
366 .streaming_statement_timeout_ms
367 .or(self.streaming_statement_timeout_ms),
368 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
369 })
370 }
371}
372
373impl Merge for SequentialLayer {
374 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
375 Ok(SequentialLayer {
376 committer: self.committer.merge(other.committer)?,
377 ingestion: self.ingestion.merge(other.ingestion)?,
378 fanout: other.fanout.or(self.fanout),
379 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
380 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
381 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
382 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
383 pipeline_depth: other.pipeline_depth.or(self.pipeline_depth),
384 })
385 }
386}
387
388impl Merge for ConcurrentLayer {
389 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
390 Ok(ConcurrentLayer {
391 committer: self.committer.merge(other.committer)?,
392 ingestion: self.ingestion.merge(other.ingestion)?,
393 pruner: self.pruner.merge(other.pruner)?,
394 fanout: other.fanout.or(self.fanout),
395 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
396 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
397 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
398 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
399 collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
400 committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
401 })
402 }
403}
404
405impl Merge for PipelineIngestionLayer {
406 fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
407 Ok(PipelineIngestionLayer {
408 subscriber_channel_size: other
409 .subscriber_channel_size
410 .or(self.subscriber_channel_size),
411 })
412 }
413}
414
415impl Merge for CommitterLayer {
416 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
417 Ok(CommitterLayer {
418 write_concurrency: other.write_concurrency.or(self.write_concurrency),
419 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
420 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
421 })
422 }
423}
424
425impl Merge for PrunerLayer {
426 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
429 Ok(PrunerLayer {
430 interval_ms: other.interval_ms.or(self.interval_ms),
431 delay_ms: other.delay_ms.or(self.delay_ms),
432 retention: match (other.retention, self.retention) {
433 (Some(a), Some(b)) => Some(a.max(b)),
434 (Some(a), _) | (_, Some(a)) => Some(a),
435 (None, None) => None,
436 },
437 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
438 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
439 })
440 }
441}
442
443impl Merge for PipelineLayer {
444 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
445 Ok(PipelineLayer {
446 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
447 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
448 sum_displays: self.sum_displays.merge(other.sum_displays)?,
449 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
450 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
451 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
452 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
453 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
454 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
455 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
456 kv_objects: self.kv_objects.merge(other.kv_objects)?,
457 kv_packages: self.kv_packages.merge(other.kv_packages)?,
458 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
459 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
460 obj_versions: self.obj_versions.merge(other.obj_versions)?,
461 tx_affected_addresses: self
462 .tx_affected_addresses
463 .merge(other.tx_affected_addresses)?,
464 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
465 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
466 tx_calls: self.tx_calls.merge(other.tx_calls)?,
467 tx_digests: self.tx_digests.merge(other.tx_digests)?,
468 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
469 })
470 }
471}
472
473impl<T: Merge> Merge for Option<T> {
474 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
475 Ok(match (self, other) {
476 (Some(a), Some(b)) => Some(a.merge(b)?),
477 (Some(a), _) | (_, Some(a)) => Some(a),
478 (None, None) => None,
479 })
480 }
481}
482
483impl From<IngestionConfig> for IngestionLayer {
484 fn from(config: IngestionConfig) -> Self {
485 Self {
486 ingest_concurrency: Some(config.ingest_concurrency),
487 retry_interval_ms: Some(config.retry_interval_ms),
488 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
489 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
490 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
491 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
492 checkpoint_buffer_size: None,
493 }
494 }
495}
496
497impl From<SequentialConfig> for SequentialLayer {
498 fn from(config: SequentialConfig) -> Self {
499 Self {
500 committer: Some(config.committer.into()),
501 ingestion: Some(config.ingestion.into()),
502 fanout: config.fanout,
503 min_eager_rows: config.min_eager_rows,
504 max_pending_rows: config.max_pending_rows,
505 max_batch_checkpoints: config.max_batch_checkpoints,
506 processor_channel_size: config.processor_channel_size,
507 pipeline_depth: config.pipeline_depth,
508 }
509 }
510}
511
512impl From<ConcurrentConfig> for ConcurrentLayer {
513 fn from(config: ConcurrentConfig) -> Self {
514 Self {
515 committer: Some(config.committer.into()),
516 ingestion: Some(config.ingestion.into()),
517 pruner: config.pruner.map(Into::into),
518 fanout: config.fanout,
519 min_eager_rows: config.min_eager_rows,
520 max_pending_rows: config.max_pending_rows,
521 max_watermark_updates: config.max_watermark_updates,
522 processor_channel_size: config.processor_channel_size,
523 collector_channel_size: config.collector_channel_size,
524 committer_channel_size: config.committer_channel_size,
525 }
526 }
527}
528
529impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
530 fn from(config: pipeline::IngestionConfig) -> Self {
531 Self {
532 subscriber_channel_size: config.subscriber_channel_size,
533 }
534 }
535}
536
537impl From<CommitterConfig> for CommitterLayer {
538 fn from(config: CommitterConfig) -> Self {
539 Self {
540 write_concurrency: Some(config.write_concurrency),
541 collect_interval_ms: Some(config.collect_interval_ms),
542 watermark_interval_ms: Some(config.watermark_interval_ms),
543 }
544 }
545}
546
547impl From<PrunerConfig> for PrunerLayer {
548 fn from(config: PrunerConfig) -> Self {
549 Self {
550 interval_ms: Some(config.interval_ms),
551 delay_ms: Some(config.delay_ms),
552 retention: Some(config.retention),
553 max_chunk_size: Some(config.max_chunk_size),
554 prune_concurrency: Some(config.prune_concurrency),
555 }
556 }
557}
558
#[cfg(test)]
mod tests {
    use super::*;

    // Asserts that `$value` matches `$pattern`, printing the pattern and the pretty-printed value
    // on failure. NOTE(review): presumably defined locally because std's `assert_matches!` is
    // not stable.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Merging recurses into nested layers and is right-biased. Fields set on the argument win,
    /// unset ones are kept from the receiver, and a pipeline present on only one side is kept
    /// whole.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                min_eager_rows: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                min_eager_rows: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        // `that` has priority: its set fields override `this`; `ev_emit_mod` comes from `this`.
        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        // `this` has priority: its set fields override `that`; gaps are filled from `that`.
        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    /// Pruner merges are right-biased, except `retention`, which keeps the larger value in
    /// either merge order.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        // `retention` is still 500 (the max), even though `this` (200) has priority here.
        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A pipeline without its own `pruner` section gets no pruner, even when the base config
    /// has one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With no pruner on either side, the result has no pruner, and the committer is inherited
    /// from the base.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With pruners on both sides, the layer's set fields override the base pruner's, and the
    /// rest are taken from the base.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    /// `deny_unknown_fields` turns an unknown top-level key into a deserialization error that
    /// names the key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }

    /// The deprecated `checkpoint-buffer-size` key (kebab-case) still deserializes, so old
    /// configs keep loading.
    #[test]
    fn deprecated_checkpoint_buffer_size_parses() {
        let config: IndexerConfig = toml::from_str(
            r#"
            [ingestion]
            checkpoint-buffer-size = 5000
            "#,
        )
        .expect("deprecated `checkpoint-buffer-size` must deserialize without error");

        assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
    }
}