sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline;
8use sui_indexer_alt_framework::pipeline::CommitterConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
10use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
11use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
12use tracing::warn;
13
/// Trait for merging configuration structs together.
///
/// `self` is the earlier configuration and `other` the later one. The implementations in this
/// module let a value set in `other` take precedence over the one in `self`, with exceptions
/// documented on the individual impls (e.g. pruner `retention`). Nested sections are merged
/// recursively.
pub trait Merge: Sized {
    /// Merge `other` over `self`, consuming both and returning the combined value.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
18
/// Top-level indexer configuration, as read from a configuration file.
///
/// Each section is one of the `*Layer` types in this module, whose unset values are filled in
/// from a base configuration later. Unknown fields are rejected during deserialization.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// How checkpoints are read by the indexer.
    pub ingestion: IngestionLayer,

    /// Default configuration for committers that is shared by all pipelines. Pipelines can
    /// override individual settings in their own configuration sections.
    pub committer: CommitterLayer,

    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
    /// override individual settings in their own configuration sections. Concurrent pipelines
    /// still need to specify a pruner configuration (although it can be empty) to indicate that
    /// they want to enable pruning, but when they do, any missing values will be filled in by this
    /// config.
    pub pruner: PrunerLayer,

    /// Per-pipeline configurations.
    pub pipeline: PipelineLayer,
}
40
41// Configuration layers apply overrides over a base configuration. When reading configs from a
42// file, we read them into layer types, and then apply those layers onto an existing configuration
43// (such as the default configuration) to `finish()` them.
44//
// Treating configs as layers allows us to support configuration merging, where multiple
// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields (via
// `deny_unknown_fields`) instead of silently ignoring them.
48
/// Overrides for [`IngestionConfig`]. When the layer is finished against a base configuration,
/// each `Some` field replaces the corresponding base value and each `None` field keeps it.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,

    /// Deprecated: accepted (and ignored) so old configs don't fail to parse. Replaced by
    /// per-pipeline `ingestion.subscriber-channel-size`.
    pub checkpoint_buffer_size: Option<usize>,
}
64
/// Per-pipeline overrides for a sequential pipeline, finished into a [`SequentialConfig`].
///
/// `None` fields inherit the base configuration's value. `committer` and `ingestion` are nested
/// layers, and each is finished against the corresponding section of the base.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
}
76
/// Per-pipeline overrides for a concurrent pipeline, finished into a [`ConcurrentConfig`].
///
/// `None` fields inherit the base configuration's value. `committer` and `ingestion` are nested
/// layers finished against the base's sections. `pruner` is the exception: the finished
/// configuration only has a pruner when both this layer and the base specify one.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    pub pruner: Option<PrunerLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
92
/// Per-pipeline ingestion overrides, finished into a [`pipeline::IngestionConfig`]. An unset
/// field inherits the base value.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PipelineIngestionLayer {
    pub subscriber_channel_size: Option<usize>,
}
99
/// Overrides for a [`CommitterConfig`]. Used both for the indexer-wide committer defaults and for
/// per-pipeline overrides. `None` fields inherit the base value when the layer is finished.
///
/// The committer's watermark jitter has no counterpart here, so it cannot be set through a config
/// file.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
108
/// Overrides for a [`PrunerConfig`]. Used both for the indexer-wide pruner defaults and for
/// per-pipeline overrides. `None` fields inherit the base value when the layer is finished.
///
/// When two layers are merged, `retention` takes the larger of the values that are set rather
/// than the later one.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
119
/// Per-pipeline configuration sections, one optional field per pipeline.
///
/// `rename_all = "snake_case"` keeps the configuration keys identical to these (snake_case) field
/// names. A pipeline with no section in the configuration deserializes as `None`.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // All concurrent pipelines
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
149
150impl IndexerConfig {
151    /// Generate an example configuration, suitable for demonstrating the fields available to
152    /// configure.
153    pub fn example() -> Self {
154        let mut example: Self = Default::default();
155
156        example.ingestion = IngestionConfig::default().into();
157        example.committer = CommitterConfig::default().into();
158        example.pruner = PrunerConfig::default().into();
159        example.pipeline = PipelineLayer::example();
160
161        example
162    }
163
164    /// Generate a configuration suitable for testing. This is the same as the example
165    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
166    /// less time waiting.
167    pub fn for_test() -> Self {
168        Self::example()
169            .merge(IndexerConfig {
170                ingestion: IngestionLayer {
171                    retry_interval_ms: Some(10),
172                    ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
173                    ..Default::default()
174                },
175                committer: CommitterLayer {
176                    collect_interval_ms: Some(50),
177                    watermark_interval_ms: Some(50),
178                    write_concurrency: Some(1),
179                },
180                pruner: PrunerLayer {
181                    interval_ms: Some(50),
182                    delay_ms: Some(0),
183                    ..Default::default()
184                },
185                ..Default::default()
186            })
187            .expect("Merge failed for test configuration")
188    }
189}
190
191impl IngestionLayer {
192    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
193        if self.checkpoint_buffer_size.is_some() {
194            warn!(
195                "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
196                 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
197                 section if you need to override the default."
198            );
199        }
200
201        Ok(IngestionConfig {
202            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
203            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
204            streaming_backoff_initial_batch_size: self
205                .streaming_backoff_initial_batch_size
206                .unwrap_or(base.streaming_backoff_initial_batch_size),
207            streaming_backoff_max_batch_size: self
208                .streaming_backoff_max_batch_size
209                .unwrap_or(base.streaming_backoff_max_batch_size),
210            streaming_connection_timeout_ms: self
211                .streaming_connection_timeout_ms
212                .unwrap_or(base.streaming_connection_timeout_ms),
213            streaming_statement_timeout_ms: self
214                .streaming_statement_timeout_ms
215                .unwrap_or(base.streaming_statement_timeout_ms),
216        })
217    }
218}
219
220impl SequentialLayer {
221    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
222        Ok(SequentialConfig {
223            committer: if let Some(committer) = self.committer {
224                committer.finish(base.committer)?
225            } else {
226                base.committer
227            },
228            ingestion: if let Some(ingestion) = self.ingestion {
229                ingestion.finish(base.ingestion)
230            } else {
231                base.ingestion
232            },
233            fanout: self.fanout.or(base.fanout),
234            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
235            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
236            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
237        })
238    }
239}
240
241impl ConcurrentLayer {
242    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
243    /// appear in the layer *and* in the base.
244    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
245        Ok(ConcurrentConfig {
246            committer: if let Some(committer) = self.committer {
247                committer.finish(base.committer)?
248            } else {
249                base.committer
250            },
251            ingestion: if let Some(ingestion) = self.ingestion {
252                ingestion.finish(base.ingestion)
253            } else {
254                base.ingestion
255            },
256            pruner: match (self.pruner, base.pruner) {
257                (None, _) | (_, None) => None,
258                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
259            },
260            fanout: self.fanout.or(base.fanout),
261            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
262            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
263            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
264            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
265            collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
266            committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
267        })
268    }
269}
270
271impl PipelineIngestionLayer {
272    pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
273        pipeline::IngestionConfig {
274            subscriber_channel_size: self
275                .subscriber_channel_size
276                .or(base.subscriber_channel_size),
277        }
278    }
279}
280
281impl CommitterLayer {
282    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
283        Ok(CommitterConfig {
284            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
285            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
286            watermark_interval_ms: self
287                .watermark_interval_ms
288                .unwrap_or(base.watermark_interval_ms),
289            watermark_interval_jitter_ms: 0,
290        })
291    }
292}
293
294impl PrunerLayer {
295    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
296        Ok(PrunerConfig {
297            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
298            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
299            retention: self.retention.unwrap_or(base.retention),
300            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
301            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
302        })
303    }
304}
305
306impl PipelineLayer {
307    /// Generate an example configuration, suitable for demonstrating the fields available to
308    /// configure.
309    pub fn example() -> Self {
310        PipelineLayer {
311            cp_blooms: Some(Default::default()),
312            cp_bloom_blocks: Some(Default::default()),
313            sum_displays: Some(Default::default()),
314            cp_sequence_numbers: Some(Default::default()),
315            ev_emit_mod: Some(Default::default()),
316            ev_struct_inst: Some(Default::default()),
317            kv_checkpoints: Some(Default::default()),
318            kv_epoch_ends: Some(Default::default()),
319            kv_epoch_starts: Some(Default::default()),
320            kv_feature_flags: Some(Default::default()),
321            kv_objects: Some(Default::default()),
322            kv_packages: Some(Default::default()),
323            kv_protocol_configs: Some(Default::default()),
324            kv_transactions: Some(Default::default()),
325            obj_versions: Some(Default::default()),
326            tx_affected_addresses: Some(Default::default()),
327            tx_affected_objects: Some(Default::default()),
328            tx_balance_changes: Some(Default::default()),
329            tx_calls: Some(Default::default()),
330            tx_digests: Some(Default::default()),
331            tx_kinds: Some(Default::default()),
332        }
333    }
334}
335
336impl Merge for IndexerConfig {
337    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
338        Ok(IndexerConfig {
339            ingestion: self.ingestion.merge(other.ingestion)?,
340            committer: self.committer.merge(other.committer)?,
341            pruner: self.pruner.merge(other.pruner)?,
342            pipeline: self.pipeline.merge(other.pipeline)?,
343        })
344    }
345}
346
347impl Merge for IngestionLayer {
348    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
349        Ok(IngestionLayer {
350            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
351            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
352            streaming_backoff_initial_batch_size: other
353                .streaming_backoff_initial_batch_size
354                .or(self.streaming_backoff_initial_batch_size),
355            streaming_backoff_max_batch_size: other
356                .streaming_backoff_max_batch_size
357                .or(self.streaming_backoff_max_batch_size),
358            streaming_connection_timeout_ms: other
359                .streaming_connection_timeout_ms
360                .or(self.streaming_connection_timeout_ms),
361            streaming_statement_timeout_ms: other
362                .streaming_statement_timeout_ms
363                .or(self.streaming_statement_timeout_ms),
364            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
365        })
366    }
367}
368
369impl Merge for SequentialLayer {
370    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
371        Ok(SequentialLayer {
372            committer: self.committer.merge(other.committer)?,
373            ingestion: self.ingestion.merge(other.ingestion)?,
374            fanout: other.fanout.or(self.fanout),
375            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
376            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
377            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
378        })
379    }
380}
381
382impl Merge for ConcurrentLayer {
383    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
384        Ok(ConcurrentLayer {
385            committer: self.committer.merge(other.committer)?,
386            ingestion: self.ingestion.merge(other.ingestion)?,
387            pruner: self.pruner.merge(other.pruner)?,
388            fanout: other.fanout.or(self.fanout),
389            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
390            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
391            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
392            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
393            collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
394            committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
395        })
396    }
397}
398
399impl Merge for PipelineIngestionLayer {
400    fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
401        Ok(PipelineIngestionLayer {
402            subscriber_channel_size: other
403                .subscriber_channel_size
404                .or(self.subscriber_channel_size),
405        })
406    }
407}
408
409impl Merge for CommitterLayer {
410    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
411        Ok(CommitterLayer {
412            write_concurrency: other.write_concurrency.or(self.write_concurrency),
413            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
414            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
415        })
416    }
417}
418
419impl Merge for PrunerLayer {
420    /// Last write takes precedence for all fields except the `retention`, which takes the max of
421    /// all available values.
422    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
423        Ok(PrunerLayer {
424            interval_ms: other.interval_ms.or(self.interval_ms),
425            delay_ms: other.delay_ms.or(self.delay_ms),
426            retention: match (other.retention, self.retention) {
427                (Some(a), Some(b)) => Some(a.max(b)),
428                (Some(a), _) | (_, Some(a)) => Some(a),
429                (None, None) => None,
430            },
431            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
432            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
433        })
434    }
435}
436
437impl Merge for PipelineLayer {
438    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
439        Ok(PipelineLayer {
440            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
441            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
442            sum_displays: self.sum_displays.merge(other.sum_displays)?,
443            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
444            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
445            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
446            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
447            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
448            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
449            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
450            kv_objects: self.kv_objects.merge(other.kv_objects)?,
451            kv_packages: self.kv_packages.merge(other.kv_packages)?,
452            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
453            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
454            obj_versions: self.obj_versions.merge(other.obj_versions)?,
455            tx_affected_addresses: self
456                .tx_affected_addresses
457                .merge(other.tx_affected_addresses)?,
458            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
459            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
460            tx_calls: self.tx_calls.merge(other.tx_calls)?,
461            tx_digests: self.tx_digests.merge(other.tx_digests)?,
462            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
463        })
464    }
465}
466
467impl<T: Merge> Merge for Option<T> {
468    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
469        Ok(match (self, other) {
470            (Some(a), Some(b)) => Some(a.merge(b)?),
471            (Some(a), _) | (_, Some(a)) => Some(a),
472            (None, None) => None,
473        })
474    }
475}
476
477impl From<IngestionConfig> for IngestionLayer {
478    fn from(config: IngestionConfig) -> Self {
479        Self {
480            ingest_concurrency: Some(config.ingest_concurrency),
481            retry_interval_ms: Some(config.retry_interval_ms),
482            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
483            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
484            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
485            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
486            checkpoint_buffer_size: None,
487        }
488    }
489}
490
491impl From<SequentialConfig> for SequentialLayer {
492    fn from(config: SequentialConfig) -> Self {
493        Self {
494            committer: Some(config.committer.into()),
495            ingestion: Some(config.ingestion.into()),
496            fanout: config.fanout,
497            min_eager_rows: config.min_eager_rows,
498            max_batch_checkpoints: config.max_batch_checkpoints,
499            processor_channel_size: config.processor_channel_size,
500        }
501    }
502}
503
504impl From<ConcurrentConfig> for ConcurrentLayer {
505    fn from(config: ConcurrentConfig) -> Self {
506        Self {
507            committer: Some(config.committer.into()),
508            ingestion: Some(config.ingestion.into()),
509            pruner: config.pruner.map(Into::into),
510            fanout: config.fanout,
511            min_eager_rows: config.min_eager_rows,
512            max_pending_rows: config.max_pending_rows,
513            max_watermark_updates: config.max_watermark_updates,
514            processor_channel_size: config.processor_channel_size,
515            collector_channel_size: config.collector_channel_size,
516            committer_channel_size: config.committer_channel_size,
517        }
518    }
519}
520
521impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
522    fn from(config: pipeline::IngestionConfig) -> Self {
523        Self {
524            subscriber_channel_size: config.subscriber_channel_size,
525        }
526    }
527}
528
529impl From<CommitterConfig> for CommitterLayer {
530    fn from(config: CommitterConfig) -> Self {
531        Self {
532            write_concurrency: Some(config.write_concurrency),
533            collect_interval_ms: Some(config.collect_interval_ms),
534            watermark_interval_ms: Some(config.watermark_interval_ms),
535        }
536    }
537}
538
539impl From<PrunerConfig> for PrunerLayer {
540    fn from(config: PrunerConfig) -> Self {
541        Self {
542            interval_ms: Some(config.interval_ms),
543            delay_ms: Some(config.delay_ms),
544            retention: Some(config.retention),
545            max_chunk_size: Some(config.max_chunk_size),
546            prune_concurrency: Some(config.prune_concurrency),
547        }
548    }
549}
550
// Unit tests for layer merging, layer finishing, and config deserialization.
#[cfg(test)]
mod tests {
    use super::*;

    // Like `assert!(matches!(..))`, but prints the actual value on failure. Matching on a pattern
    // lets the tests check a subset of fields (with `..`) on types that may not implement
    // `PartialEq`.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    // Merging pipeline layers recurses into nested sections. For each field the later layer wins
    // when it sets a value. A pipeline section present on only one side survives unchanged.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                min_eager_rows: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                min_eager_rows: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    // Pruner layers merge last-write-wins for every field except `retention`, which takes the
    // maximum regardless of merge order.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    // A pipeline layer without a pruner section stays unpruned even when the base has a pruner.
    // Unset committer fields come from the base.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    // With a pruner in neither the layer nor the base, the finished config has no pruner.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    // When both sides configure a pruner, fields set in the layer override the base and the rest
    // are inherited from it.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    // Unknown keys are rejected at parse time, and the error names the offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }

    // The deprecated `checkpoint-buffer-size` key still parses into the layer, so old configs
    // keep loading.
    #[test]
    fn deprecated_checkpoint_buffer_size_parses() {
        let config: IndexerConfig = toml::from_str(
            r#"
            [ingestion]
            checkpoint-buffer-size = 5000
            "#,
        )
        .expect("deprecated `checkpoint-buffer-size` must deserialize without error");

        assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
    }
}