sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline;
8use sui_indexer_alt_framework::pipeline::CommitterConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
10use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
11use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
12use tracing::warn;
13
/// Trait for merging configuration structs together.
///
/// `self` is the earlier configuration and `other` is the one layered on top of it. The
/// implementations in this file generally let a value set in `other` take precedence, falling
/// back to the value in `self` when `other` leaves the field unset.
pub trait Merge: Sized {
    /// Combine `self` with `other`, consuming both, and return the merged value, or an error if
    /// the two cannot be merged.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
18
/// Top-level indexer configuration. It is composed entirely of layer types, so several
/// configurations can be combined with [`Merge`] before the individual layers are finished
/// against a base configuration.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// How checkpoints are read by the indexer.
    pub ingestion: IngestionLayer,

    /// Default configuration for committers that is shared by all pipelines. Pipelines can
    /// override individual settings in their own configuration sections.
    pub committer: CommitterLayer,

    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
    /// override individual settings in their own configuration sections. Concurrent pipelines
    /// still need to specify a pruner configuration (although it can be empty) to indicate that
    /// they want to enable pruning, but when they do, any missing values will be filled in by this
    /// config.
    pub pruner: PrunerLayer,

    /// Per-pipeline configurations.
    pub pipeline: PipelineLayer,
}
40
// Configuration layers apply overrides over a base configuration. When reading configs from a
// file, we read them into layer types, and then apply those layers onto an existing configuration
// (such as the default configuration) to `finish()` them.
//
// Treating configs as layers allows us to support configuration merging, where multiple
// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields: every layer is marked
// `deny_unknown_fields`, so an unknown key is a parse error rather than a silently ignored value.
48
/// Overrides for the framework's `IngestionConfig`. When the layer is finished, every field that
/// is `None` falls back to the corresponding value in the base configuration.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,

    /// Deprecated: accepted (and ignored) so old configs don't fail to parse. Replaced by
    /// per-pipeline `ingestion.subscriber-channel-size`. Setting it only causes `finish` to log a
    /// warning; the value is not copied into the finished configuration.
    pub checkpoint_buffer_size: Option<usize>,
}
64
/// Overrides for a sequential pipeline's `SequentialConfig`. When the layer is finished, unset
/// fields (and unset nested `committer` / `ingestion` sections) fall back to the base
/// configuration.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub pipeline_depth: Option<usize>,
}
78
/// Overrides for a concurrent pipeline's `ConcurrentConfig`. When the layer is finished, unset
/// fields (and unset nested `committer` / `ingestion` sections) fall back to the base
/// configuration.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    /// Unlike the other sections, leaving this as `None` does not inherit the base's pruner: the
    /// finished configuration only has a pruner when both this layer and the base provide one.
    pub pruner: Option<PrunerLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
94
/// Per-pipeline overrides for `pipeline::IngestionConfig`. An unset field falls back to the base
/// configuration's value when the layer is finished.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PipelineIngestionLayer {
    pub subscriber_channel_size: Option<usize>,
}
101
/// Overrides for the framework's `CommitterConfig`. An unset field falls back to the base
/// configuration's value when the layer is finished.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
110
/// Overrides for the framework's `PrunerConfig`. An unset field falls back to the base
/// configuration's value when the layer is finished.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    /// When two pruner layers are merged, the larger `retention` is kept regardless of which
    /// layer came last; every other field prefers the later layer.
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
121
/// Optional configuration layer for each known pipeline, keyed by pipeline name. The single
/// sequential pipeline uses a [`SequentialLayer`]; all others use a [`ConcurrentLayer`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // All concurrent pipelines
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
151
152impl IndexerConfig {
153    /// Generate an example configuration, suitable for demonstrating the fields available to
154    /// configure.
155    pub fn example() -> Self {
156        let mut example: Self = Default::default();
157
158        example.ingestion = IngestionConfig::default().into();
159        example.committer = CommitterConfig::default().into();
160        example.pruner = PrunerConfig::default().into();
161        example.pipeline = PipelineLayer::example();
162
163        example
164    }
165
166    /// Generate a configuration suitable for testing. This is the same as the example
167    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
168    /// less time waiting.
169    pub fn for_test() -> Self {
170        Self::example()
171            .merge(IndexerConfig {
172                ingestion: IngestionLayer {
173                    retry_interval_ms: Some(10),
174                    ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
175                    ..Default::default()
176                },
177                committer: CommitterLayer {
178                    collect_interval_ms: Some(50),
179                    watermark_interval_ms: Some(50),
180                    write_concurrency: Some(1),
181                },
182                pruner: PrunerLayer {
183                    interval_ms: Some(50),
184                    delay_ms: Some(0),
185                    ..Default::default()
186                },
187                ..Default::default()
188            })
189            .expect("Merge failed for test configuration")
190    }
191}
192
193impl IngestionLayer {
194    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
195        if self.checkpoint_buffer_size.is_some() {
196            warn!(
197                "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
198                 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
199                 section if you need to override the default."
200            );
201        }
202
203        Ok(IngestionConfig {
204            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
205            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
206            streaming_backoff_initial_batch_size: self
207                .streaming_backoff_initial_batch_size
208                .unwrap_or(base.streaming_backoff_initial_batch_size),
209            streaming_backoff_max_batch_size: self
210                .streaming_backoff_max_batch_size
211                .unwrap_or(base.streaming_backoff_max_batch_size),
212            streaming_connection_timeout_ms: self
213                .streaming_connection_timeout_ms
214                .unwrap_or(base.streaming_connection_timeout_ms),
215            streaming_statement_timeout_ms: self
216                .streaming_statement_timeout_ms
217                .unwrap_or(base.streaming_statement_timeout_ms),
218        })
219    }
220}
221
222impl SequentialLayer {
223    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
224        Ok(SequentialConfig {
225            committer: if let Some(committer) = self.committer {
226                committer.finish(base.committer)?
227            } else {
228                base.committer
229            },
230            ingestion: if let Some(ingestion) = self.ingestion {
231                ingestion.finish(base.ingestion)
232            } else {
233                base.ingestion
234            },
235            fanout: self.fanout.or(base.fanout),
236            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
237            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
238            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
239            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
240            pipeline_depth: self.pipeline_depth.or(base.pipeline_depth),
241        })
242    }
243}
244
245impl ConcurrentLayer {
246    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
247    /// appear in the layer *and* in the base.
248    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
249        Ok(ConcurrentConfig {
250            committer: if let Some(committer) = self.committer {
251                committer.finish(base.committer)?
252            } else {
253                base.committer
254            },
255            ingestion: if let Some(ingestion) = self.ingestion {
256                ingestion.finish(base.ingestion)
257            } else {
258                base.ingestion
259            },
260            pruner: match (self.pruner, base.pruner) {
261                (None, _) | (_, None) => None,
262                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
263            },
264            fanout: self.fanout.or(base.fanout),
265            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
266            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
267            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
268            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
269            collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
270            committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
271        })
272    }
273}
274
275impl PipelineIngestionLayer {
276    pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
277        pipeline::IngestionConfig {
278            subscriber_channel_size: self
279                .subscriber_channel_size
280                .or(base.subscriber_channel_size),
281        }
282    }
283}
284
285impl CommitterLayer {
286    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
287        Ok(CommitterConfig {
288            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
289            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
290            watermark_interval_ms: self
291                .watermark_interval_ms
292                .unwrap_or(base.watermark_interval_ms),
293            watermark_interval_jitter_ms: 0,
294        })
295    }
296}
297
298impl PrunerLayer {
299    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
300        Ok(PrunerConfig {
301            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
302            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
303            retention: self.retention.unwrap_or(base.retention),
304            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
305            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
306        })
307    }
308}
309
310impl PipelineLayer {
311    /// Generate an example configuration, suitable for demonstrating the fields available to
312    /// configure.
313    pub fn example() -> Self {
314        PipelineLayer {
315            cp_blooms: Some(Default::default()),
316            cp_bloom_blocks: Some(Default::default()),
317            sum_displays: Some(Default::default()),
318            cp_sequence_numbers: Some(Default::default()),
319            ev_emit_mod: Some(Default::default()),
320            ev_struct_inst: Some(Default::default()),
321            kv_checkpoints: Some(Default::default()),
322            kv_epoch_ends: Some(Default::default()),
323            kv_epoch_starts: Some(Default::default()),
324            kv_feature_flags: Some(Default::default()),
325            kv_objects: Some(Default::default()),
326            kv_packages: Some(Default::default()),
327            kv_protocol_configs: Some(Default::default()),
328            kv_transactions: Some(Default::default()),
329            obj_versions: Some(Default::default()),
330            tx_affected_addresses: Some(Default::default()),
331            tx_affected_objects: Some(Default::default()),
332            tx_balance_changes: Some(Default::default()),
333            tx_calls: Some(Default::default()),
334            tx_digests: Some(Default::default()),
335            tx_kinds: Some(Default::default()),
336        }
337    }
338}
339
340impl Merge for IndexerConfig {
341    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
342        Ok(IndexerConfig {
343            ingestion: self.ingestion.merge(other.ingestion)?,
344            committer: self.committer.merge(other.committer)?,
345            pruner: self.pruner.merge(other.pruner)?,
346            pipeline: self.pipeline.merge(other.pipeline)?,
347        })
348    }
349}
350
351impl Merge for IngestionLayer {
352    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
353        Ok(IngestionLayer {
354            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
355            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
356            streaming_backoff_initial_batch_size: other
357                .streaming_backoff_initial_batch_size
358                .or(self.streaming_backoff_initial_batch_size),
359            streaming_backoff_max_batch_size: other
360                .streaming_backoff_max_batch_size
361                .or(self.streaming_backoff_max_batch_size),
362            streaming_connection_timeout_ms: other
363                .streaming_connection_timeout_ms
364                .or(self.streaming_connection_timeout_ms),
365            streaming_statement_timeout_ms: other
366                .streaming_statement_timeout_ms
367                .or(self.streaming_statement_timeout_ms),
368            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
369        })
370    }
371}
372
373impl Merge for SequentialLayer {
374    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
375        Ok(SequentialLayer {
376            committer: self.committer.merge(other.committer)?,
377            ingestion: self.ingestion.merge(other.ingestion)?,
378            fanout: other.fanout.or(self.fanout),
379            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
380            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
381            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
382            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
383            pipeline_depth: other.pipeline_depth.or(self.pipeline_depth),
384        })
385    }
386}
387
388impl Merge for ConcurrentLayer {
389    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
390        Ok(ConcurrentLayer {
391            committer: self.committer.merge(other.committer)?,
392            ingestion: self.ingestion.merge(other.ingestion)?,
393            pruner: self.pruner.merge(other.pruner)?,
394            fanout: other.fanout.or(self.fanout),
395            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
396            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
397            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
398            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
399            collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
400            committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
401        })
402    }
403}
404
405impl Merge for PipelineIngestionLayer {
406    fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
407        Ok(PipelineIngestionLayer {
408            subscriber_channel_size: other
409                .subscriber_channel_size
410                .or(self.subscriber_channel_size),
411        })
412    }
413}
414
415impl Merge for CommitterLayer {
416    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
417        Ok(CommitterLayer {
418            write_concurrency: other.write_concurrency.or(self.write_concurrency),
419            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
420            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
421        })
422    }
423}
424
425impl Merge for PrunerLayer {
426    /// Last write takes precedence for all fields except the `retention`, which takes the max of
427    /// all available values.
428    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
429        Ok(PrunerLayer {
430            interval_ms: other.interval_ms.or(self.interval_ms),
431            delay_ms: other.delay_ms.or(self.delay_ms),
432            retention: match (other.retention, self.retention) {
433                (Some(a), Some(b)) => Some(a.max(b)),
434                (Some(a), _) | (_, Some(a)) => Some(a),
435                (None, None) => None,
436            },
437            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
438            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
439        })
440    }
441}
442
443impl Merge for PipelineLayer {
444    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
445        Ok(PipelineLayer {
446            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
447            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
448            sum_displays: self.sum_displays.merge(other.sum_displays)?,
449            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
450            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
451            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
452            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
453            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
454            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
455            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
456            kv_objects: self.kv_objects.merge(other.kv_objects)?,
457            kv_packages: self.kv_packages.merge(other.kv_packages)?,
458            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
459            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
460            obj_versions: self.obj_versions.merge(other.obj_versions)?,
461            tx_affected_addresses: self
462                .tx_affected_addresses
463                .merge(other.tx_affected_addresses)?,
464            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
465            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
466            tx_calls: self.tx_calls.merge(other.tx_calls)?,
467            tx_digests: self.tx_digests.merge(other.tx_digests)?,
468            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
469        })
470    }
471}
472
473impl<T: Merge> Merge for Option<T> {
474    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
475        Ok(match (self, other) {
476            (Some(a), Some(b)) => Some(a.merge(b)?),
477            (Some(a), _) | (_, Some(a)) => Some(a),
478            (None, None) => None,
479        })
480    }
481}
482
483impl From<IngestionConfig> for IngestionLayer {
484    fn from(config: IngestionConfig) -> Self {
485        Self {
486            ingest_concurrency: Some(config.ingest_concurrency),
487            retry_interval_ms: Some(config.retry_interval_ms),
488            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
489            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
490            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
491            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
492            checkpoint_buffer_size: None,
493        }
494    }
495}
496
497impl From<SequentialConfig> for SequentialLayer {
498    fn from(config: SequentialConfig) -> Self {
499        Self {
500            committer: Some(config.committer.into()),
501            ingestion: Some(config.ingestion.into()),
502            fanout: config.fanout,
503            min_eager_rows: config.min_eager_rows,
504            max_pending_rows: config.max_pending_rows,
505            max_batch_checkpoints: config.max_batch_checkpoints,
506            processor_channel_size: config.processor_channel_size,
507            pipeline_depth: config.pipeline_depth,
508        }
509    }
510}
511
512impl From<ConcurrentConfig> for ConcurrentLayer {
513    fn from(config: ConcurrentConfig) -> Self {
514        Self {
515            committer: Some(config.committer.into()),
516            ingestion: Some(config.ingestion.into()),
517            pruner: config.pruner.map(Into::into),
518            fanout: config.fanout,
519            min_eager_rows: config.min_eager_rows,
520            max_pending_rows: config.max_pending_rows,
521            max_watermark_updates: config.max_watermark_updates,
522            processor_channel_size: config.processor_channel_size,
523            collector_channel_size: config.collector_channel_size,
524            committer_channel_size: config.committer_channel_size,
525        }
526    }
527}
528
529impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
530    fn from(config: pipeline::IngestionConfig) -> Self {
531        Self {
532            subscriber_channel_size: config.subscriber_channel_size,
533        }
534    }
535}
536
537impl From<CommitterConfig> for CommitterLayer {
538    fn from(config: CommitterConfig) -> Self {
539        Self {
540            write_concurrency: Some(config.write_concurrency),
541            collect_interval_ms: Some(config.collect_interval_ms),
542            watermark_interval_ms: Some(config.watermark_interval_ms),
543        }
544    }
545}
546
547impl From<PrunerConfig> for PrunerLayer {
548    fn from(config: PrunerConfig) -> Self {
549        Self {
550            interval_ms: Some(config.interval_ms),
551            delay_ms: Some(config.delay_ms),
552            retention: Some(config.retention),
553            max_chunk_size: Some(config.max_chunk_size),
554            prune_concurrency: Some(config.prune_concurrency),
555        }
556    }
557}
558
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `$value` matches `$pattern`. On failure the panic message shows the expected
    /// pattern alongside the pretty-printed actual value.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Merging recurses through nested layers: fields set on both sides take the value from the
    /// argument to `merge`, fields set on only one side are kept, and a pipeline section that is
    /// present on only one side survives the merge in either order.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                min_eager_rows: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                min_eager_rows: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    /// `retention` merges to the maximum of the two values regardless of merge order, while the
    /// other pruner fields prefer the layer passed as the argument to `merge`.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner section finishes with no pruner even when the base has one; the
    /// committer settings are inherited from the base.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With no pruner in either the layer or the base, the finished config has no pruner and the
    /// committer settings are inherited from the base.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// When both the layer and the base have a pruner, the layer's pruner fields override the
    /// base's and the remaining pruner fields are taken from the base.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    /// An unknown top-level key fails deserialization, and the error message names the key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }

    /// The deprecated `checkpoint-buffer-size` key still deserializes, and its value is retained
    /// on the ingestion layer.
    #[test]
    fn deprecated_checkpoint_buffer_size_parses() {
        let config: IndexerConfig = toml::from_str(
            r#"
            [ingestion]
            checkpoint-buffer-size = 5000
            "#,
        )
        .expect("deprecated `checkpoint-buffer-size` must deserialize without error");

        assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
    }
}