sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::num::NonZeroUsize;
5
6use sui_default_config::DefaultConfig;
7use sui_indexer_alt_framework::config::ConcurrencyConfig;
8use sui_indexer_alt_framework::ingestion::IngestionConfig;
9use sui_indexer_alt_framework::pipeline;
10use sui_indexer_alt_framework::pipeline::CommitterConfig;
11use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
12use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
13use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
14use tracing::warn;
15
/// Trait for merging configuration structs together.
///
/// `self` is the earlier (lower-precedence) value and `other` is the later one. The
/// implementations in this module generally let a value that is set in `other` replace the one
/// in `self`, falling back to `self` when `other` leaves it unset, and merge nested layers
/// recursively. The exception is `PrunerLayer`'s `retention`, which keeps the larger of the two
/// values.
pub trait Merge: Sized {
    /// Combine `self` with `other`, returning the merged value.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
20
/// Top-level indexer configuration as read from a config file.
///
/// Every section is a layer of optional overrides. Configs can be combined with [`Merge`], and
/// each layer is turned into a concrete framework config by its `finish` method. Unknown fields
/// are rejected during deserialization (`deny_unknown_fields`).
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// How checkpoints are read by the indexer.
    pub ingestion: IngestionLayer,

    /// Default configuration for committers that is shared by all pipelines. Pipelines can
    /// override individual settings in their own configuration sections.
    pub committer: CommitterLayer,

    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
    /// override individual settings in their own configuration sections. Concurrent pipelines
    /// still need to specify a pruner configuration (although it can be empty) to indicate that
    /// they want to enable pruning, but when they do, any missing values will be filled in by this
    /// config.
    pub pruner: PrunerLayer,

    /// Per-pipeline configurations.
    pub pipeline: PipelineLayer,
}
42
// Configuration layers apply overrides over a base configuration. When reading configs from a
// file, we read them into layer types, and then apply those layers onto an existing configuration
// (such as the default configuration) to `finish()` them.
//
// Treating configs as layers allows us to support configuration merging, where multiple
// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields (via
// `deny_unknown_fields`).
50
/// Overrides for the indexer's [`IngestionConfig`].
///
/// [`IngestionLayer::finish`] applies this layer. A field that is set replaces the matching field
/// of the base config, and an unset field keeps the base value.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<NonZeroUsize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,

    /// Deprecated: accepted (and ignored) so old configs don't fail to parse. Replaced by
    /// per-pipeline `ingestion.subscriber-channel-size`. Setting it only causes `finish` to log
    /// a warning.
    pub checkpoint_buffer_size: Option<usize>,
}
66
/// Overrides for a sequential pipeline's [`SequentialConfig`], applied by
/// [`SequentialLayer::finish`].
///
/// Unset fields keep the base config's value. `committer` and `ingestion` are themselves layers
/// that are applied over the corresponding sections of the base.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub pipeline_depth: Option<usize>,
}
80
/// Overrides for a concurrent pipeline's [`ConcurrentConfig`], applied by
/// [`ConcurrentLayer::finish`].
///
/// Unset fields keep the base config's value. Whether `pruner` is present matters:
/// `finish` only produces a pruner config when both this layer and the base specify one.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    pub pruner: Option<PrunerLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
96
/// Per-pipeline overrides for [`pipeline::IngestionConfig`], applied by
/// [`PipelineIngestionLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PipelineIngestionLayer {
    /// Replaces the base config's `subscriber_channel_size` when set.
    pub subscriber_channel_size: Option<usize>,
}
103
/// Overrides for [`CommitterConfig`], used both as the indexer-wide default (`committer` in
/// [`IndexerConfig`]) and per pipeline.
///
/// [`CommitterLayer::finish`] applies this layer, and unset fields keep the base value.
/// `watermark_interval_jitter_ms` is not exposed by this layer.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
112
/// Overrides for [`PrunerConfig`], used both as the indexer-wide default (`pruner` in
/// [`IndexerConfig`]) and per concurrent pipeline.
///
/// [`PrunerLayer::finish`] applies this layer, and unset fields keep the base value. When two
/// layers are merged, `retention` keeps the larger value rather than the later one.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
123
/// Per-pipeline configuration sections: one optional entry per pipeline, with section names in
/// `snake_case`. Sequential pipelines take a [`SequentialLayer`] and concurrent pipelines take a
/// [`ConcurrentLayer`].
///
/// NOTE(review): whether an absent (`None`) section disables the pipeline is decided by the
/// caller, not here — confirm before relying on it.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // All concurrent pipelines
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_digests: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
154
155impl IndexerConfig {
156    /// Generate an example configuration, suitable for demonstrating the fields available to
157    /// configure.
158    pub fn example() -> Self {
159        let mut example: Self = Default::default();
160
161        example.ingestion = IngestionConfig::default().into();
162        example.committer = CommitterConfig::default().into();
163        example.pruner = PrunerConfig::default().into();
164        example.pipeline = PipelineLayer::example();
165
166        example
167    }
168
169    /// Generate a configuration suitable for testing. This is the same as the example
170    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
171    /// less time waiting.
172    pub fn for_test() -> Self {
173        Self::example()
174            .merge(IndexerConfig {
175                ingestion: IngestionLayer {
176                    retry_interval_ms: Some(10),
177                    ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
178                    ..Default::default()
179                },
180                committer: CommitterLayer {
181                    collect_interval_ms: Some(50),
182                    watermark_interval_ms: Some(50),
183                    write_concurrency: Some(1),
184                },
185                pruner: PrunerLayer {
186                    interval_ms: Some(50),
187                    delay_ms: Some(0),
188                    ..Default::default()
189                },
190                ..Default::default()
191            })
192            .expect("Merge failed for test configuration")
193    }
194}
195
196impl IngestionLayer {
197    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
198        if self.checkpoint_buffer_size.is_some() {
199            warn!(
200                "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
201                 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
202                 section if you need to override the default."
203            );
204        }
205
206        Ok(IngestionConfig {
207            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
208            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
209            streaming_backoff_initial_batch_size: self
210                .streaming_backoff_initial_batch_size
211                .unwrap_or(base.streaming_backoff_initial_batch_size),
212            streaming_backoff_max_batch_size: self
213                .streaming_backoff_max_batch_size
214                .unwrap_or(base.streaming_backoff_max_batch_size),
215            streaming_connection_timeout_ms: self
216                .streaming_connection_timeout_ms
217                .unwrap_or(base.streaming_connection_timeout_ms),
218            streaming_statement_timeout_ms: self
219                .streaming_statement_timeout_ms
220                .unwrap_or(base.streaming_statement_timeout_ms),
221        })
222    }
223}
224
225impl SequentialLayer {
226    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
227        Ok(SequentialConfig {
228            committer: if let Some(committer) = self.committer {
229                committer.finish(base.committer)?
230            } else {
231                base.committer
232            },
233            ingestion: if let Some(ingestion) = self.ingestion {
234                ingestion.finish(base.ingestion)
235            } else {
236                base.ingestion
237            },
238            fanout: self.fanout.or(base.fanout),
239            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
240            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
241            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
242            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
243            pipeline_depth: self.pipeline_depth.or(base.pipeline_depth),
244        })
245    }
246}
247
248impl ConcurrentLayer {
249    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
250    /// appear in the layer *and* in the base.
251    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
252        Ok(ConcurrentConfig {
253            committer: if let Some(committer) = self.committer {
254                committer.finish(base.committer)?
255            } else {
256                base.committer
257            },
258            ingestion: if let Some(ingestion) = self.ingestion {
259                ingestion.finish(base.ingestion)
260            } else {
261                base.ingestion
262            },
263            pruner: match (self.pruner, base.pruner) {
264                (None, _) | (_, None) => None,
265                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
266            },
267            fanout: self.fanout.or(base.fanout),
268            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
269            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
270            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
271            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
272            collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
273            committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
274        })
275    }
276}
277
278impl PipelineIngestionLayer {
279    pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
280        pipeline::IngestionConfig {
281            subscriber_channel_size: self
282                .subscriber_channel_size
283                .or(base.subscriber_channel_size),
284        }
285    }
286}
287
288impl CommitterLayer {
289    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
290        Ok(CommitterConfig {
291            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
292            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
293            watermark_interval_ms: self
294                .watermark_interval_ms
295                .unwrap_or(base.watermark_interval_ms),
296            watermark_interval_jitter_ms: 0,
297        })
298    }
299}
300
301impl PrunerLayer {
302    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
303        Ok(PrunerConfig {
304            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
305            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
306            retention: self.retention.unwrap_or(base.retention),
307            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
308            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
309        })
310    }
311}
312
313impl PipelineLayer {
314    /// Generate an example configuration, suitable for demonstrating the fields available to
315    /// configure.
316    pub fn example() -> Self {
317        PipelineLayer {
318            cp_blooms: Some(Default::default()),
319            cp_bloom_blocks: Some(Default::default()),
320            cp_digests: Some(Default::default()),
321            sum_displays: Some(Default::default()),
322            cp_sequence_numbers: Some(Default::default()),
323            ev_emit_mod: Some(Default::default()),
324            ev_struct_inst: Some(Default::default()),
325            kv_checkpoints: Some(Default::default()),
326            kv_epoch_ends: Some(Default::default()),
327            kv_epoch_starts: Some(Default::default()),
328            kv_feature_flags: Some(Default::default()),
329            kv_objects: Some(Default::default()),
330            kv_packages: Some(Default::default()),
331            kv_protocol_configs: Some(Default::default()),
332            kv_transactions: Some(Default::default()),
333            obj_versions: Some(Default::default()),
334            tx_affected_addresses: Some(Default::default()),
335            tx_affected_objects: Some(Default::default()),
336            tx_balance_changes: Some(Default::default()),
337            tx_calls: Some(Default::default()),
338            tx_digests: Some(Default::default()),
339            tx_kinds: Some(Default::default()),
340        }
341    }
342}
343
344impl Merge for IndexerConfig {
345    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
346        Ok(IndexerConfig {
347            ingestion: self.ingestion.merge(other.ingestion)?,
348            committer: self.committer.merge(other.committer)?,
349            pruner: self.pruner.merge(other.pruner)?,
350            pipeline: self.pipeline.merge(other.pipeline)?,
351        })
352    }
353}
354
355impl Merge for IngestionLayer {
356    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
357        Ok(IngestionLayer {
358            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
359            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
360            streaming_backoff_initial_batch_size: other
361                .streaming_backoff_initial_batch_size
362                .or(self.streaming_backoff_initial_batch_size),
363            streaming_backoff_max_batch_size: other
364                .streaming_backoff_max_batch_size
365                .or(self.streaming_backoff_max_batch_size),
366            streaming_connection_timeout_ms: other
367                .streaming_connection_timeout_ms
368                .or(self.streaming_connection_timeout_ms),
369            streaming_statement_timeout_ms: other
370                .streaming_statement_timeout_ms
371                .or(self.streaming_statement_timeout_ms),
372            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
373        })
374    }
375}
376
377impl Merge for SequentialLayer {
378    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
379        Ok(SequentialLayer {
380            committer: self.committer.merge(other.committer)?,
381            ingestion: self.ingestion.merge(other.ingestion)?,
382            fanout: other.fanout.or(self.fanout),
383            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
384            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
385            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
386            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
387            pipeline_depth: other.pipeline_depth.or(self.pipeline_depth),
388        })
389    }
390}
391
392impl Merge for ConcurrentLayer {
393    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
394        Ok(ConcurrentLayer {
395            committer: self.committer.merge(other.committer)?,
396            ingestion: self.ingestion.merge(other.ingestion)?,
397            pruner: self.pruner.merge(other.pruner)?,
398            fanout: other.fanout.or(self.fanout),
399            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
400            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
401            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
402            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
403            collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
404            committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
405        })
406    }
407}
408
409impl Merge for PipelineIngestionLayer {
410    fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
411        Ok(PipelineIngestionLayer {
412            subscriber_channel_size: other
413                .subscriber_channel_size
414                .or(self.subscriber_channel_size),
415        })
416    }
417}
418
419impl Merge for CommitterLayer {
420    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
421        Ok(CommitterLayer {
422            write_concurrency: other.write_concurrency.or(self.write_concurrency),
423            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
424            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
425        })
426    }
427}
428
429impl Merge for PrunerLayer {
430    /// Last write takes precedence for all fields except the `retention`, which takes the max of
431    /// all available values.
432    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
433        Ok(PrunerLayer {
434            interval_ms: other.interval_ms.or(self.interval_ms),
435            delay_ms: other.delay_ms.or(self.delay_ms),
436            retention: match (other.retention, self.retention) {
437                (Some(a), Some(b)) => Some(a.max(b)),
438                (Some(a), _) | (_, Some(a)) => Some(a),
439                (None, None) => None,
440            },
441            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
442            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
443        })
444    }
445}
446
447impl Merge for PipelineLayer {
448    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
449        Ok(PipelineLayer {
450            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
451            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
452            cp_digests: self.cp_digests.merge(other.cp_digests)?,
453            sum_displays: self.sum_displays.merge(other.sum_displays)?,
454            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
455            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
456            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
457            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
458            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
459            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
460            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
461            kv_objects: self.kv_objects.merge(other.kv_objects)?,
462            kv_packages: self.kv_packages.merge(other.kv_packages)?,
463            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
464            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
465            obj_versions: self.obj_versions.merge(other.obj_versions)?,
466            tx_affected_addresses: self
467                .tx_affected_addresses
468                .merge(other.tx_affected_addresses)?,
469            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
470            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
471            tx_calls: self.tx_calls.merge(other.tx_calls)?,
472            tx_digests: self.tx_digests.merge(other.tx_digests)?,
473            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
474        })
475    }
476}
477
478impl<T: Merge> Merge for Option<T> {
479    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
480        Ok(match (self, other) {
481            (Some(a), Some(b)) => Some(a.merge(b)?),
482            (Some(a), _) | (_, Some(a)) => Some(a),
483            (None, None) => None,
484        })
485    }
486}
487
488impl From<IngestionConfig> for IngestionLayer {
489    fn from(config: IngestionConfig) -> Self {
490        Self {
491            ingest_concurrency: Some(config.ingest_concurrency),
492            retry_interval_ms: Some(config.retry_interval_ms),
493            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
494            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
495            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
496            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
497            checkpoint_buffer_size: None,
498        }
499    }
500}
501
502impl From<SequentialConfig> for SequentialLayer {
503    fn from(config: SequentialConfig) -> Self {
504        Self {
505            committer: Some(config.committer.into()),
506            ingestion: Some(config.ingestion.into()),
507            fanout: config.fanout,
508            min_eager_rows: config.min_eager_rows,
509            max_pending_rows: config.max_pending_rows,
510            max_batch_checkpoints: config.max_batch_checkpoints,
511            processor_channel_size: config.processor_channel_size,
512            pipeline_depth: config.pipeline_depth,
513        }
514    }
515}
516
517impl From<ConcurrentConfig> for ConcurrentLayer {
518    fn from(config: ConcurrentConfig) -> Self {
519        Self {
520            committer: Some(config.committer.into()),
521            ingestion: Some(config.ingestion.into()),
522            pruner: config.pruner.map(Into::into),
523            fanout: config.fanout,
524            min_eager_rows: config.min_eager_rows,
525            max_pending_rows: config.max_pending_rows,
526            max_watermark_updates: config.max_watermark_updates,
527            processor_channel_size: config.processor_channel_size,
528            collector_channel_size: config.collector_channel_size,
529            committer_channel_size: config.committer_channel_size,
530        }
531    }
532}
533
534impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
535    fn from(config: pipeline::IngestionConfig) -> Self {
536        Self {
537            subscriber_channel_size: config.subscriber_channel_size,
538        }
539    }
540}
541
542impl From<CommitterConfig> for CommitterLayer {
543    fn from(config: CommitterConfig) -> Self {
544        Self {
545            write_concurrency: Some(config.write_concurrency),
546            collect_interval_ms: Some(config.collect_interval_ms),
547            watermark_interval_ms: Some(config.watermark_interval_ms),
548        }
549    }
550}
551
552impl From<PrunerConfig> for PrunerLayer {
553    fn from(config: PrunerConfig) -> Self {
554        Self {
555            interval_ms: Some(config.interval_ms),
556            delay_ms: Some(config.delay_ms),
557            retention: Some(config.retention),
558            max_chunk_size: Some(config.max_chunk_size),
559            prune_concurrency: Some(config.prune_concurrency),
560        }
561    }
562}
563
// Unit tests for layer merging, layer finishing, and config parsing.
#[cfg(test)]
mod tests {
    use super::*;

    // Local pattern assertion: checks that `$value` matches `$pattern`. On failure, it prints the
    // stringified pattern alongside the pretty-printed (`{:#?}`) value.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    // Merging `PipelineLayer`s recurses into nested layers. Fields set in the later layer win,
    // fields it leaves unset fall back to the earlier layer, and a pipeline present on only one
    // side (`ev_emit_mod`) is kept unchanged regardless of merge order.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                min_eager_rows: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                min_eager_rows: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        // `that` was applied last, so its set values win.
        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        // `this` was applied last, so its set values win.
        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    // `PrunerLayer` merging: the last write wins for every field except `retention`, which takes
    // the max (500) in both merge orders.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    // A layer without a pruner yields no pruner, even though the base has one. The committer is
    // inherited from the base because the layer doesn't override it.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    // With no pruner in either the layer or the base, the finished config has no pruner.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    // With a pruner in both the layer and the base, the layer's `interval_ms` overrides the base.
    // The remaining pruner fields come from the base.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    // An unknown top-level key is a parse error, and the error message names the offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }

    // The deprecated kebab-case `checkpoint-buffer-size` key still parses, and its value is kept
    // on the layer.
    #[test]
    fn deprecated_checkpoint_buffer_size_parses() {
        let config: IndexerConfig = toml::from_str(
            r#"
            [ingestion]
            checkpoint-buffer-size = 5000
            "#,
        )
        .expect("deprecated `checkpoint-buffer-size` must deserialize without error");

        assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
    }
}