sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::ingestion::IngestionConfig;
6use sui_indexer_alt_framework::pipeline::CommitterConfig;
7use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
9use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
10
11/// Trait for merging configuration structs together.
12pub trait Merge: Sized {
13    fn merge(self, other: Self) -> anyhow::Result<Self>;
14}
15
16#[DefaultConfig]
17#[derive(Clone, Default, Debug)]
18#[serde(deny_unknown_fields)]
19pub struct IndexerConfig {
20    /// How checkpoints are read by the indexer.
21    pub ingestion: IngestionLayer,
22
23    /// Default configuration for committers that is shared by all pipelines. Pipelines can
24    /// override individual settings in their own configuration sections.
25    pub committer: CommitterLayer,
26
27    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
28    /// override individual settings in their own configuration sections. Concurrent pipelines
29    /// still need to specify a pruner configuration (although it can be empty) to indicate that
30    /// they want to enable pruning, but when they do, any missing values will be filled in by this
31    /// config.
32    pub pruner: PrunerLayer,
33
34    /// Per-pipeline configurations.
35    pub pipeline: PipelineLayer,
36}
37
38// Configuration layers apply overrides over a base configuration. When reading configs from a
39// file, we read them into layer types, and then apply those layers onto an existing configuration
40// (such as the default configuration) to `finish()` them.
41//
42// Treating configs as layers allows us to support configuration merging, where multiple
43// configuration files can be combined into one final configuration. Having a separate type for
44// reading configs also allows us to detect and warn against unrecognised fields.
45
46#[DefaultConfig]
47#[derive(Clone, Default, Debug)]
48#[serde(deny_unknown_fields)]
49pub struct IngestionLayer {
50    pub checkpoint_buffer_size: Option<usize>,
51    pub ingest_concurrency: Option<usize>,
52    pub retry_interval_ms: Option<u64>,
53    pub streaming_backoff_initial_batch_size: Option<usize>,
54    pub streaming_backoff_max_batch_size: Option<usize>,
55    pub streaming_connection_timeout_ms: Option<u64>,
56    pub streaming_statement_timeout_ms: Option<u64>,
57}
58
59#[DefaultConfig]
60#[derive(Clone, Default, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct SequentialLayer {
63    pub committer: Option<CommitterLayer>,
64    pub checkpoint_lag: Option<u64>,
65}
66
67#[DefaultConfig]
68#[derive(Clone, Default, Debug)]
69#[serde(deny_unknown_fields)]
70pub struct ConcurrentLayer {
71    pub committer: Option<CommitterLayer>,
72    pub pruner: Option<PrunerLayer>,
73}
74
75#[DefaultConfig]
76#[derive(Clone, Default, Debug)]
77#[serde(deny_unknown_fields)]
78pub struct CommitterLayer {
79    pub write_concurrency: Option<usize>,
80    pub collect_interval_ms: Option<u64>,
81    pub watermark_interval_ms: Option<u64>,
82}
83
84#[DefaultConfig]
85#[derive(Clone, Default, Debug)]
86#[serde(deny_unknown_fields)]
87pub struct PrunerLayer {
88    pub interval_ms: Option<u64>,
89    pub delay_ms: Option<u64>,
90    pub retention: Option<u64>,
91    pub max_chunk_size: Option<u64>,
92    pub prune_concurrency: Option<u64>,
93}
94
95#[DefaultConfig]
96#[derive(Clone, Default, Debug)]
97#[serde(rename_all = "snake_case", deny_unknown_fields)]
98pub struct PipelineLayer {
99    // Sequential pipelines
100    pub sum_displays: Option<SequentialLayer>,
101
102    // All concurrent pipelines
103    pub coin_balance_buckets: Option<ConcurrentLayer>,
104    pub obj_info: Option<ConcurrentLayer>,
105    pub cp_sequence_numbers: Option<ConcurrentLayer>,
106    pub ev_emit_mod: Option<ConcurrentLayer>,
107    pub ev_struct_inst: Option<ConcurrentLayer>,
108    pub kv_checkpoints: Option<ConcurrentLayer>,
109    pub kv_epoch_ends: Option<ConcurrentLayer>,
110    pub kv_epoch_starts: Option<ConcurrentLayer>,
111    pub kv_feature_flags: Option<ConcurrentLayer>,
112    pub kv_objects: Option<ConcurrentLayer>,
113    pub kv_packages: Option<ConcurrentLayer>,
114    pub kv_protocol_configs: Option<ConcurrentLayer>,
115    pub kv_transactions: Option<ConcurrentLayer>,
116    pub obj_versions: Option<ConcurrentLayer>,
117    pub tx_affected_addresses: Option<ConcurrentLayer>,
118    pub tx_affected_objects: Option<ConcurrentLayer>,
119    pub tx_balance_changes: Option<ConcurrentLayer>,
120    pub tx_calls: Option<ConcurrentLayer>,
121    pub tx_digests: Option<ConcurrentLayer>,
122    pub tx_kinds: Option<ConcurrentLayer>,
123}
124
125impl IndexerConfig {
126    /// Generate an example configuration, suitable for demonstrating the fields available to
127    /// configure.
128    pub fn example() -> Self {
129        let mut example: Self = Default::default();
130
131        example.ingestion = IngestionConfig::default().into();
132        example.committer = CommitterConfig::default().into();
133        example.pruner = PrunerConfig::default().into();
134        example.pipeline = PipelineLayer::example();
135
136        example
137    }
138
139    /// Generate a configuration suitable for testing. This is the same as the example
140    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
141    /// less time waiting.
142    pub fn for_test() -> Self {
143        Self::example()
144            .merge(IndexerConfig {
145                ingestion: IngestionLayer {
146                    retry_interval_ms: Some(10),
147                    ingest_concurrency: Some(1),
148                    ..Default::default()
149                },
150                committer: CommitterLayer {
151                    collect_interval_ms: Some(50),
152                    watermark_interval_ms: Some(50),
153                    write_concurrency: Some(1),
154                },
155                pruner: PrunerLayer {
156                    interval_ms: Some(50),
157                    delay_ms: Some(0),
158                    ..Default::default()
159                },
160                ..Default::default()
161            })
162            .expect("Merge failed for test configuration")
163    }
164}
165
166impl IngestionLayer {
167    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
168        Ok(IngestionConfig {
169            checkpoint_buffer_size: self
170                .checkpoint_buffer_size
171                .unwrap_or(base.checkpoint_buffer_size),
172            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
173            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
174            streaming_backoff_initial_batch_size: self
175                .streaming_backoff_initial_batch_size
176                .unwrap_or(base.streaming_backoff_initial_batch_size),
177            streaming_backoff_max_batch_size: self
178                .streaming_backoff_max_batch_size
179                .unwrap_or(base.streaming_backoff_max_batch_size),
180            streaming_connection_timeout_ms: self
181                .streaming_connection_timeout_ms
182                .unwrap_or(base.streaming_connection_timeout_ms),
183            streaming_statement_timeout_ms: self
184                .streaming_statement_timeout_ms
185                .unwrap_or(base.streaming_statement_timeout_ms),
186        })
187    }
188}
189
190impl SequentialLayer {
191    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
192        Ok(SequentialConfig {
193            committer: if let Some(committer) = self.committer {
194                committer.finish(base.committer)?
195            } else {
196                base.committer
197            },
198            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
199        })
200    }
201}
202
203impl ConcurrentLayer {
204    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
205    /// appear in the layer *and* in the base.
206    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
207        Ok(ConcurrentConfig {
208            committer: if let Some(committer) = self.committer {
209                committer.finish(base.committer)?
210            } else {
211                base.committer
212            },
213            pruner: match (self.pruner, base.pruner) {
214                (None, _) | (_, None) => None,
215                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
216            },
217        })
218    }
219}
220
221impl CommitterLayer {
222    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
223        Ok(CommitterConfig {
224            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
225            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
226            watermark_interval_ms: self
227                .watermark_interval_ms
228                .unwrap_or(base.watermark_interval_ms),
229        })
230    }
231}
232
233impl PrunerLayer {
234    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
235        Ok(PrunerConfig {
236            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
237            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
238            retention: self.retention.unwrap_or(base.retention),
239            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
240            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
241        })
242    }
243}
244
245impl PipelineLayer {
246    /// Generate an example configuration, suitable for demonstrating the fields available to
247    /// configure.
248    pub fn example() -> Self {
249        PipelineLayer {
250            coin_balance_buckets: Some(Default::default()),
251            obj_info: Some(Default::default()),
252            sum_displays: Some(Default::default()),
253            cp_sequence_numbers: Some(Default::default()),
254            ev_emit_mod: Some(Default::default()),
255            ev_struct_inst: Some(Default::default()),
256            kv_checkpoints: Some(Default::default()),
257            kv_epoch_ends: Some(Default::default()),
258            kv_epoch_starts: Some(Default::default()),
259            kv_feature_flags: Some(Default::default()),
260            kv_objects: Some(Default::default()),
261            kv_packages: Some(Default::default()),
262            kv_protocol_configs: Some(Default::default()),
263            kv_transactions: Some(Default::default()),
264            obj_versions: Some(Default::default()),
265            tx_affected_addresses: Some(Default::default()),
266            tx_affected_objects: Some(Default::default()),
267            tx_balance_changes: Some(Default::default()),
268            tx_calls: Some(Default::default()),
269            tx_digests: Some(Default::default()),
270            tx_kinds: Some(Default::default()),
271        }
272    }
273}
274
275impl Merge for IndexerConfig {
276    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
277        Ok(IndexerConfig {
278            ingestion: self.ingestion.merge(other.ingestion)?,
279            committer: self.committer.merge(other.committer)?,
280            pruner: self.pruner.merge(other.pruner)?,
281            pipeline: self.pipeline.merge(other.pipeline)?,
282        })
283    }
284}
285
286impl Merge for IngestionLayer {
287    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
288        Ok(IngestionLayer {
289            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
290            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
291            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
292            streaming_backoff_initial_batch_size: other
293                .streaming_backoff_initial_batch_size
294                .or(self.streaming_backoff_initial_batch_size),
295            streaming_backoff_max_batch_size: other
296                .streaming_backoff_max_batch_size
297                .or(self.streaming_backoff_max_batch_size),
298            streaming_connection_timeout_ms: other
299                .streaming_connection_timeout_ms
300                .or(self.streaming_connection_timeout_ms),
301            streaming_statement_timeout_ms: other
302                .streaming_statement_timeout_ms
303                .or(self.streaming_statement_timeout_ms),
304        })
305    }
306}
307
308impl Merge for SequentialLayer {
309    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
310        Ok(SequentialLayer {
311            committer: self.committer.merge(other.committer)?,
312            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
313        })
314    }
315}
316
317impl Merge for ConcurrentLayer {
318    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
319        Ok(ConcurrentLayer {
320            committer: self.committer.merge(other.committer)?,
321            pruner: self.pruner.merge(other.pruner)?,
322        })
323    }
324}
325
326impl Merge for CommitterLayer {
327    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
328        Ok(CommitterLayer {
329            write_concurrency: other.write_concurrency.or(self.write_concurrency),
330            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
331            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
332        })
333    }
334}
335
336impl Merge for PrunerLayer {
337    /// Last write takes precedence for all fields except the `retention`, which takes the max of
338    /// all available values.
339    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
340        Ok(PrunerLayer {
341            interval_ms: other.interval_ms.or(self.interval_ms),
342            delay_ms: other.delay_ms.or(self.delay_ms),
343            retention: match (other.retention, self.retention) {
344                (Some(a), Some(b)) => Some(a.max(b)),
345                (Some(a), _) | (_, Some(a)) => Some(a),
346                (None, None) => None,
347            },
348            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
349            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
350        })
351    }
352}
353
354impl Merge for PipelineLayer {
355    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
356        Ok(PipelineLayer {
357            coin_balance_buckets: self
358                .coin_balance_buckets
359                .merge(other.coin_balance_buckets)?,
360            obj_info: self.obj_info.merge(other.obj_info)?,
361            sum_displays: self.sum_displays.merge(other.sum_displays)?,
362            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
363            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
364            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
365            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
366            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
367            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
368            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
369            kv_objects: self.kv_objects.merge(other.kv_objects)?,
370            kv_packages: self.kv_packages.merge(other.kv_packages)?,
371            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
372            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
373            obj_versions: self.obj_versions.merge(other.obj_versions)?,
374            tx_affected_addresses: self
375                .tx_affected_addresses
376                .merge(other.tx_affected_addresses)?,
377            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
378            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
379            tx_calls: self.tx_calls.merge(other.tx_calls)?,
380            tx_digests: self.tx_digests.merge(other.tx_digests)?,
381            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
382        })
383    }
384}
385
386impl<T: Merge> Merge for Option<T> {
387    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
388        Ok(match (self, other) {
389            (Some(a), Some(b)) => Some(a.merge(b)?),
390            (Some(a), _) | (_, Some(a)) => Some(a),
391            (None, None) => None,
392        })
393    }
394}
395
396impl From<IngestionConfig> for IngestionLayer {
397    fn from(config: IngestionConfig) -> Self {
398        Self {
399            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
400            ingest_concurrency: Some(config.ingest_concurrency),
401            retry_interval_ms: Some(config.retry_interval_ms),
402            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
403            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
404            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
405            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
406        }
407    }
408}
409
410impl From<SequentialConfig> for SequentialLayer {
411    fn from(config: SequentialConfig) -> Self {
412        Self {
413            committer: Some(config.committer.into()),
414            checkpoint_lag: Some(config.checkpoint_lag),
415        }
416    }
417}
418
419impl From<ConcurrentConfig> for ConcurrentLayer {
420    fn from(config: ConcurrentConfig) -> Self {
421        Self {
422            committer: Some(config.committer.into()),
423            pruner: config.pruner.map(Into::into),
424        }
425    }
426}
427
428impl From<CommitterConfig> for CommitterLayer {
429    fn from(config: CommitterConfig) -> Self {
430        Self {
431            write_concurrency: Some(config.write_concurrency),
432            collect_interval_ms: Some(config.collect_interval_ms),
433            watermark_interval_ms: Some(config.watermark_interval_ms),
434        }
435    }
436}
437
438impl From<PrunerConfig> for PrunerLayer {
439    fn from(config: PrunerConfig) -> Self {
440        Self {
441            interval_ms: Some(config.interval_ms),
442            delay_ms: Some(config.delay_ms),
443            retention: Some(config.retention),
444            max_chunk_size: Some(config.max_chunk_size),
445            prune_concurrency: Some(config.prune_concurrency),
446        }
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use super::*;
453
454    macro_rules! assert_matches {
455        ($value:expr, $pattern:pat $(,)?) => {
456            let value = $value;
457            assert!(
458                matches!(value, $pattern),
459                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
460                stringify!($pattern)
461            );
462        };
463    }
464
465    #[test]
466    fn merge_recursive() {
467        let this = PipelineLayer {
468            sum_displays: Some(SequentialLayer {
469                committer: Some(CommitterLayer {
470                    write_concurrency: Some(10),
471                    collect_interval_ms: Some(1000),
472                    watermark_interval_ms: None,
473                }),
474                checkpoint_lag: Some(100),
475            }),
476            ev_emit_mod: Some(ConcurrentLayer {
477                committer: Some(CommitterLayer {
478                    write_concurrency: Some(5),
479                    collect_interval_ms: Some(500),
480                    watermark_interval_ms: None,
481                }),
482                ..Default::default()
483            }),
484            ..Default::default()
485        };
486
487        let that = PipelineLayer {
488            sum_displays: Some(SequentialLayer {
489                committer: Some(CommitterLayer {
490                    write_concurrency: Some(5),
491                    collect_interval_ms: None,
492                    watermark_interval_ms: Some(500),
493                }),
494                checkpoint_lag: Some(200),
495            }),
496            ev_emit_mod: None,
497            ..Default::default()
498        };
499
500        let this_then_that = this.clone().merge(that.clone()).unwrap();
501        let that_then_this = that.clone().merge(this.clone()).unwrap();
502
503        assert_matches!(
504            this_then_that,
505            PipelineLayer {
506                sum_displays: Some(SequentialLayer {
507                    committer: Some(CommitterLayer {
508                        write_concurrency: Some(5),
509                        collect_interval_ms: Some(1000),
510                        watermark_interval_ms: Some(500),
511                    }),
512                    checkpoint_lag: Some(200),
513                }),
514                ev_emit_mod: Some(ConcurrentLayer {
515                    committer: Some(CommitterLayer {
516                        write_concurrency: Some(5),
517                        collect_interval_ms: Some(500),
518                        watermark_interval_ms: None,
519                    }),
520                    pruner: None,
521                }),
522                ..
523            },
524        );
525
526        assert_matches!(
527            that_then_this,
528            PipelineLayer {
529                sum_displays: Some(SequentialLayer {
530                    committer: Some(CommitterLayer {
531                        write_concurrency: Some(10),
532                        collect_interval_ms: Some(1000),
533                        watermark_interval_ms: Some(500),
534                    }),
535                    checkpoint_lag: Some(100),
536                }),
537                ev_emit_mod: Some(ConcurrentLayer {
538                    committer: Some(CommitterLayer {
539                        write_concurrency: Some(5),
540                        collect_interval_ms: Some(500),
541                        watermark_interval_ms: None,
542                    }),
543                    pruner: None,
544                }),
545                ..
546            },
547        );
548    }
549
550    #[test]
551    fn merge_pruner() {
552        let this = PrunerLayer {
553            interval_ms: None,
554            delay_ms: Some(100),
555            retention: Some(200),
556            max_chunk_size: Some(300),
557            prune_concurrency: Some(1),
558        };
559
560        let that = PrunerLayer {
561            interval_ms: Some(400),
562            delay_ms: None,
563            retention: Some(500),
564            max_chunk_size: Some(600),
565            prune_concurrency: Some(2),
566        };
567
568        let this_then_that = this.clone().merge(that.clone()).unwrap();
569        let that_then_this = that.clone().merge(this.clone()).unwrap();
570
571        assert_matches!(
572            this_then_that,
573            PrunerLayer {
574                interval_ms: Some(400),
575                delay_ms: Some(100),
576                retention: Some(500),
577                max_chunk_size: Some(600),
578                prune_concurrency: Some(2),
579            },
580        );
581
582        assert_matches!(
583            that_then_this,
584            PrunerLayer {
585                interval_ms: Some(400),
586                delay_ms: Some(100),
587                retention: Some(500),
588                max_chunk_size: Some(300),
589                prune_concurrency: Some(1),
590            },
591        );
592    }
593
594    #[test]
595    fn finish_concurrent_unpruned_override() {
596        let layer = ConcurrentLayer {
597            committer: None,
598            pruner: None,
599        };
600
601        let base = ConcurrentConfig {
602            committer: CommitterConfig {
603                write_concurrency: 5,
604                collect_interval_ms: 50,
605                watermark_interval_ms: 500,
606            },
607            pruner: Some(PrunerConfig::default()),
608        };
609
610        assert_matches!(
611            layer.finish(base).unwrap(),
612            ConcurrentConfig {
613                committer: CommitterConfig {
614                    write_concurrency: 5,
615                    collect_interval_ms: 50,
616                    watermark_interval_ms: 500,
617                },
618                pruner: None,
619            },
620        );
621    }
622
623    #[test]
624    fn finish_concurrent_no_pruner() {
625        let layer = ConcurrentLayer {
626            committer: None,
627            pruner: None,
628        };
629
630        let base = ConcurrentConfig {
631            committer: CommitterConfig {
632                write_concurrency: 5,
633                collect_interval_ms: 50,
634                watermark_interval_ms: 500,
635            },
636            pruner: None,
637        };
638
639        assert_matches!(
640            layer.finish(base).unwrap(),
641            ConcurrentConfig {
642                committer: CommitterConfig {
643                    write_concurrency: 5,
644                    collect_interval_ms: 50,
645                    watermark_interval_ms: 500,
646                },
647                pruner: None,
648            },
649        );
650    }
651
652    #[test]
653    fn finish_concurrent_pruner() {
654        let layer = ConcurrentLayer {
655            committer: None,
656            pruner: Some(PrunerLayer {
657                interval_ms: Some(1000),
658                ..Default::default()
659            }),
660        };
661
662        let base = ConcurrentConfig {
663            committer: CommitterConfig {
664                write_concurrency: 5,
665                collect_interval_ms: 50,
666                watermark_interval_ms: 500,
667            },
668            pruner: Some(PrunerConfig {
669                interval_ms: 100,
670                delay_ms: 200,
671                retention: 300,
672                max_chunk_size: 400,
673                prune_concurrency: 1,
674            }),
675        };
676
677        assert_matches!(
678            layer.finish(base).unwrap(),
679            ConcurrentConfig {
680                committer: CommitterConfig {
681                    write_concurrency: 5,
682                    collect_interval_ms: 50,
683                    watermark_interval_ms: 500,
684                },
685                pruner: Some(PrunerConfig {
686                    interval_ms: 1000,
687                    delay_ms: 200,
688                    retention: 300,
689                    max_chunk_size: 400,
690                    prune_concurrency: 1,
691                }),
692            },
693        );
694    }
695
696    #[test]
697    fn detect_unrecognized_fields() {
698        let err = toml::from_str::<IndexerConfig>(
699            r#"
700            i_dont_exist = "foo"
701            "#,
702        )
703        .unwrap_err();
704
705        assert!(
706            err.to_string().contains("i_dont_exist"),
707            "Unexpected error: {err}"
708        );
709    }
710}