sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::ingestion::IngestionConfig;
6use sui_indexer_alt_framework::pipeline::CommitterConfig;
7use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
9use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
10
11/// Trait for merging configuration structs together.
12pub trait Merge: Sized {
13    fn merge(self, other: Self) -> anyhow::Result<Self>;
14}
15
16#[DefaultConfig]
17#[derive(Clone, Default, Debug)]
18#[serde(deny_unknown_fields)]
19pub struct IndexerConfig {
20    /// How checkpoints are read by the indexer.
21    pub ingestion: IngestionLayer,
22
23    /// Default configuration for committers that is shared by all pipelines. Pipelines can
24    /// override individual settings in their own configuration sections.
25    pub committer: CommitterLayer,
26
27    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
28    /// override individual settings in their own configuration sections. Concurrent pipelines
29    /// still need to specify a pruner configuration (although it can be empty) to indicate that
30    /// they want to enable pruning, but when they do, any missing values will be filled in by this
31    /// config.
32    pub pruner: PrunerLayer,
33
34    /// Per-pipeline configurations.
35    pub pipeline: PipelineLayer,
36}
37
// Configuration layers apply overrides over a base configuration. When reading configs from a
// file, we read them into layer types, and then apply those layers onto an existing configuration
// (such as the default configuration) to `finish()` them.
//
// Treating configs as layers allows us to support configuration merging, where multiple
// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields (every layer is marked
// `deny_unknown_fields`).
45
46#[DefaultConfig]
47#[derive(Clone, Default, Debug)]
48#[serde(deny_unknown_fields)]
49pub struct IngestionLayer {
50    pub checkpoint_buffer_size: Option<usize>,
51    pub ingest_concurrency: Option<usize>,
52    pub retry_interval_ms: Option<u64>,
53    pub streaming_backoff_initial_batch_size: Option<usize>,
54    pub streaming_backoff_max_batch_size: Option<usize>,
55    pub streaming_connection_timeout_ms: Option<u64>,
56    pub streaming_statement_timeout_ms: Option<u64>,
57}
58
59#[DefaultConfig]
60#[derive(Clone, Default, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct SequentialLayer {
63    pub committer: Option<CommitterLayer>,
64    pub checkpoint_lag: Option<u64>,
65}
66
67#[DefaultConfig]
68#[derive(Clone, Default, Debug)]
69#[serde(deny_unknown_fields)]
70pub struct ConcurrentLayer {
71    pub committer: Option<CommitterLayer>,
72    pub pruner: Option<PrunerLayer>,
73}
74
75#[DefaultConfig]
76#[derive(Clone, Default, Debug)]
77#[serde(deny_unknown_fields)]
78pub struct CommitterLayer {
79    pub write_concurrency: Option<usize>,
80    pub collect_interval_ms: Option<u64>,
81    pub watermark_interval_ms: Option<u64>,
82}
83
84#[DefaultConfig]
85#[derive(Clone, Default, Debug)]
86#[serde(deny_unknown_fields)]
87pub struct PrunerLayer {
88    pub interval_ms: Option<u64>,
89    pub delay_ms: Option<u64>,
90    pub retention: Option<u64>,
91    pub max_chunk_size: Option<u64>,
92    pub prune_concurrency: Option<u64>,
93}
94
95#[DefaultConfig]
96#[derive(Clone, Default, Debug)]
97#[serde(rename_all = "snake_case", deny_unknown_fields)]
98pub struct PipelineLayer {
99    // Sequential pipelines
100    pub sum_displays: Option<SequentialLayer>,
101
102    // All concurrent pipelines
103    pub cp_bloom_blocks: Option<ConcurrentLayer>,
104    pub cp_blooms: Option<ConcurrentLayer>,
105    pub coin_balance_buckets: Option<ConcurrentLayer>,
106    pub obj_info: Option<ConcurrentLayer>,
107    pub cp_sequence_numbers: Option<ConcurrentLayer>,
108    pub ev_emit_mod: Option<ConcurrentLayer>,
109    pub ev_struct_inst: Option<ConcurrentLayer>,
110    pub kv_checkpoints: Option<ConcurrentLayer>,
111    pub kv_epoch_ends: Option<ConcurrentLayer>,
112    pub kv_epoch_starts: Option<ConcurrentLayer>,
113    pub kv_feature_flags: Option<ConcurrentLayer>,
114    pub kv_objects: Option<ConcurrentLayer>,
115    pub kv_packages: Option<ConcurrentLayer>,
116    pub kv_protocol_configs: Option<ConcurrentLayer>,
117    pub kv_transactions: Option<ConcurrentLayer>,
118    pub obj_versions: Option<ConcurrentLayer>,
119    pub tx_affected_addresses: Option<ConcurrentLayer>,
120    pub tx_affected_objects: Option<ConcurrentLayer>,
121    pub tx_balance_changes: Option<ConcurrentLayer>,
122    pub tx_calls: Option<ConcurrentLayer>,
123    pub tx_digests: Option<ConcurrentLayer>,
124    pub tx_kinds: Option<ConcurrentLayer>,
125}
126
127impl IndexerConfig {
128    /// Generate an example configuration, suitable for demonstrating the fields available to
129    /// configure.
130    pub fn example() -> Self {
131        let mut example: Self = Default::default();
132
133        example.ingestion = IngestionConfig::default().into();
134        example.committer = CommitterConfig::default().into();
135        example.pruner = PrunerConfig::default().into();
136        example.pipeline = PipelineLayer::example();
137
138        example
139    }
140
141    /// Generate a configuration suitable for testing. This is the same as the example
142    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
143    /// less time waiting.
144    pub fn for_test() -> Self {
145        Self::example()
146            .merge(IndexerConfig {
147                ingestion: IngestionLayer {
148                    retry_interval_ms: Some(10),
149                    ingest_concurrency: Some(1),
150                    ..Default::default()
151                },
152                committer: CommitterLayer {
153                    collect_interval_ms: Some(50),
154                    watermark_interval_ms: Some(50),
155                    write_concurrency: Some(1),
156                },
157                pruner: PrunerLayer {
158                    interval_ms: Some(50),
159                    delay_ms: Some(0),
160                    ..Default::default()
161                },
162                ..Default::default()
163            })
164            .expect("Merge failed for test configuration")
165    }
166}
167
168impl IngestionLayer {
169    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
170        Ok(IngestionConfig {
171            checkpoint_buffer_size: self
172                .checkpoint_buffer_size
173                .unwrap_or(base.checkpoint_buffer_size),
174            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
175            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
176            streaming_backoff_initial_batch_size: self
177                .streaming_backoff_initial_batch_size
178                .unwrap_or(base.streaming_backoff_initial_batch_size),
179            streaming_backoff_max_batch_size: self
180                .streaming_backoff_max_batch_size
181                .unwrap_or(base.streaming_backoff_max_batch_size),
182            streaming_connection_timeout_ms: self
183                .streaming_connection_timeout_ms
184                .unwrap_or(base.streaming_connection_timeout_ms),
185            streaming_statement_timeout_ms: self
186                .streaming_statement_timeout_ms
187                .unwrap_or(base.streaming_statement_timeout_ms),
188        })
189    }
190}
191
192impl SequentialLayer {
193    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
194        Ok(SequentialConfig {
195            committer: if let Some(committer) = self.committer {
196                committer.finish(base.committer)?
197            } else {
198                base.committer
199            },
200            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
201        })
202    }
203}
204
205impl ConcurrentLayer {
206    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
207    /// appear in the layer *and* in the base.
208    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
209        Ok(ConcurrentConfig {
210            committer: if let Some(committer) = self.committer {
211                committer.finish(base.committer)?
212            } else {
213                base.committer
214            },
215            pruner: match (self.pruner, base.pruner) {
216                (None, _) | (_, None) => None,
217                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
218            },
219        })
220    }
221}
222
223impl CommitterLayer {
224    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
225        Ok(CommitterConfig {
226            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
227            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
228            watermark_interval_ms: self
229                .watermark_interval_ms
230                .unwrap_or(base.watermark_interval_ms),
231            watermark_interval_jitter_ms: 0,
232        })
233    }
234}
235
236impl PrunerLayer {
237    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
238        Ok(PrunerConfig {
239            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
240            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
241            retention: self.retention.unwrap_or(base.retention),
242            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
243            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
244        })
245    }
246}
247
248impl PipelineLayer {
249    /// Generate an example configuration, suitable for demonstrating the fields available to
250    /// configure.
251    pub fn example() -> Self {
252        PipelineLayer {
253            cp_blooms: Some(Default::default()),
254            cp_bloom_blocks: Some(Default::default()),
255            coin_balance_buckets: Some(Default::default()),
256            obj_info: Some(Default::default()),
257            sum_displays: Some(Default::default()),
258            cp_sequence_numbers: Some(Default::default()),
259            ev_emit_mod: Some(Default::default()),
260            ev_struct_inst: Some(Default::default()),
261            kv_checkpoints: Some(Default::default()),
262            kv_epoch_ends: Some(Default::default()),
263            kv_epoch_starts: Some(Default::default()),
264            kv_feature_flags: Some(Default::default()),
265            kv_objects: Some(Default::default()),
266            kv_packages: Some(Default::default()),
267            kv_protocol_configs: Some(Default::default()),
268            kv_transactions: Some(Default::default()),
269            obj_versions: Some(Default::default()),
270            tx_affected_addresses: Some(Default::default()),
271            tx_affected_objects: Some(Default::default()),
272            tx_balance_changes: Some(Default::default()),
273            tx_calls: Some(Default::default()),
274            tx_digests: Some(Default::default()),
275            tx_kinds: Some(Default::default()),
276        }
277    }
278}
279
280impl Merge for IndexerConfig {
281    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
282        Ok(IndexerConfig {
283            ingestion: self.ingestion.merge(other.ingestion)?,
284            committer: self.committer.merge(other.committer)?,
285            pruner: self.pruner.merge(other.pruner)?,
286            pipeline: self.pipeline.merge(other.pipeline)?,
287        })
288    }
289}
290
291impl Merge for IngestionLayer {
292    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
293        Ok(IngestionLayer {
294            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
295            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
296            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
297            streaming_backoff_initial_batch_size: other
298                .streaming_backoff_initial_batch_size
299                .or(self.streaming_backoff_initial_batch_size),
300            streaming_backoff_max_batch_size: other
301                .streaming_backoff_max_batch_size
302                .or(self.streaming_backoff_max_batch_size),
303            streaming_connection_timeout_ms: other
304                .streaming_connection_timeout_ms
305                .or(self.streaming_connection_timeout_ms),
306            streaming_statement_timeout_ms: other
307                .streaming_statement_timeout_ms
308                .or(self.streaming_statement_timeout_ms),
309        })
310    }
311}
312
313impl Merge for SequentialLayer {
314    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
315        Ok(SequentialLayer {
316            committer: self.committer.merge(other.committer)?,
317            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
318        })
319    }
320}
321
322impl Merge for ConcurrentLayer {
323    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
324        Ok(ConcurrentLayer {
325            committer: self.committer.merge(other.committer)?,
326            pruner: self.pruner.merge(other.pruner)?,
327        })
328    }
329}
330
331impl Merge for CommitterLayer {
332    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
333        Ok(CommitterLayer {
334            write_concurrency: other.write_concurrency.or(self.write_concurrency),
335            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
336            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
337        })
338    }
339}
340
341impl Merge for PrunerLayer {
342    /// Last write takes precedence for all fields except the `retention`, which takes the max of
343    /// all available values.
344    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
345        Ok(PrunerLayer {
346            interval_ms: other.interval_ms.or(self.interval_ms),
347            delay_ms: other.delay_ms.or(self.delay_ms),
348            retention: match (other.retention, self.retention) {
349                (Some(a), Some(b)) => Some(a.max(b)),
350                (Some(a), _) | (_, Some(a)) => Some(a),
351                (None, None) => None,
352            },
353            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
354            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
355        })
356    }
357}
358
359impl Merge for PipelineLayer {
360    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
361        Ok(PipelineLayer {
362            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
363            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
364            coin_balance_buckets: self
365                .coin_balance_buckets
366                .merge(other.coin_balance_buckets)?,
367            obj_info: self.obj_info.merge(other.obj_info)?,
368            sum_displays: self.sum_displays.merge(other.sum_displays)?,
369            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
370            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
371            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
372            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
373            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
374            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
375            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
376            kv_objects: self.kv_objects.merge(other.kv_objects)?,
377            kv_packages: self.kv_packages.merge(other.kv_packages)?,
378            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
379            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
380            obj_versions: self.obj_versions.merge(other.obj_versions)?,
381            tx_affected_addresses: self
382                .tx_affected_addresses
383                .merge(other.tx_affected_addresses)?,
384            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
385            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
386            tx_calls: self.tx_calls.merge(other.tx_calls)?,
387            tx_digests: self.tx_digests.merge(other.tx_digests)?,
388            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
389        })
390    }
391}
392
393impl<T: Merge> Merge for Option<T> {
394    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
395        Ok(match (self, other) {
396            (Some(a), Some(b)) => Some(a.merge(b)?),
397            (Some(a), _) | (_, Some(a)) => Some(a),
398            (None, None) => None,
399        })
400    }
401}
402
403impl From<IngestionConfig> for IngestionLayer {
404    fn from(config: IngestionConfig) -> Self {
405        Self {
406            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
407            ingest_concurrency: Some(config.ingest_concurrency),
408            retry_interval_ms: Some(config.retry_interval_ms),
409            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
410            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
411            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
412            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
413        }
414    }
415}
416
417impl From<SequentialConfig> for SequentialLayer {
418    fn from(config: SequentialConfig) -> Self {
419        Self {
420            committer: Some(config.committer.into()),
421            checkpoint_lag: Some(config.checkpoint_lag),
422        }
423    }
424}
425
426impl From<ConcurrentConfig> for ConcurrentLayer {
427    fn from(config: ConcurrentConfig) -> Self {
428        Self {
429            committer: Some(config.committer.into()),
430            pruner: config.pruner.map(Into::into),
431        }
432    }
433}
434
435impl From<CommitterConfig> for CommitterLayer {
436    fn from(config: CommitterConfig) -> Self {
437        Self {
438            write_concurrency: Some(config.write_concurrency),
439            collect_interval_ms: Some(config.collect_interval_ms),
440            watermark_interval_ms: Some(config.watermark_interval_ms),
441        }
442    }
443}
444
445impl From<PrunerConfig> for PrunerLayer {
446    fn from(config: PrunerConfig) -> Self {
447        Self {
448            interval_ms: Some(config.interval_ms),
449            delay_ms: Some(config.delay_ms),
450            retention: Some(config.retention),
451            max_chunk_size: Some(config.max_chunk_size),
452            prune_concurrency: Some(config.prune_concurrency),
453        }
454    }
455}
456
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that a value matches a pattern. On failure, the message shows the expected pattern
    /// and the pretty-printed actual value.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Shorthand for building a committer layer from its three settings.
    fn committer_layer(
        write_concurrency: Option<usize>,
        collect_interval_ms: Option<u64>,
        watermark_interval_ms: Option<u64>,
    ) -> CommitterLayer {
        CommitterLayer {
            write_concurrency,
            collect_interval_ms,
            watermark_interval_ms,
        }
    }

    /// The committer configuration used as the base in all the `finish_concurrent_*` tests.
    fn base_committer() -> CommitterConfig {
        CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
            ..Default::default()
        }
    }

    /// Merging recurses into nested layers: fields set on both sides take the value from the
    /// layer merged last, and sections present on only one side survive unchanged.
    #[test]
    fn merge_recursive() {
        let first = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(committer_layer(Some(10), Some(1000), None)),
                checkpoint_lag: Some(100),
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(committer_layer(Some(5), Some(500), None)),
                pruner: None,
            }),
            ..Default::default()
        };

        let second = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(committer_layer(Some(5), None, Some(500))),
                checkpoint_lag: Some(200),
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let second_on_first = first.clone().merge(second.clone()).unwrap();
        let first_on_second = second.merge(first).unwrap();

        assert_matches!(
            second_on_first,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(200),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );

        assert_matches!(
            first_on_second,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(100),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );
    }

    /// For pruner layers the last write wins on every field except `retention`, which ends up
    /// as the larger value regardless of merge order.
    #[test]
    fn merge_pruner() {
        let first = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let second = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let second_on_first = first.clone().merge(second.clone()).unwrap();
        let first_on_second = second.merge(first).unwrap();

        assert_matches!(
            second_on_first,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            first_on_second,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner section disables pruning even when the base has a pruner.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig::default()),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
            },
        );
    }

    /// With no pruner in either the layer or the base, the finished config has no pruner.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: None,
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
            },
        );
    }

    /// With a pruner on both sides, the layer's values override the base's and unset values are
    /// filled in from the base.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
            },
        );
    }

    /// Deserializing a config with an unknown top-level field fails, and the error names the
    /// offending field.
    #[test]
    fn detect_unrecognized_fields() {
        let input = r#"
            i_dont_exist = "foo"
            "#;

        let err = toml::from_str::<IndexerConfig>(input).unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}
723}