1use std::num::NonZeroUsize;
5
6use serde::Deserialize;
7use serde::Serialize;
8use sui_indexer_alt_framework::config::ConcurrencyConfig;
9use sui_indexer_alt_framework::ingestion::IngestionConfig;
10use sui_indexer_alt_framework::pipeline;
11use sui_indexer_alt_framework::pipeline::CommitterConfig;
12use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
13use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
14use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
15use tracing::warn;
16
/// Combines two values of the same type into one.
///
/// The implementations in this file treat `self` as the earlier layer and `other` as the later
/// one: where both set a field, `other`'s value generally wins (individual impls document their
/// exceptions). The result is fallible so that an implementation may reject incompatible
/// inputs, although none of the impls in this file produces an error of its own.
pub trait Merge: Sized {
    /// Merge `other` on top of `self`, consuming both.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
21
/// Top-level indexer configuration, expressed as layers of optional overrides.
///
/// Serde (de)serializes field names in kebab-case, substitutes `Default` for any section that
/// is missing, and rejects fields it does not recognize.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct IndexerConfig {
    /// Overrides for the framework's ingestion configuration.
    pub ingestion: IngestionLayer,

    /// Committer overrides declared at the top level of the config.
    // NOTE(review): presumably shared by every pipeline unless the pipeline overrides them --
    // confirm against the code that consumes this struct.
    pub committer: CommitterLayer,

    /// Pruner overrides declared at the top level of the config.
    // NOTE(review): presumably shared by every pruned pipeline -- confirm against the code
    // that consumes this struct.
    pub pruner: PrunerLayer,

    /// Per-pipeline override layers.
    pub pipeline: PipelineLayer,
}
42
/// Optional overrides for the framework's `IngestionConfig`.
///
/// A field left as `None` falls back to the base configuration when
/// [`IngestionLayer::finish`] is called. Field names are (de)serialized in kebab-case and
/// unknown fields are rejected.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct IngestionLayer {
    /// Override for `IngestionConfig::ingest_concurrency`.
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    /// Override for `IngestionConfig::retry_interval_ms`.
    pub retry_interval_ms: Option<u64>,
    /// Override for `IngestionConfig::streaming_backoff_initial_batch_size`.
    pub streaming_backoff_initial_batch_size: Option<NonZeroUsize>,
    /// Override for `IngestionConfig::streaming_backoff_max_batch_size`.
    pub streaming_backoff_max_batch_size: Option<usize>,
    /// Override for `IngestionConfig::streaming_connection_timeout_ms`.
    pub streaming_connection_timeout_ms: Option<u64>,
    /// Override for `IngestionConfig::streaming_statement_timeout_ms`.
    pub streaming_statement_timeout_ms: Option<u64>,

    /// Deprecated: still accepted when parsing, but `finish` only logs a warning when it is set
    /// and does not copy it into the resulting `IngestionConfig`.
    pub checkpoint_buffer_size: Option<usize>,
}
65
/// Optional overrides for a pipeline's `SequentialConfig`.
///
/// [`SequentialLayer::finish`] applies these on top of a base `SequentialConfig`; anything left
/// as `None` keeps the base value. Field names are (de)serialized in kebab-case and unknown
/// fields are rejected.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct SequentialLayer {
    /// Nested overrides for the pipeline's committer configuration.
    pub committer: Option<CommitterLayer>,
    /// Nested overrides for the pipeline's ingestion configuration.
    pub ingestion: Option<PipelineIngestionLayer>,
    /// Override for `SequentialConfig::fanout`.
    pub fanout: Option<ConcurrencyConfig>,
    /// Override for `SequentialConfig::min_eager_rows`.
    pub min_eager_rows: Option<usize>,
    /// Override for `SequentialConfig::max_pending_rows`.
    pub max_pending_rows: Option<usize>,
    /// Override for `SequentialConfig::max_batch_checkpoints`.
    pub max_batch_checkpoints: Option<usize>,
    /// Override for `SequentialConfig::processor_channel_size`.
    pub processor_channel_size: Option<usize>,
    /// Override for `SequentialConfig::pipeline_depth`.
    pub pipeline_depth: Option<usize>,
}
78
/// Optional overrides for a pipeline's `ConcurrentConfig`.
///
/// [`ConcurrentLayer::finish`] applies these on top of a base `ConcurrentConfig`; anything left
/// as `None` keeps the base value, except `pruner` (see the field note). Field names are
/// (de)serialized in kebab-case and unknown fields are rejected.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Nested overrides for the pipeline's committer configuration.
    pub committer: Option<CommitterLayer>,
    /// Nested overrides for the pipeline's ingestion configuration.
    pub ingestion: Option<PipelineIngestionLayer>,
    /// Nested overrides for the pipeline's pruner configuration. `finish` produces a pruner
    /// only when this is `Some` *and* the base configuration also has a pruner.
    pub pruner: Option<PrunerLayer>,
    /// Override for `ConcurrentConfig::fanout`.
    pub fanout: Option<ConcurrencyConfig>,
    /// Override for `ConcurrentConfig::min_eager_rows`.
    pub min_eager_rows: Option<usize>,
    /// Override for `ConcurrentConfig::max_pending_rows`.
    pub max_pending_rows: Option<usize>,
    /// Override for `ConcurrentConfig::max_watermark_updates`.
    pub max_watermark_updates: Option<usize>,
    /// Override for `ConcurrentConfig::processor_channel_size`.
    pub processor_channel_size: Option<usize>,
    /// Override for `ConcurrentConfig::collector_channel_size`.
    pub collector_channel_size: Option<usize>,
    /// Override for `ConcurrentConfig::committer_channel_size`.
    pub committer_channel_size: Option<usize>,
}
93
/// Optional overrides for a pipeline's `pipeline::IngestionConfig`.
///
/// Field names are (de)serialized in kebab-case and unknown fields are rejected.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct PipelineIngestionLayer {
    /// Override for `pipeline::IngestionConfig::subscriber_channel_size`.
    pub subscriber_channel_size: Option<usize>,
}
99
/// Optional overrides for the framework's `CommitterConfig`.
///
/// A field left as `None` keeps the base value when [`CommitterLayer::finish`] is called.
/// Field names are (de)serialized in kebab-case and unknown fields are rejected.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct CommitterLayer {
    /// Override for `CommitterConfig::write_concurrency`.
    pub write_concurrency: Option<usize>,
    /// Override for `CommitterConfig::collect_interval_ms`.
    pub collect_interval_ms: Option<u64>,
    /// Override for `CommitterConfig::watermark_interval_ms`.
    pub watermark_interval_ms: Option<u64>,
}
107
/// Optional overrides for the framework's `PrunerConfig`.
///
/// A field left as `None` keeps the base value when [`PrunerLayer::finish`] is called. Field
/// names are (de)serialized in kebab-case and unknown fields are rejected.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct PrunerLayer {
    /// Override for `PrunerConfig::interval_ms`.
    pub interval_ms: Option<u64>,
    /// Override for `PrunerConfig::delay_ms`.
    pub delay_ms: Option<u64>,
    /// Override for `PrunerConfig::retention`. When two layers are merged, the larger of the
    /// two retentions is kept rather than the later one.
    pub retention: Option<u64>,
    /// Override for `PrunerConfig::max_chunk_size`.
    pub max_chunk_size: Option<u64>,
    /// Override for `PrunerConfig::prune_concurrency`.
    pub prune_concurrency: Option<u64>,
}
117
/// One optional override layer per known pipeline.
///
/// Unlike the other layers, field names are (de)serialized in snake_case, so the keys in the
/// config file are exactly the field names below. Unknown keys are rejected.
// NOTE(review): presumably a pipeline left as `None` is not run at all -- confirm against the
// code that consumes this struct.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // The only pipeline in this file configured with a `SequentialLayer`.
    pub sum_displays: Option<SequentialLayer>,

    // Every remaining pipeline is configured with a `ConcurrentLayer`.
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_digests: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
147
148impl IndexerConfig {
149 pub fn example() -> Self {
152 let mut example: Self = Default::default();
153
154 example.ingestion = IngestionConfig::default().into();
155 example.committer = CommitterConfig::default().into();
156 example.pruner = PrunerConfig::default().into();
157 example.pipeline = PipelineLayer::example();
158
159 example
160 }
161
162 pub fn for_test() -> Self {
166 Self::example()
167 .merge(IndexerConfig {
168 ingestion: IngestionLayer {
169 retry_interval_ms: Some(10),
170 ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
171 ..Default::default()
172 },
173 committer: CommitterLayer {
174 collect_interval_ms: Some(50),
175 watermark_interval_ms: Some(50),
176 write_concurrency: Some(1),
177 },
178 pruner: PrunerLayer {
179 interval_ms: Some(50),
180 delay_ms: Some(0),
181 ..Default::default()
182 },
183 ..Default::default()
184 })
185 .expect("Merge failed for test configuration")
186 }
187}
188
189impl IngestionLayer {
190 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
191 if self.checkpoint_buffer_size.is_some() {
192 warn!(
193 "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
194 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
195 section if you need to override the default."
196 );
197 }
198
199 Ok(IngestionConfig {
200 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
201 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
202 streaming_backoff_initial_batch_size: self
203 .streaming_backoff_initial_batch_size
204 .unwrap_or(base.streaming_backoff_initial_batch_size),
205 streaming_backoff_max_batch_size: self
206 .streaming_backoff_max_batch_size
207 .unwrap_or(base.streaming_backoff_max_batch_size),
208 streaming_connection_timeout_ms: self
209 .streaming_connection_timeout_ms
210 .unwrap_or(base.streaming_connection_timeout_ms),
211 streaming_statement_timeout_ms: self
212 .streaming_statement_timeout_ms
213 .unwrap_or(base.streaming_statement_timeout_ms),
214 })
215 }
216}
217
218impl SequentialLayer {
219 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
220 Ok(SequentialConfig {
221 committer: if let Some(committer) = self.committer {
222 committer.finish(base.committer)?
223 } else {
224 base.committer
225 },
226 ingestion: if let Some(ingestion) = self.ingestion {
227 ingestion.finish(base.ingestion)
228 } else {
229 base.ingestion
230 },
231 fanout: self.fanout.or(base.fanout),
232 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
233 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
234 max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
235 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
236 pipeline_depth: self.pipeline_depth.or(base.pipeline_depth),
237 })
238 }
239}
240
241impl ConcurrentLayer {
242 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
245 Ok(ConcurrentConfig {
246 committer: if let Some(committer) = self.committer {
247 committer.finish(base.committer)?
248 } else {
249 base.committer
250 },
251 ingestion: if let Some(ingestion) = self.ingestion {
252 ingestion.finish(base.ingestion)
253 } else {
254 base.ingestion
255 },
256 pruner: match (self.pruner, base.pruner) {
257 (None, _) | (_, None) => None,
258 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
259 },
260 fanout: self.fanout.or(base.fanout),
261 min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
262 max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
263 max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
264 processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
265 collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
266 committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
267 })
268 }
269}
270
271impl PipelineIngestionLayer {
272 pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
273 pipeline::IngestionConfig {
274 subscriber_channel_size: self
275 .subscriber_channel_size
276 .or(base.subscriber_channel_size),
277 }
278 }
279}
280
281impl CommitterLayer {
282 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
283 Ok(CommitterConfig {
284 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
285 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
286 watermark_interval_ms: self
287 .watermark_interval_ms
288 .unwrap_or(base.watermark_interval_ms),
289 watermark_interval_jitter_ms: 0,
290 })
291 }
292}
293
294impl PrunerLayer {
295 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
296 Ok(PrunerConfig {
297 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
298 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
299 retention: self.retention.unwrap_or(base.retention),
300 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
301 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
302 })
303 }
304}
305
306impl PipelineLayer {
307 pub fn example() -> Self {
310 PipelineLayer {
311 cp_blooms: Some(Default::default()),
312 cp_bloom_blocks: Some(Default::default()),
313 cp_digests: Some(Default::default()),
314 sum_displays: Some(Default::default()),
315 cp_sequence_numbers: Some(Default::default()),
316 ev_emit_mod: Some(Default::default()),
317 ev_struct_inst: Some(Default::default()),
318 kv_checkpoints: Some(Default::default()),
319 kv_epoch_ends: Some(Default::default()),
320 kv_epoch_starts: Some(Default::default()),
321 kv_feature_flags: Some(Default::default()),
322 kv_objects: Some(Default::default()),
323 kv_packages: Some(Default::default()),
324 kv_protocol_configs: Some(Default::default()),
325 kv_transactions: Some(Default::default()),
326 obj_versions: Some(Default::default()),
327 tx_affected_addresses: Some(Default::default()),
328 tx_affected_objects: Some(Default::default()),
329 tx_balance_changes: Some(Default::default()),
330 tx_calls: Some(Default::default()),
331 tx_digests: Some(Default::default()),
332 tx_kinds: Some(Default::default()),
333 }
334 }
335}
336
337impl Merge for IndexerConfig {
338 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
339 Ok(IndexerConfig {
340 ingestion: self.ingestion.merge(other.ingestion)?,
341 committer: self.committer.merge(other.committer)?,
342 pruner: self.pruner.merge(other.pruner)?,
343 pipeline: self.pipeline.merge(other.pipeline)?,
344 })
345 }
346}
347
348impl Merge for IngestionLayer {
349 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
350 Ok(IngestionLayer {
351 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
352 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
353 streaming_backoff_initial_batch_size: other
354 .streaming_backoff_initial_batch_size
355 .or(self.streaming_backoff_initial_batch_size),
356 streaming_backoff_max_batch_size: other
357 .streaming_backoff_max_batch_size
358 .or(self.streaming_backoff_max_batch_size),
359 streaming_connection_timeout_ms: other
360 .streaming_connection_timeout_ms
361 .or(self.streaming_connection_timeout_ms),
362 streaming_statement_timeout_ms: other
363 .streaming_statement_timeout_ms
364 .or(self.streaming_statement_timeout_ms),
365 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
366 })
367 }
368}
369
370impl Merge for SequentialLayer {
371 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
372 Ok(SequentialLayer {
373 committer: self.committer.merge(other.committer)?,
374 ingestion: self.ingestion.merge(other.ingestion)?,
375 fanout: other.fanout.or(self.fanout),
376 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
377 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
378 max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
379 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
380 pipeline_depth: other.pipeline_depth.or(self.pipeline_depth),
381 })
382 }
383}
384
385impl Merge for ConcurrentLayer {
386 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
387 Ok(ConcurrentLayer {
388 committer: self.committer.merge(other.committer)?,
389 ingestion: self.ingestion.merge(other.ingestion)?,
390 pruner: self.pruner.merge(other.pruner)?,
391 fanout: other.fanout.or(self.fanout),
392 min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
393 max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
394 max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
395 processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
396 collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
397 committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
398 })
399 }
400}
401
402impl Merge for PipelineIngestionLayer {
403 fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
404 Ok(PipelineIngestionLayer {
405 subscriber_channel_size: other
406 .subscriber_channel_size
407 .or(self.subscriber_channel_size),
408 })
409 }
410}
411
412impl Merge for CommitterLayer {
413 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
414 Ok(CommitterLayer {
415 write_concurrency: other.write_concurrency.or(self.write_concurrency),
416 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
417 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
418 })
419 }
420}
421
422impl Merge for PrunerLayer {
423 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
426 Ok(PrunerLayer {
427 interval_ms: other.interval_ms.or(self.interval_ms),
428 delay_ms: other.delay_ms.or(self.delay_ms),
429 retention: match (other.retention, self.retention) {
430 (Some(a), Some(b)) => Some(a.max(b)),
431 (Some(a), _) | (_, Some(a)) => Some(a),
432 (None, None) => None,
433 },
434 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
435 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
436 })
437 }
438}
439
440impl Merge for PipelineLayer {
441 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
442 Ok(PipelineLayer {
443 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
444 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
445 cp_digests: self.cp_digests.merge(other.cp_digests)?,
446 sum_displays: self.sum_displays.merge(other.sum_displays)?,
447 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
448 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
449 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
450 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
451 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
452 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
453 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
454 kv_objects: self.kv_objects.merge(other.kv_objects)?,
455 kv_packages: self.kv_packages.merge(other.kv_packages)?,
456 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
457 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
458 obj_versions: self.obj_versions.merge(other.obj_versions)?,
459 tx_affected_addresses: self
460 .tx_affected_addresses
461 .merge(other.tx_affected_addresses)?,
462 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
463 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
464 tx_calls: self.tx_calls.merge(other.tx_calls)?,
465 tx_digests: self.tx_digests.merge(other.tx_digests)?,
466 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
467 })
468 }
469}
470
471impl<T: Merge> Merge for Option<T> {
472 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
473 Ok(match (self, other) {
474 (Some(a), Some(b)) => Some(a.merge(b)?),
475 (Some(a), _) | (_, Some(a)) => Some(a),
476 (None, None) => None,
477 })
478 }
479}
480
481impl From<IngestionConfig> for IngestionLayer {
482 fn from(config: IngestionConfig) -> Self {
483 Self {
484 ingest_concurrency: Some(config.ingest_concurrency),
485 retry_interval_ms: Some(config.retry_interval_ms),
486 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
487 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
488 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
489 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
490 checkpoint_buffer_size: None,
491 }
492 }
493}
494
495impl From<SequentialConfig> for SequentialLayer {
496 fn from(config: SequentialConfig) -> Self {
497 Self {
498 committer: Some(config.committer.into()),
499 ingestion: Some(config.ingestion.into()),
500 fanout: config.fanout,
501 min_eager_rows: config.min_eager_rows,
502 max_pending_rows: config.max_pending_rows,
503 max_batch_checkpoints: config.max_batch_checkpoints,
504 processor_channel_size: config.processor_channel_size,
505 pipeline_depth: config.pipeline_depth,
506 }
507 }
508}
509
510impl From<ConcurrentConfig> for ConcurrentLayer {
511 fn from(config: ConcurrentConfig) -> Self {
512 Self {
513 committer: Some(config.committer.into()),
514 ingestion: Some(config.ingestion.into()),
515 pruner: config.pruner.map(Into::into),
516 fanout: config.fanout,
517 min_eager_rows: config.min_eager_rows,
518 max_pending_rows: config.max_pending_rows,
519 max_watermark_updates: config.max_watermark_updates,
520 processor_channel_size: config.processor_channel_size,
521 collector_channel_size: config.collector_channel_size,
522 committer_channel_size: config.committer_channel_size,
523 }
524 }
525}
526
527impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
528 fn from(config: pipeline::IngestionConfig) -> Self {
529 Self {
530 subscriber_channel_size: config.subscriber_channel_size,
531 }
532 }
533}
534
535impl From<CommitterConfig> for CommitterLayer {
536 fn from(config: CommitterConfig) -> Self {
537 Self {
538 write_concurrency: Some(config.write_concurrency),
539 collect_interval_ms: Some(config.collect_interval_ms),
540 watermark_interval_ms: Some(config.watermark_interval_ms),
541 }
542 }
543}
544
545impl From<PrunerConfig> for PrunerLayer {
546 fn from(config: PrunerConfig) -> Self {
547 Self {
548 interval_ms: Some(config.interval_ms),
549 delay_ms: Some(config.delay_ms),
550 retention: Some(config.retention),
551 max_chunk_size: Some(config.max_chunk_size),
552 prune_concurrency: Some(config.prune_concurrency),
553 }
554 }
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that a value matches a pattern, printing the expected pattern and the
    /// pretty-printed value on failure.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Merging pipeline layers recurses into nested layers: the later layer wins per field,
    /// and a pipeline present on only one side is kept as is.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                min_eager_rows: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                min_eager_rows: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        // `that` is merged last, so its values win wherever it sets them.
        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        // With the order reversed, `this` wins wherever it sets a value.
        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    /// Pruner layers merge with "later wins" for every field except `retention`, which keeps
    /// the maximum in either merge order.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner removes the pruner even when the base config has one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With no pruner in either the layer or the base, the result has no pruner and the base
    /// committer values are kept.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// When both the layer and the base have a pruner, the layer's set fields override the
    /// base's and the rest are inherited.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    /// Unknown top-level keys are rejected at parse time, and the error names the key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }

    /// The deprecated `checkpoint-buffer-size` key still parses and is stored on the layer, so
    /// existing config files keep loading.
    #[test]
    fn deprecated_checkpoint_buffer_size_parses() {
        let config: IndexerConfig = toml::from_str(
            r#"
            [ingestion]
            checkpoint-buffer-size = 5000
            "#,
        )
        .expect("deprecated `checkpoint-buffer-size` must deserialize without error");

        assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
    }
}