sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline::CommitterConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
10use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
11
12/// Trait for merging configuration structs together.
13pub trait Merge: Sized {
14    fn merge(self, other: Self) -> anyhow::Result<Self>;
15}
16
/// Top-level indexer configuration, as deserialized from a TOML file.
///
/// Each section holds optional overrides that are later applied over a base configuration, and
/// whole configurations can be combined with [`Merge::merge`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
// Reject unrecognised keys instead of silently ignoring them.
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// How checkpoints are read by the indexer.
    pub ingestion: IngestionLayer,

    /// Default configuration for committers that is shared by all pipelines. Pipelines can
    /// override individual settings in their own configuration sections.
    pub committer: CommitterLayer,

    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
    /// override individual settings in their own configuration sections. Concurrent pipelines
    /// still need to specify a pruner configuration (although it can be empty) to indicate that
    /// they want to enable pruning, but when they do, any missing values will be filled in by this
    /// config.
    pub pruner: PrunerLayer,

    /// Per-pipeline configurations.
    pub pipeline: PipelineLayer,
}
38
39// Configuration layers apply overrides over a base configuration. When reading configs from a
40// file, we read them into layer types, and then apply those layers onto an existing configuration
41// (such as the default configuration) to `finish()` them.
42//
43// Treating configs as layers allows us to support configuration merging, where multiple
44// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields.
46
/// Overrides for [`IngestionConfig`].
///
/// Any field left as `None` falls back to the base configuration's value when the layer is
/// completed with [`IngestionLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub checkpoint_buffer_size: Option<usize>,
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,
}
59
/// Overrides for a sequential pipeline's [`SequentialConfig`].
///
/// `committer` is itself a layer that is completed against the base committer configuration.
/// Every other field replaces the base value only when it is set; otherwise the base value
/// (which may itself be unset for the optional fields) is kept.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    pub committer: Option<CommitterLayer>,
    pub checkpoint_lag: Option<u64>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
}
71
/// Overrides for a concurrent pipeline's [`ConcurrentConfig`].
///
/// `committer` and `pruner` are nested layers. `pruner` must be present (it may be empty) for the
/// finished configuration to include a pruner at all; see [`ConcurrentLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    pub committer: Option<CommitterLayer>,
    pub pruner: Option<PrunerLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
86
/// Overrides for [`CommitterConfig`].
///
/// Unset fields fall back to the base configuration in [`CommitterLayer::finish`]. This layer has
/// no field for `watermark_interval_jitter_ms`, so that setting cannot be configured from a file.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
95
/// Overrides for [`PrunerConfig`].
///
/// Unset fields fall back to the base configuration in [`PrunerLayer::finish`]. When two layers
/// are merged, `retention` keeps the larger of the two values rather than the later one.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
106
/// Per-pipeline configuration sections.
///
/// There is one optional section per pipeline, named after the field (snake_case). A field is
/// `None` when its section is absent from the configuration.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // All concurrent pipelines
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub coin_balance_buckets: Option<ConcurrentLayer>,
    pub obj_info: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
138
139impl IndexerConfig {
140    /// Generate an example configuration, suitable for demonstrating the fields available to
141    /// configure.
142    pub fn example() -> Self {
143        let mut example: Self = Default::default();
144
145        example.ingestion = IngestionConfig::default().into();
146        example.committer = CommitterConfig::default().into();
147        example.pruner = PrunerConfig::default().into();
148        example.pipeline = PipelineLayer::example();
149
150        example
151    }
152
153    /// Generate a configuration suitable for testing. This is the same as the example
154    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
155    /// less time waiting.
156    pub fn for_test() -> Self {
157        Self::example()
158            .merge(IndexerConfig {
159                ingestion: IngestionLayer {
160                    retry_interval_ms: Some(10),
161                    ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
162                    ..Default::default()
163                },
164                committer: CommitterLayer {
165                    collect_interval_ms: Some(50),
166                    watermark_interval_ms: Some(50),
167                    write_concurrency: Some(1),
168                },
169                pruner: PrunerLayer {
170                    interval_ms: Some(50),
171                    delay_ms: Some(0),
172                    ..Default::default()
173                },
174                ..Default::default()
175            })
176            .expect("Merge failed for test configuration")
177    }
178}
179
180impl IngestionLayer {
181    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
182        Ok(IngestionConfig {
183            checkpoint_buffer_size: self
184                .checkpoint_buffer_size
185                .unwrap_or(base.checkpoint_buffer_size),
186            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
187            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
188            streaming_backoff_initial_batch_size: self
189                .streaming_backoff_initial_batch_size
190                .unwrap_or(base.streaming_backoff_initial_batch_size),
191            streaming_backoff_max_batch_size: self
192                .streaming_backoff_max_batch_size
193                .unwrap_or(base.streaming_backoff_max_batch_size),
194            streaming_connection_timeout_ms: self
195                .streaming_connection_timeout_ms
196                .unwrap_or(base.streaming_connection_timeout_ms),
197            streaming_statement_timeout_ms: self
198                .streaming_statement_timeout_ms
199                .unwrap_or(base.streaming_statement_timeout_ms),
200        })
201    }
202}
203
204impl SequentialLayer {
205    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
206        Ok(SequentialConfig {
207            committer: if let Some(committer) = self.committer {
208                committer.finish(base.committer)?
209            } else {
210                base.committer
211            },
212            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
213            fanout: self.fanout.or(base.fanout),
214            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
215            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
216            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
217        })
218    }
219}
220
221impl ConcurrentLayer {
222    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
223    /// appear in the layer *and* in the base.
224    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
225        Ok(ConcurrentConfig {
226            committer: if let Some(committer) = self.committer {
227                committer.finish(base.committer)?
228            } else {
229                base.committer
230            },
231            pruner: match (self.pruner, base.pruner) {
232                (None, _) | (_, None) => None,
233                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
234            },
235            fanout: self.fanout.or(base.fanout),
236            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
237            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
238            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
239            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
240            collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
241            committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
242        })
243    }
244}
245
246impl CommitterLayer {
247    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
248        Ok(CommitterConfig {
249            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
250            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
251            watermark_interval_ms: self
252                .watermark_interval_ms
253                .unwrap_or(base.watermark_interval_ms),
254            watermark_interval_jitter_ms: 0,
255        })
256    }
257}
258
259impl PrunerLayer {
260    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
261        Ok(PrunerConfig {
262            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
263            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
264            retention: self.retention.unwrap_or(base.retention),
265            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
266            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
267        })
268    }
269}
270
271impl PipelineLayer {
272    /// Generate an example configuration, suitable for demonstrating the fields available to
273    /// configure.
274    pub fn example() -> Self {
275        PipelineLayer {
276            cp_blooms: Some(Default::default()),
277            cp_bloom_blocks: Some(Default::default()),
278            coin_balance_buckets: Some(Default::default()),
279            obj_info: Some(Default::default()),
280            sum_displays: Some(Default::default()),
281            cp_sequence_numbers: Some(Default::default()),
282            ev_emit_mod: Some(Default::default()),
283            ev_struct_inst: Some(Default::default()),
284            kv_checkpoints: Some(Default::default()),
285            kv_epoch_ends: Some(Default::default()),
286            kv_epoch_starts: Some(Default::default()),
287            kv_feature_flags: Some(Default::default()),
288            kv_objects: Some(Default::default()),
289            kv_packages: Some(Default::default()),
290            kv_protocol_configs: Some(Default::default()),
291            kv_transactions: Some(Default::default()),
292            obj_versions: Some(Default::default()),
293            tx_affected_addresses: Some(Default::default()),
294            tx_affected_objects: Some(Default::default()),
295            tx_balance_changes: Some(Default::default()),
296            tx_calls: Some(Default::default()),
297            tx_digests: Some(Default::default()),
298            tx_kinds: Some(Default::default()),
299        }
300    }
301}
302
303impl Merge for IndexerConfig {
304    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
305        Ok(IndexerConfig {
306            ingestion: self.ingestion.merge(other.ingestion)?,
307            committer: self.committer.merge(other.committer)?,
308            pruner: self.pruner.merge(other.pruner)?,
309            pipeline: self.pipeline.merge(other.pipeline)?,
310        })
311    }
312}
313
314impl Merge for IngestionLayer {
315    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
316        Ok(IngestionLayer {
317            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
318            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
319            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
320            streaming_backoff_initial_batch_size: other
321                .streaming_backoff_initial_batch_size
322                .or(self.streaming_backoff_initial_batch_size),
323            streaming_backoff_max_batch_size: other
324                .streaming_backoff_max_batch_size
325                .or(self.streaming_backoff_max_batch_size),
326            streaming_connection_timeout_ms: other
327                .streaming_connection_timeout_ms
328                .or(self.streaming_connection_timeout_ms),
329            streaming_statement_timeout_ms: other
330                .streaming_statement_timeout_ms
331                .or(self.streaming_statement_timeout_ms),
332        })
333    }
334}
335
336impl Merge for SequentialLayer {
337    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
338        Ok(SequentialLayer {
339            committer: self.committer.merge(other.committer)?,
340            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
341            fanout: other.fanout.or(self.fanout),
342            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
343            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
344            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
345        })
346    }
347}
348
349impl Merge for ConcurrentLayer {
350    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
351        Ok(ConcurrentLayer {
352            committer: self.committer.merge(other.committer)?,
353            pruner: self.pruner.merge(other.pruner)?,
354            fanout: other.fanout.or(self.fanout),
355            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
356            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
357            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
358            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
359            collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
360            committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
361        })
362    }
363}
364
365impl Merge for CommitterLayer {
366    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
367        Ok(CommitterLayer {
368            write_concurrency: other.write_concurrency.or(self.write_concurrency),
369            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
370            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
371        })
372    }
373}
374
375impl Merge for PrunerLayer {
376    /// Last write takes precedence for all fields except the `retention`, which takes the max of
377    /// all available values.
378    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
379        Ok(PrunerLayer {
380            interval_ms: other.interval_ms.or(self.interval_ms),
381            delay_ms: other.delay_ms.or(self.delay_ms),
382            retention: match (other.retention, self.retention) {
383                (Some(a), Some(b)) => Some(a.max(b)),
384                (Some(a), _) | (_, Some(a)) => Some(a),
385                (None, None) => None,
386            },
387            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
388            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
389        })
390    }
391}
392
393impl Merge for PipelineLayer {
394    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
395        Ok(PipelineLayer {
396            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
397            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
398            coin_balance_buckets: self
399                .coin_balance_buckets
400                .merge(other.coin_balance_buckets)?,
401            obj_info: self.obj_info.merge(other.obj_info)?,
402            sum_displays: self.sum_displays.merge(other.sum_displays)?,
403            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
404            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
405            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
406            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
407            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
408            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
409            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
410            kv_objects: self.kv_objects.merge(other.kv_objects)?,
411            kv_packages: self.kv_packages.merge(other.kv_packages)?,
412            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
413            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
414            obj_versions: self.obj_versions.merge(other.obj_versions)?,
415            tx_affected_addresses: self
416                .tx_affected_addresses
417                .merge(other.tx_affected_addresses)?,
418            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
419            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
420            tx_calls: self.tx_calls.merge(other.tx_calls)?,
421            tx_digests: self.tx_digests.merge(other.tx_digests)?,
422            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
423        })
424    }
425}
426
427impl<T: Merge> Merge for Option<T> {
428    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
429        Ok(match (self, other) {
430            (Some(a), Some(b)) => Some(a.merge(b)?),
431            (Some(a), _) | (_, Some(a)) => Some(a),
432            (None, None) => None,
433        })
434    }
435}
436
437impl From<IngestionConfig> for IngestionLayer {
438    fn from(config: IngestionConfig) -> Self {
439        Self {
440            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
441            ingest_concurrency: Some(config.ingest_concurrency),
442            retry_interval_ms: Some(config.retry_interval_ms),
443            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
444            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
445            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
446            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
447        }
448    }
449}
450
451impl From<SequentialConfig> for SequentialLayer {
452    fn from(config: SequentialConfig) -> Self {
453        Self {
454            committer: Some(config.committer.into()),
455            checkpoint_lag: Some(config.checkpoint_lag),
456            fanout: config.fanout,
457            min_eager_rows: config.min_eager_rows,
458            max_batch_checkpoints: config.max_batch_checkpoints,
459            processor_channel_size: config.processor_channel_size,
460        }
461    }
462}
463
464impl From<ConcurrentConfig> for ConcurrentLayer {
465    fn from(config: ConcurrentConfig) -> Self {
466        Self {
467            committer: Some(config.committer.into()),
468            pruner: config.pruner.map(Into::into),
469            fanout: config.fanout,
470            min_eager_rows: config.min_eager_rows,
471            max_pending_rows: config.max_pending_rows,
472            max_watermark_updates: config.max_watermark_updates,
473            processor_channel_size: config.processor_channel_size,
474            collector_channel_size: config.collector_channel_size,
475            committer_channel_size: config.committer_channel_size,
476        }
477    }
478}
479
480impl From<CommitterConfig> for CommitterLayer {
481    fn from(config: CommitterConfig) -> Self {
482        Self {
483            write_concurrency: Some(config.write_concurrency),
484            collect_interval_ms: Some(config.collect_interval_ms),
485            watermark_interval_ms: Some(config.watermark_interval_ms),
486        }
487    }
488}
489
490impl From<PrunerConfig> for PrunerLayer {
491    fn from(config: PrunerConfig) -> Self {
492        Self {
493            interval_ms: Some(config.interval_ms),
494            delay_ms: Some(config.delay_ms),
495            retention: Some(config.retention),
496            max_chunk_size: Some(config.max_chunk_size),
497            prune_concurrency: Some(config.prune_concurrency),
498        }
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `$value` matches `$pattern`, pretty-printing the value when it does not.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            match $value {
                $pattern => {}
                value => panic!(
                    "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                    stringify!($pattern)
                ),
            }
        };
    }

    /// Committer settings shared by the `finish_concurrent_*` tests.
    fn base_committer() -> CommitterConfig {
        CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
            ..Default::default()
        }
    }

    #[test]
    fn merge_recursive() {
        let earlier = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let later = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let forward = earlier.clone().merge(later.clone()).unwrap();
        let backward = later.merge(earlier).unwrap();

        // `later` wins on conflicts; a section present on only one side survives untouched.
        assert_matches!(
            forward,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    checkpoint_lag: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        // Reversing the order flips which side wins on conflicts.
        assert_matches!(
            backward,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    checkpoint_lag: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    #[test]
    fn merge_pruner() {
        let earlier = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let later = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let forward = earlier.clone().merge(later.clone()).unwrap();
        let backward = later.merge(earlier).unwrap();

        // Last write wins everywhere except `retention`, which keeps the maximum in both orders.
        assert_matches!(
            forward,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            backward,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    #[test]
    fn finish_concurrent_unpruned_override() {
        // The base enables pruning, but the layer omits the pruner section, which disables it.
        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            ConcurrentLayer::default().finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    #[test]
    fn finish_concurrent_no_pruner() {
        // Neither side has a pruner, so the finished config has none either.
        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            ConcurrentLayer::default().finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    #[test]
    fn finish_concurrent_pruner() {
        // Both sides have a pruner: the layer's `interval_ms` overrides, the rest is inherited.
        let layer = ConcurrentLayer {
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    #[test]
    fn detect_unrecognized_fields() {
        let parsed = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        );

        let err = parsed.unwrap_err();
        let message = err.to_string();
        assert!(message.contains("i_dont_exist"), "Unexpected error: {err}");
    }
}