1use std::num::NonZeroUsize;
5
6use sui_default_config::DefaultConfig;
7use sui_indexer_alt_framework::config::ConcurrencyConfig;
8use sui_indexer_alt_framework::ingestion::IngestionConfig;
9use sui_indexer_alt_framework::pipeline;
10use sui_indexer_alt_framework::pipeline::CommitterConfig;
11use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
12use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
13use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
14use tracing::warn;
15
16pub trait Merge: Sized {
18 fn merge(self, other: Self) -> anyhow::Result<Self>;
19}
20
21#[DefaultConfig]
22#[derive(Clone, Default, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct IndexerConfig {
25 pub ingestion: IngestionLayer,
27
28 pub committer: CommitterLayer,
31
32 pub pruner: PrunerLayer,
38
39 pub pipeline: PipelineLayer,
41}
42
43#[DefaultConfig]
52#[derive(Clone, Default, Debug)]
53#[serde(deny_unknown_fields)]
54pub struct IngestionLayer {
55 pub ingest_concurrency: Option<ConcurrencyConfig>,
56 pub retry_interval_ms: Option<u64>,
57 pub streaming_backoff_initial_batch_size: Option<NonZeroUsize>,
58 pub streaming_backoff_max_batch_size: Option<usize>,
59 pub streaming_connection_timeout_ms: Option<u64>,
60 pub streaming_statement_timeout_ms: Option<u64>,
61
62 pub checkpoint_buffer_size: Option<usize>,
65}
66
67#[DefaultConfig]
68#[derive(Clone, Default, Debug)]
69#[serde(deny_unknown_fields)]
70pub struct SequentialLayer {
71 pub committer: Option<CommitterLayer>,
72 pub ingestion: Option<PipelineIngestionLayer>,
73 pub fanout: Option<ConcurrencyConfig>,
74 pub min_eager_rows: Option<usize>,
75 pub max_pending_rows: Option<usize>,
76 pub max_batch_checkpoints: Option<usize>,
77 pub processor_channel_size: Option<usize>,
78 pub pipeline_depth: Option<usize>,
79}
80
81#[DefaultConfig]
82#[derive(Clone, Default, Debug)]
83#[serde(deny_unknown_fields)]
84pub struct ConcurrentLayer {
85 pub committer: Option<CommitterLayer>,
86 pub ingestion: Option<PipelineIngestionLayer>,
87 pub pruner: Option<PrunerLayer>,
88 pub fanout: Option<ConcurrencyConfig>,
89 pub min_eager_rows: Option<usize>,
90 pub max_pending_rows: Option<usize>,
91 pub max_watermark_updates: Option<usize>,
92 pub processor_channel_size: Option<usize>,
93 pub collector_channel_size: Option<usize>,
94 pub committer_channel_size: Option<usize>,
95}
96
97#[DefaultConfig]
98#[derive(Clone, Default, Debug)]
99#[serde(deny_unknown_fields)]
100pub struct PipelineIngestionLayer {
101 pub subscriber_channel_size: Option<usize>,
102}
103
104#[DefaultConfig]
105#[derive(Clone, Default, Debug)]
106#[serde(deny_unknown_fields)]
107pub struct CommitterLayer {
108 pub write_concurrency: Option<usize>,
109 pub collect_interval_ms: Option<u64>,
110 pub watermark_interval_ms: Option<u64>,
111}
112
113#[DefaultConfig]
114#[derive(Clone, Default, Debug)]
115#[serde(deny_unknown_fields)]
116pub struct PrunerLayer {
117 pub interval_ms: Option<u64>,
118 pub delay_ms: Option<u64>,
119 pub retention: Option<u64>,
120 pub max_chunk_size: Option<u64>,
121 pub prune_concurrency: Option<u64>,
122}
123
124#[DefaultConfig]
125#[derive(Clone, Default, Debug)]
126#[serde(rename_all = "snake_case", deny_unknown_fields)]
127pub struct PipelineLayer {
128 pub sum_displays: Option<SequentialLayer>,
130
131 pub cp_bloom_blocks: Option<ConcurrentLayer>,
133 pub cp_blooms: Option<ConcurrentLayer>,
134 pub cp_digests: Option<ConcurrentLayer>,
135 pub cp_sequence_numbers: Option<ConcurrentLayer>,
136 pub ev_emit_mod: Option<ConcurrentLayer>,
137 pub ev_struct_inst: Option<ConcurrentLayer>,
138 pub kv_checkpoints: Option<ConcurrentLayer>,
139 pub kv_epoch_ends: Option<ConcurrentLayer>,
140 pub kv_epoch_starts: Option<ConcurrentLayer>,
141 pub kv_feature_flags: Option<ConcurrentLayer>,
142 pub kv_objects: Option<ConcurrentLayer>,
143 pub kv_packages: Option<ConcurrentLayer>,
144 pub kv_protocol_configs: Option<ConcurrentLayer>,
145 pub kv_transactions: Option<ConcurrentLayer>,
146 pub obj_versions: Option<ConcurrentLayer>,
147 pub tx_affected_addresses: Option<ConcurrentLayer>,
148 pub tx_affected_objects: Option<ConcurrentLayer>,
149 pub tx_balance_changes: Option<ConcurrentLayer>,
150 pub tx_calls: Option<ConcurrentLayer>,
151 pub tx_digests: Option<ConcurrentLayer>,
152 pub tx_kinds: Option<ConcurrentLayer>,
153}
154
155impl IndexerConfig {
156 pub fn example() -> Self {
159 let mut example: Self = Default::default();
160
161 example.ingestion = IngestionConfig::default().into();
162 example.committer = CommitterConfig::default().into();
163 example.pruner = PrunerConfig::default().into();
164 example.pipeline = PipelineLayer::example();
165
166 example
167 }
168
169 pub fn for_test() -> Self {
173 Self::example()
174 .merge(IndexerConfig {
175 ingestion: IngestionLayer {
176 retry_interval_ms: Some(10),
177 ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
178 ..Default::default()
179 },
180 committer: CommitterLayer {
181 collect_interval_ms: Some(50),
182 watermark_interval_ms: Some(50),
183 write_concurrency: Some(1),
184 },
185 pruner: PrunerLayer {
186 interval_ms: Some(50),
187 delay_ms: Some(0),
188 ..Default::default()
189 },
190 ..Default::default()
191 })
192 .expect("Merge failed for test configuration")
193 }
194}
195
196impl IngestionLayer {
197 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
198 if self.checkpoint_buffer_size.is_some() {
199 warn!(
200 "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
201 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
202 section if you need to override the default."
203 );
204 }
205
206 Ok(IngestionConfig {
207 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
208 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
209 streaming_backoff_initial_batch_size: self
210 .streaming_backoff_initial_batch_size
211 .unwrap_or(base.streaming_backoff_initial_batch_size),
212 streaming_backoff_max_batch_size: self
213 .streaming_backoff_max_batch_size
214 .unwrap_or(base.streaming_backoff_max_batch_size),
215 streaming_connection_timeout_ms: self
216 .streaming_connection_timeout_ms
217 .unwrap_or(base.streaming_connection_timeout_ms),
218 streaming_statement_timeout_ms: self
219 .streaming_statement_timeout_ms
220 .unwrap_or(base.streaming_statement_timeout_ms),
221 })
222 }
223}
224
225impl SequentialLayer {
226 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
227 Ok(SequentialConfig {
228 committer: if let Some(committer) = self.committer {
229 committer.finish(base.committer)?
230 } else {
231 base.committer
232 },
233 ingestion: if let Some(ingestion) = self.ingestion {
234 ingestion.finish(base.ingestion)
235 } else {
236 base.ingestion
237 },
238 fanout: self.fanout.or(base.fanout),
239 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
240 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
241 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
242 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
243 pipeline_depth: self.pipeline_depth.or(base.pipeline_depth),
244 })
245 }
246}
247
248impl ConcurrentLayer {
249 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
252 Ok(ConcurrentConfig {
253 committer: if let Some(committer) = self.committer {
254 committer.finish(base.committer)?
255 } else {
256 base.committer
257 },
258 ingestion: if let Some(ingestion) = self.ingestion {
259 ingestion.finish(base.ingestion)
260 } else {
261 base.ingestion
262 },
263 pruner: match (self.pruner, base.pruner) {
264 (None, _) | (_, None) => None,
265 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
266 },
267 fanout: self.fanout.or(base.fanout),
268 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
269 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
270 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
271 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
272 collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
273 committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
274 })
275 }
276}
277
278impl PipelineIngestionLayer {
279 pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
280 pipeline::IngestionConfig {
281 subscriber_channel_size: self
282 .subscriber_channel_size
283 .or(base.subscriber_channel_size),
284 }
285 }
286}
287
288impl CommitterLayer {
289 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
290 Ok(CommitterConfig {
291 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
292 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
293 watermark_interval_ms: self
294 .watermark_interval_ms
295 .unwrap_or(base.watermark_interval_ms),
296 watermark_interval_jitter_ms: 0,
297 })
298 }
299}
300
301impl PrunerLayer {
302 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
303 Ok(PrunerConfig {
304 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
305 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
306 retention: self.retention.unwrap_or(base.retention),
307 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
308 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
309 })
310 }
311}
312
313impl PipelineLayer {
314 pub fn example() -> Self {
317 PipelineLayer {
318 cp_blooms: Some(Default::default()),
319 cp_bloom_blocks: Some(Default::default()),
320 cp_digests: Some(Default::default()),
321 sum_displays: Some(Default::default()),
322 cp_sequence_numbers: Some(Default::default()),
323 ev_emit_mod: Some(Default::default()),
324 ev_struct_inst: Some(Default::default()),
325 kv_checkpoints: Some(Default::default()),
326 kv_epoch_ends: Some(Default::default()),
327 kv_epoch_starts: Some(Default::default()),
328 kv_feature_flags: Some(Default::default()),
329 kv_objects: Some(Default::default()),
330 kv_packages: Some(Default::default()),
331 kv_protocol_configs: Some(Default::default()),
332 kv_transactions: Some(Default::default()),
333 obj_versions: Some(Default::default()),
334 tx_affected_addresses: Some(Default::default()),
335 tx_affected_objects: Some(Default::default()),
336 tx_balance_changes: Some(Default::default()),
337 tx_calls: Some(Default::default()),
338 tx_digests: Some(Default::default()),
339 tx_kinds: Some(Default::default()),
340 }
341 }
342}
343
344impl Merge for IndexerConfig {
345 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
346 Ok(IndexerConfig {
347 ingestion: self.ingestion.merge(other.ingestion)?,
348 committer: self.committer.merge(other.committer)?,
349 pruner: self.pruner.merge(other.pruner)?,
350 pipeline: self.pipeline.merge(other.pipeline)?,
351 })
352 }
353}
354
355impl Merge for IngestionLayer {
356 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
357 Ok(IngestionLayer {
358 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
359 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
360 streaming_backoff_initial_batch_size: other
361 .streaming_backoff_initial_batch_size
362 .or(self.streaming_backoff_initial_batch_size),
363 streaming_backoff_max_batch_size: other
364 .streaming_backoff_max_batch_size
365 .or(self.streaming_backoff_max_batch_size),
366 streaming_connection_timeout_ms: other
367 .streaming_connection_timeout_ms
368 .or(self.streaming_connection_timeout_ms),
369 streaming_statement_timeout_ms: other
370 .streaming_statement_timeout_ms
371 .or(self.streaming_statement_timeout_ms),
372 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
373 })
374 }
375}
376
377impl Merge for SequentialLayer {
378 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
379 Ok(SequentialLayer {
380 committer: self.committer.merge(other.committer)?,
381 ingestion: self.ingestion.merge(other.ingestion)?,
382 fanout: other.fanout.or(self.fanout),
383 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
384 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
385 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
386 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
387 pipeline_depth: other.pipeline_depth.or(self.pipeline_depth),
388 })
389 }
390}
391
392impl Merge for ConcurrentLayer {
393 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
394 Ok(ConcurrentLayer {
395 committer: self.committer.merge(other.committer)?,
396 ingestion: self.ingestion.merge(other.ingestion)?,
397 pruner: self.pruner.merge(other.pruner)?,
398 fanout: other.fanout.or(self.fanout),
399 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
400 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
401 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
402 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
403 collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
404 committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
405 })
406 }
407}
408
409impl Merge for PipelineIngestionLayer {
410 fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
411 Ok(PipelineIngestionLayer {
412 subscriber_channel_size: other
413 .subscriber_channel_size
414 .or(self.subscriber_channel_size),
415 })
416 }
417}
418
419impl Merge for CommitterLayer {
420 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
421 Ok(CommitterLayer {
422 write_concurrency: other.write_concurrency.or(self.write_concurrency),
423 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
424 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
425 })
426 }
427}
428
429impl Merge for PrunerLayer {
430 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
433 Ok(PrunerLayer {
434 interval_ms: other.interval_ms.or(self.interval_ms),
435 delay_ms: other.delay_ms.or(self.delay_ms),
436 retention: match (other.retention, self.retention) {
437 (Some(a), Some(b)) => Some(a.max(b)),
438 (Some(a), _) | (_, Some(a)) => Some(a),
439 (None, None) => None,
440 },
441 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
442 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
443 })
444 }
445}
446
447impl Merge for PipelineLayer {
448 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
449 Ok(PipelineLayer {
450 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
451 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
452 cp_digests: self.cp_digests.merge(other.cp_digests)?,
453 sum_displays: self.sum_displays.merge(other.sum_displays)?,
454 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
455 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
456 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
457 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
458 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
459 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
460 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
461 kv_objects: self.kv_objects.merge(other.kv_objects)?,
462 kv_packages: self.kv_packages.merge(other.kv_packages)?,
463 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
464 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
465 obj_versions: self.obj_versions.merge(other.obj_versions)?,
466 tx_affected_addresses: self
467 .tx_affected_addresses
468 .merge(other.tx_affected_addresses)?,
469 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
470 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
471 tx_calls: self.tx_calls.merge(other.tx_calls)?,
472 tx_digests: self.tx_digests.merge(other.tx_digests)?,
473 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
474 })
475 }
476}
477
478impl<T: Merge> Merge for Option<T> {
479 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
480 Ok(match (self, other) {
481 (Some(a), Some(b)) => Some(a.merge(b)?),
482 (Some(a), _) | (_, Some(a)) => Some(a),
483 (None, None) => None,
484 })
485 }
486}
487
488impl From<IngestionConfig> for IngestionLayer {
489 fn from(config: IngestionConfig) -> Self {
490 Self {
491 ingest_concurrency: Some(config.ingest_concurrency),
492 retry_interval_ms: Some(config.retry_interval_ms),
493 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
494 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
495 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
496 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
497 checkpoint_buffer_size: None,
498 }
499 }
500}
501
502impl From<SequentialConfig> for SequentialLayer {
503 fn from(config: SequentialConfig) -> Self {
504 Self {
505 committer: Some(config.committer.into()),
506 ingestion: Some(config.ingestion.into()),
507 fanout: config.fanout,
508 min_eager_rows: config.min_eager_rows,
509 max_pending_rows: config.max_pending_rows,
510 max_batch_checkpoints: config.max_batch_checkpoints,
511 processor_channel_size: config.processor_channel_size,
512 pipeline_depth: config.pipeline_depth,
513 }
514 }
515}
516
517impl From<ConcurrentConfig> for ConcurrentLayer {
518 fn from(config: ConcurrentConfig) -> Self {
519 Self {
520 committer: Some(config.committer.into()),
521 ingestion: Some(config.ingestion.into()),
522 pruner: config.pruner.map(Into::into),
523 fanout: config.fanout,
524 min_eager_rows: config.min_eager_rows,
525 max_pending_rows: config.max_pending_rows,
526 max_watermark_updates: config.max_watermark_updates,
527 processor_channel_size: config.processor_channel_size,
528 collector_channel_size: config.collector_channel_size,
529 committer_channel_size: config.committer_channel_size,
530 }
531 }
532}
533
534impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
535 fn from(config: pipeline::IngestionConfig) -> Self {
536 Self {
537 subscriber_channel_size: config.subscriber_channel_size,
538 }
539 }
540}
541
542impl From<CommitterConfig> for CommitterLayer {
543 fn from(config: CommitterConfig) -> Self {
544 Self {
545 write_concurrency: Some(config.write_concurrency),
546 collect_interval_ms: Some(config.collect_interval_ms),
547 watermark_interval_ms: Some(config.watermark_interval_ms),
548 }
549 }
550}
551
552impl From<PrunerConfig> for PrunerLayer {
553 fn from(config: PrunerConfig) -> Self {
554 Self {
555 interval_ms: Some(config.interval_ms),
556 delay_ms: Some(config.delay_ms),
557 retention: Some(config.retention),
558 max_chunk_size: Some(config.max_chunk_size),
559 prune_concurrency: Some(config.prune_concurrency),
560 }
561 }
562}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567
568 macro_rules! assert_matches {
569 ($value:expr, $pattern:pat $(,)?) => {
570 let value = $value;
571 assert!(
572 matches!(value, $pattern),
573 "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
574 stringify!($pattern)
575 );
576 };
577 }
578
579 #[test]
580 fn merge_recursive() {
581 let this = PipelineLayer {
582 sum_displays: Some(SequentialLayer {
583 committer: Some(CommitterLayer {
584 write_concurrency: Some(10),
585 collect_interval_ms: Some(1000),
586 watermark_interval_ms: None,
587 }),
588 min_eager_rows: Some(100),
589 ..Default::default()
590 }),
591 ev_emit_mod: Some(ConcurrentLayer {
592 committer: Some(CommitterLayer {
593 write_concurrency: Some(5),
594 collect_interval_ms: Some(500),
595 watermark_interval_ms: None,
596 }),
597 ..Default::default()
598 }),
599 ..Default::default()
600 };
601
602 let that = PipelineLayer {
603 sum_displays: Some(SequentialLayer {
604 committer: Some(CommitterLayer {
605 write_concurrency: Some(5),
606 collect_interval_ms: None,
607 watermark_interval_ms: Some(500),
608 }),
609 min_eager_rows: Some(200),
610 ..Default::default()
611 }),
612 ev_emit_mod: None,
613 ..Default::default()
614 };
615
616 let this_then_that = this.clone().merge(that.clone()).unwrap();
617 let that_then_this = that.clone().merge(this.clone()).unwrap();
618
619 assert_matches!(
620 this_then_that,
621 PipelineLayer {
622 sum_displays: Some(SequentialLayer {
623 committer: Some(CommitterLayer {
624 write_concurrency: Some(5),
625 collect_interval_ms: Some(1000),
626 watermark_interval_ms: Some(500),
627 ..
628 }),
629 min_eager_rows: Some(200),
630 ..
631 }),
632 ev_emit_mod: Some(ConcurrentLayer {
633 committer: Some(CommitterLayer {
634 write_concurrency: Some(5),
635 collect_interval_ms: Some(500),
636 watermark_interval_ms: None,
637 ..
638 }),
639 pruner: None,
640 ..
641 }),
642 ..
643 },
644 );
645
646 assert_matches!(
647 that_then_this,
648 PipelineLayer {
649 sum_displays: Some(SequentialLayer {
650 committer: Some(CommitterLayer {
651 write_concurrency: Some(10),
652 collect_interval_ms: Some(1000),
653 watermark_interval_ms: Some(500),
654 ..
655 }),
656 min_eager_rows: Some(100),
657 ..
658 }),
659 ev_emit_mod: Some(ConcurrentLayer {
660 committer: Some(CommitterLayer {
661 write_concurrency: Some(5),
662 collect_interval_ms: Some(500),
663 watermark_interval_ms: None,
664 ..
665 }),
666 pruner: None,
667 ..
668 }),
669 ..
670 },
671 );
672 }
673
674 #[test]
675 fn merge_pruner() {
676 let this = PrunerLayer {
677 interval_ms: None,
678 delay_ms: Some(100),
679 retention: Some(200),
680 max_chunk_size: Some(300),
681 prune_concurrency: Some(1),
682 };
683
684 let that = PrunerLayer {
685 interval_ms: Some(400),
686 delay_ms: None,
687 retention: Some(500),
688 max_chunk_size: Some(600),
689 prune_concurrency: Some(2),
690 };
691
692 let this_then_that = this.clone().merge(that.clone()).unwrap();
693 let that_then_this = that.clone().merge(this.clone()).unwrap();
694
695 assert_matches!(
696 this_then_that,
697 PrunerLayer {
698 interval_ms: Some(400),
699 delay_ms: Some(100),
700 retention: Some(500),
701 max_chunk_size: Some(600),
702 prune_concurrency: Some(2),
703 },
704 );
705
706 assert_matches!(
707 that_then_this,
708 PrunerLayer {
709 interval_ms: Some(400),
710 delay_ms: Some(100),
711 retention: Some(500),
712 max_chunk_size: Some(300),
713 prune_concurrency: Some(1),
714 },
715 );
716 }
717
718 #[test]
719 fn finish_concurrent_unpruned_override() {
720 let layer = ConcurrentLayer {
721 committer: None,
722 pruner: None,
723 ..Default::default()
724 };
725
726 let base = ConcurrentConfig {
727 committer: CommitterConfig {
728 write_concurrency: 5,
729 collect_interval_ms: 50,
730 watermark_interval_ms: 500,
731 ..Default::default()
732 },
733 pruner: Some(PrunerConfig::default()),
734 ..Default::default()
735 };
736
737 assert_matches!(
738 layer.finish(base).unwrap(),
739 ConcurrentConfig {
740 committer: CommitterConfig {
741 write_concurrency: 5,
742 collect_interval_ms: 50,
743 watermark_interval_ms: 500,
744 ..
745 },
746 pruner: None,
747 ..
748 },
749 );
750 }
751
752 #[test]
753 fn finish_concurrent_no_pruner() {
754 let layer = ConcurrentLayer {
755 committer: None,
756 pruner: None,
757 ..Default::default()
758 };
759
760 let base = ConcurrentConfig {
761 committer: CommitterConfig {
762 write_concurrency: 5,
763 collect_interval_ms: 50,
764 watermark_interval_ms: 500,
765 ..Default::default()
766 },
767 pruner: None,
768 ..Default::default()
769 };
770
771 assert_matches!(
772 layer.finish(base).unwrap(),
773 ConcurrentConfig {
774 committer: CommitterConfig {
775 write_concurrency: 5,
776 collect_interval_ms: 50,
777 watermark_interval_ms: 500,
778 ..
779 },
780 pruner: None,
781 ..
782 },
783 );
784 }
785
786 #[test]
787 fn finish_concurrent_pruner() {
788 let layer = ConcurrentLayer {
789 committer: None,
790 pruner: Some(PrunerLayer {
791 interval_ms: Some(1000),
792 ..Default::default()
793 }),
794 ..Default::default()
795 };
796
797 let base = ConcurrentConfig {
798 committer: CommitterConfig {
799 write_concurrency: 5,
800 collect_interval_ms: 50,
801 watermark_interval_ms: 500,
802 ..Default::default()
803 },
804 pruner: Some(PrunerConfig {
805 interval_ms: 100,
806 delay_ms: 200,
807 retention: 300,
808 max_chunk_size: 400,
809 prune_concurrency: 1,
810 }),
811 ..Default::default()
812 };
813
814 assert_matches!(
815 layer.finish(base).unwrap(),
816 ConcurrentConfig {
817 committer: CommitterConfig {
818 write_concurrency: 5,
819 collect_interval_ms: 50,
820 watermark_interval_ms: 500,
821 ..
822 },
823 pruner: Some(PrunerConfig {
824 interval_ms: 1000,
825 delay_ms: 200,
826 retention: 300,
827 max_chunk_size: 400,
828 prune_concurrency: 1,
829 }),
830 ..
831 },
832 );
833 }
834
835 #[test]
836 fn detect_unrecognized_fields() {
837 let err = toml::from_str::<IndexerConfig>(
838 r#"
839 i_dont_exist = "foo"
840 "#,
841 )
842 .unwrap_err();
843
844 assert!(
845 err.to_string().contains("i_dont_exist"),
846 "Unexpected error: {err}"
847 );
848 }
849
850 #[test]
851 fn deprecated_checkpoint_buffer_size_parses() {
852 let config: IndexerConfig = toml::from_str(
853 r#"
854 [ingestion]
855 checkpoint-buffer-size = 5000
856 "#,
857 )
858 .expect("deprecated `checkpoint-buffer-size` must deserialize without error");
859
860 assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
861 }
862}