sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::{
6    ingestion::IngestionConfig,
7    pipeline::{
8        CommitterConfig,
9        concurrent::{ConcurrentConfig, PrunerConfig},
10        sequential::SequentialConfig,
11    },
12};
13
14/// Trait for merging configuration structs together.
15pub trait Merge: Sized {
16    fn merge(self, other: Self) -> anyhow::Result<Self>;
17}
18
19#[DefaultConfig]
20#[derive(Clone, Default, Debug)]
21#[serde(deny_unknown_fields)]
22pub struct IndexerConfig {
23    /// How checkpoints are read by the indexer.
24    pub ingestion: IngestionLayer,
25
26    /// Default configuration for committers that is shared by all pipelines. Pipelines can
27    /// override individual settings in their own configuration sections.
28    pub committer: CommitterLayer,
29
30    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
31    /// override individual settings in their own configuration sections. Concurrent pipelines
32    /// still need to specify a pruner configuration (although it can be empty) to indicate that
33    /// they want to enable pruning, but when they do, any missing values will be filled in by this
34    /// config.
35    pub pruner: PrunerLayer,
36
37    /// Per-pipeline configurations.
38    pub pipeline: PipelineLayer,
39}
40
41// Configuration layers apply overrides over a base configuration. When reading configs from a
42// file, we read them into layer types, and then apply those layers onto an existing configuration
43// (such as the default configuration) to `finish()` them.
44//
45// Treating configs as layers allows us to support configuration merging, where multiple
46// configuration files can be combined into one final configuration. Having a separate type for
47// reading configs also allows us to detect and warn against unrecognised fields.
48
49#[DefaultConfig]
50#[derive(Clone, Default, Debug)]
51#[serde(deny_unknown_fields)]
52pub struct IngestionLayer {
53    pub checkpoint_buffer_size: Option<usize>,
54    pub ingest_concurrency: Option<usize>,
55    pub retry_interval_ms: Option<u64>,
56    pub streaming_backoff_initial_batch_size: Option<usize>,
57    pub streaming_backoff_max_batch_size: Option<usize>,
58    pub streaming_connection_timeout_ms: Option<u64>,
59    pub streaming_statement_timeout_ms: Option<u64>,
60}
61
62#[DefaultConfig]
63#[derive(Clone, Default, Debug)]
64#[serde(deny_unknown_fields)]
65pub struct SequentialLayer {
66    pub committer: Option<CommitterLayer>,
67    pub checkpoint_lag: Option<u64>,
68}
69
70#[DefaultConfig]
71#[derive(Clone, Default, Debug)]
72#[serde(deny_unknown_fields)]
73pub struct ConcurrentLayer {
74    pub committer: Option<CommitterLayer>,
75    pub pruner: Option<PrunerLayer>,
76}
77
78#[DefaultConfig]
79#[derive(Clone, Default, Debug)]
80#[serde(deny_unknown_fields)]
81pub struct CommitterLayer {
82    pub write_concurrency: Option<usize>,
83    pub collect_interval_ms: Option<u64>,
84    pub watermark_interval_ms: Option<u64>,
85}
86
87#[DefaultConfig]
88#[derive(Clone, Default, Debug)]
89#[serde(deny_unknown_fields)]
90pub struct PrunerLayer {
91    pub interval_ms: Option<u64>,
92    pub delay_ms: Option<u64>,
93    pub retention: Option<u64>,
94    pub max_chunk_size: Option<u64>,
95    pub prune_concurrency: Option<u64>,
96}
97
98#[DefaultConfig]
99#[derive(Clone, Default, Debug)]
100#[serde(rename_all = "snake_case", deny_unknown_fields)]
101pub struct PipelineLayer {
102    // Sequential pipelines
103    pub sum_displays: Option<SequentialLayer>,
104
105    // All concurrent pipelines
106    pub coin_balance_buckets: Option<ConcurrentLayer>,
107    pub obj_info: Option<ConcurrentLayer>,
108    pub cp_sequence_numbers: Option<ConcurrentLayer>,
109    pub ev_emit_mod: Option<ConcurrentLayer>,
110    pub ev_struct_inst: Option<ConcurrentLayer>,
111    pub kv_checkpoints: Option<ConcurrentLayer>,
112    pub kv_epoch_ends: Option<ConcurrentLayer>,
113    pub kv_epoch_starts: Option<ConcurrentLayer>,
114    pub kv_feature_flags: Option<ConcurrentLayer>,
115    pub kv_objects: Option<ConcurrentLayer>,
116    pub kv_packages: Option<ConcurrentLayer>,
117    pub kv_protocol_configs: Option<ConcurrentLayer>,
118    pub kv_transactions: Option<ConcurrentLayer>,
119    pub obj_versions: Option<ConcurrentLayer>,
120    pub tx_affected_addresses: Option<ConcurrentLayer>,
121    pub tx_affected_objects: Option<ConcurrentLayer>,
122    pub tx_balance_changes: Option<ConcurrentLayer>,
123    pub tx_calls: Option<ConcurrentLayer>,
124    pub tx_digests: Option<ConcurrentLayer>,
125    pub tx_kinds: Option<ConcurrentLayer>,
126}
127
128impl IndexerConfig {
129    /// Generate an example configuration, suitable for demonstrating the fields available to
130    /// configure.
131    pub fn example() -> Self {
132        let mut example: Self = Default::default();
133
134        example.ingestion = IngestionConfig::default().into();
135        example.committer = CommitterConfig::default().into();
136        example.pruner = PrunerConfig::default().into();
137        example.pipeline = PipelineLayer::example();
138
139        example
140    }
141
142    /// Generate a configuration suitable for testing. This is the same as the example
143    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
144    /// less time waiting.
145    pub fn for_test() -> Self {
146        Self::example()
147            .merge(IndexerConfig {
148                ingestion: IngestionLayer {
149                    retry_interval_ms: Some(10),
150                    ingest_concurrency: Some(1),
151                    ..Default::default()
152                },
153                committer: CommitterLayer {
154                    collect_interval_ms: Some(50),
155                    watermark_interval_ms: Some(50),
156                    write_concurrency: Some(1),
157                },
158                pruner: PrunerLayer {
159                    interval_ms: Some(50),
160                    delay_ms: Some(0),
161                    ..Default::default()
162                },
163                ..Default::default()
164            })
165            .expect("Merge failed for test configuration")
166    }
167}
168
169impl IngestionLayer {
170    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
171        Ok(IngestionConfig {
172            checkpoint_buffer_size: self
173                .checkpoint_buffer_size
174                .unwrap_or(base.checkpoint_buffer_size),
175            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
176            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
177            streaming_backoff_initial_batch_size: self
178                .streaming_backoff_initial_batch_size
179                .unwrap_or(base.streaming_backoff_initial_batch_size),
180            streaming_backoff_max_batch_size: self
181                .streaming_backoff_max_batch_size
182                .unwrap_or(base.streaming_backoff_max_batch_size),
183            streaming_connection_timeout_ms: self
184                .streaming_connection_timeout_ms
185                .unwrap_or(base.streaming_connection_timeout_ms),
186            streaming_statement_timeout_ms: self
187                .streaming_statement_timeout_ms
188                .unwrap_or(base.streaming_statement_timeout_ms),
189        })
190    }
191}
192
193impl SequentialLayer {
194    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
195        Ok(SequentialConfig {
196            committer: if let Some(committer) = self.committer {
197                committer.finish(base.committer)?
198            } else {
199                base.committer
200            },
201            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
202        })
203    }
204}
205
206impl ConcurrentLayer {
207    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
208    /// appear in the layer *and* in the base.
209    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
210        Ok(ConcurrentConfig {
211            committer: if let Some(committer) = self.committer {
212                committer.finish(base.committer)?
213            } else {
214                base.committer
215            },
216            pruner: match (self.pruner, base.pruner) {
217                (None, _) | (_, None) => None,
218                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
219            },
220        })
221    }
222}
223
224impl CommitterLayer {
225    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
226        Ok(CommitterConfig {
227            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
228            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
229            watermark_interval_ms: self
230                .watermark_interval_ms
231                .unwrap_or(base.watermark_interval_ms),
232        })
233    }
234}
235
236impl PrunerLayer {
237    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
238        Ok(PrunerConfig {
239            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
240            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
241            retention: self.retention.unwrap_or(base.retention),
242            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
243            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
244        })
245    }
246}
247
248impl PipelineLayer {
249    /// Generate an example configuration, suitable for demonstrating the fields available to
250    /// configure.
251    pub fn example() -> Self {
252        PipelineLayer {
253            coin_balance_buckets: Some(Default::default()),
254            obj_info: Some(Default::default()),
255            sum_displays: Some(Default::default()),
256            cp_sequence_numbers: Some(Default::default()),
257            ev_emit_mod: Some(Default::default()),
258            ev_struct_inst: Some(Default::default()),
259            kv_checkpoints: Some(Default::default()),
260            kv_epoch_ends: Some(Default::default()),
261            kv_epoch_starts: Some(Default::default()),
262            kv_feature_flags: Some(Default::default()),
263            kv_objects: Some(Default::default()),
264            kv_packages: Some(Default::default()),
265            kv_protocol_configs: Some(Default::default()),
266            kv_transactions: Some(Default::default()),
267            obj_versions: Some(Default::default()),
268            tx_affected_addresses: Some(Default::default()),
269            tx_affected_objects: Some(Default::default()),
270            tx_balance_changes: Some(Default::default()),
271            tx_calls: Some(Default::default()),
272            tx_digests: Some(Default::default()),
273            tx_kinds: Some(Default::default()),
274        }
275    }
276}
277
278impl Merge for IndexerConfig {
279    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
280        Ok(IndexerConfig {
281            ingestion: self.ingestion.merge(other.ingestion)?,
282            committer: self.committer.merge(other.committer)?,
283            pruner: self.pruner.merge(other.pruner)?,
284            pipeline: self.pipeline.merge(other.pipeline)?,
285        })
286    }
287}
288
289impl Merge for IngestionLayer {
290    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
291        Ok(IngestionLayer {
292            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
293            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
294            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
295            streaming_backoff_initial_batch_size: other
296                .streaming_backoff_initial_batch_size
297                .or(self.streaming_backoff_initial_batch_size),
298            streaming_backoff_max_batch_size: other
299                .streaming_backoff_max_batch_size
300                .or(self.streaming_backoff_max_batch_size),
301            streaming_connection_timeout_ms: other
302                .streaming_connection_timeout_ms
303                .or(self.streaming_connection_timeout_ms),
304            streaming_statement_timeout_ms: other
305                .streaming_statement_timeout_ms
306                .or(self.streaming_statement_timeout_ms),
307        })
308    }
309}
310
311impl Merge for SequentialLayer {
312    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
313        Ok(SequentialLayer {
314            committer: self.committer.merge(other.committer)?,
315            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
316        })
317    }
318}
319
320impl Merge for ConcurrentLayer {
321    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
322        Ok(ConcurrentLayer {
323            committer: self.committer.merge(other.committer)?,
324            pruner: self.pruner.merge(other.pruner)?,
325        })
326    }
327}
328
329impl Merge for CommitterLayer {
330    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
331        Ok(CommitterLayer {
332            write_concurrency: other.write_concurrency.or(self.write_concurrency),
333            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
334            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
335        })
336    }
337}
338
339impl Merge for PrunerLayer {
340    /// Last write takes precedence for all fields except the `retention`, which takes the max of
341    /// all available values.
342    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
343        Ok(PrunerLayer {
344            interval_ms: other.interval_ms.or(self.interval_ms),
345            delay_ms: other.delay_ms.or(self.delay_ms),
346            retention: match (other.retention, self.retention) {
347                (Some(a), Some(b)) => Some(a.max(b)),
348                (Some(a), _) | (_, Some(a)) => Some(a),
349                (None, None) => None,
350            },
351            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
352            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
353        })
354    }
355}
356
357impl Merge for PipelineLayer {
358    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
359        Ok(PipelineLayer {
360            coin_balance_buckets: self
361                .coin_balance_buckets
362                .merge(other.coin_balance_buckets)?,
363            obj_info: self.obj_info.merge(other.obj_info)?,
364            sum_displays: self.sum_displays.merge(other.sum_displays)?,
365            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
366            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
367            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
368            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
369            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
370            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
371            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
372            kv_objects: self.kv_objects.merge(other.kv_objects)?,
373            kv_packages: self.kv_packages.merge(other.kv_packages)?,
374            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
375            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
376            obj_versions: self.obj_versions.merge(other.obj_versions)?,
377            tx_affected_addresses: self
378                .tx_affected_addresses
379                .merge(other.tx_affected_addresses)?,
380            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
381            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
382            tx_calls: self.tx_calls.merge(other.tx_calls)?,
383            tx_digests: self.tx_digests.merge(other.tx_digests)?,
384            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
385        })
386    }
387}
388
389impl<T: Merge> Merge for Option<T> {
390    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
391        Ok(match (self, other) {
392            (Some(a), Some(b)) => Some(a.merge(b)?),
393            (Some(a), _) | (_, Some(a)) => Some(a),
394            (None, None) => None,
395        })
396    }
397}
398
399impl From<IngestionConfig> for IngestionLayer {
400    fn from(config: IngestionConfig) -> Self {
401        Self {
402            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
403            ingest_concurrency: Some(config.ingest_concurrency),
404            retry_interval_ms: Some(config.retry_interval_ms),
405            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
406            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
407            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
408            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
409        }
410    }
411}
412
413impl From<SequentialConfig> for SequentialLayer {
414    fn from(config: SequentialConfig) -> Self {
415        Self {
416            committer: Some(config.committer.into()),
417            checkpoint_lag: Some(config.checkpoint_lag),
418        }
419    }
420}
421
422impl From<ConcurrentConfig> for ConcurrentLayer {
423    fn from(config: ConcurrentConfig) -> Self {
424        Self {
425            committer: Some(config.committer.into()),
426            pruner: config.pruner.map(Into::into),
427        }
428    }
429}
430
431impl From<CommitterConfig> for CommitterLayer {
432    fn from(config: CommitterConfig) -> Self {
433        Self {
434            write_concurrency: Some(config.write_concurrency),
435            collect_interval_ms: Some(config.collect_interval_ms),
436            watermark_interval_ms: Some(config.watermark_interval_ms),
437        }
438    }
439}
440
441impl From<PrunerConfig> for PrunerLayer {
442    fn from(config: PrunerConfig) -> Self {
443        Self {
444            interval_ms: Some(config.interval_ms),
445            delay_ms: Some(config.delay_ms),
446            retention: Some(config.retention),
447            max_chunk_size: Some(config.max_chunk_size),
448            prune_concurrency: Some(config.prune_concurrency),
449        }
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456
457    macro_rules! assert_matches {
458        ($value:expr, $pattern:pat $(,)?) => {
459            let value = $value;
460            assert!(
461                matches!(value, $pattern),
462                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
463                stringify!($pattern)
464            );
465        };
466    }
467
468    #[test]
469    fn merge_recursive() {
470        let this = PipelineLayer {
471            sum_displays: Some(SequentialLayer {
472                committer: Some(CommitterLayer {
473                    write_concurrency: Some(10),
474                    collect_interval_ms: Some(1000),
475                    watermark_interval_ms: None,
476                }),
477                checkpoint_lag: Some(100),
478            }),
479            ev_emit_mod: Some(ConcurrentLayer {
480                committer: Some(CommitterLayer {
481                    write_concurrency: Some(5),
482                    collect_interval_ms: Some(500),
483                    watermark_interval_ms: None,
484                }),
485                ..Default::default()
486            }),
487            ..Default::default()
488        };
489
490        let that = PipelineLayer {
491            sum_displays: Some(SequentialLayer {
492                committer: Some(CommitterLayer {
493                    write_concurrency: Some(5),
494                    collect_interval_ms: None,
495                    watermark_interval_ms: Some(500),
496                }),
497                checkpoint_lag: Some(200),
498            }),
499            ev_emit_mod: None,
500            ..Default::default()
501        };
502
503        let this_then_that = this.clone().merge(that.clone()).unwrap();
504        let that_then_this = that.clone().merge(this.clone()).unwrap();
505
506        assert_matches!(
507            this_then_that,
508            PipelineLayer {
509                sum_displays: Some(SequentialLayer {
510                    committer: Some(CommitterLayer {
511                        write_concurrency: Some(5),
512                        collect_interval_ms: Some(1000),
513                        watermark_interval_ms: Some(500),
514                    }),
515                    checkpoint_lag: Some(200),
516                }),
517                ev_emit_mod: Some(ConcurrentLayer {
518                    committer: Some(CommitterLayer {
519                        write_concurrency: Some(5),
520                        collect_interval_ms: Some(500),
521                        watermark_interval_ms: None,
522                    }),
523                    pruner: None,
524                }),
525                ..
526            },
527        );
528
529        assert_matches!(
530            that_then_this,
531            PipelineLayer {
532                sum_displays: Some(SequentialLayer {
533                    committer: Some(CommitterLayer {
534                        write_concurrency: Some(10),
535                        collect_interval_ms: Some(1000),
536                        watermark_interval_ms: Some(500),
537                    }),
538                    checkpoint_lag: Some(100),
539                }),
540                ev_emit_mod: Some(ConcurrentLayer {
541                    committer: Some(CommitterLayer {
542                        write_concurrency: Some(5),
543                        collect_interval_ms: Some(500),
544                        watermark_interval_ms: None,
545                    }),
546                    pruner: None,
547                }),
548                ..
549            },
550        );
551    }
552
553    #[test]
554    fn merge_pruner() {
555        let this = PrunerLayer {
556            interval_ms: None,
557            delay_ms: Some(100),
558            retention: Some(200),
559            max_chunk_size: Some(300),
560            prune_concurrency: Some(1),
561        };
562
563        let that = PrunerLayer {
564            interval_ms: Some(400),
565            delay_ms: None,
566            retention: Some(500),
567            max_chunk_size: Some(600),
568            prune_concurrency: Some(2),
569        };
570
571        let this_then_that = this.clone().merge(that.clone()).unwrap();
572        let that_then_this = that.clone().merge(this.clone()).unwrap();
573
574        assert_matches!(
575            this_then_that,
576            PrunerLayer {
577                interval_ms: Some(400),
578                delay_ms: Some(100),
579                retention: Some(500),
580                max_chunk_size: Some(600),
581                prune_concurrency: Some(2),
582            },
583        );
584
585        assert_matches!(
586            that_then_this,
587            PrunerLayer {
588                interval_ms: Some(400),
589                delay_ms: Some(100),
590                retention: Some(500),
591                max_chunk_size: Some(300),
592                prune_concurrency: Some(1),
593            },
594        );
595    }
596
597    #[test]
598    fn finish_concurrent_unpruned_override() {
599        let layer = ConcurrentLayer {
600            committer: None,
601            pruner: None,
602        };
603
604        let base = ConcurrentConfig {
605            committer: CommitterConfig {
606                write_concurrency: 5,
607                collect_interval_ms: 50,
608                watermark_interval_ms: 500,
609            },
610            pruner: Some(PrunerConfig::default()),
611        };
612
613        assert_matches!(
614            layer.finish(base).unwrap(),
615            ConcurrentConfig {
616                committer: CommitterConfig {
617                    write_concurrency: 5,
618                    collect_interval_ms: 50,
619                    watermark_interval_ms: 500,
620                },
621                pruner: None,
622            },
623        );
624    }
625
626    #[test]
627    fn finish_concurrent_no_pruner() {
628        let layer = ConcurrentLayer {
629            committer: None,
630            pruner: None,
631        };
632
633        let base = ConcurrentConfig {
634            committer: CommitterConfig {
635                write_concurrency: 5,
636                collect_interval_ms: 50,
637                watermark_interval_ms: 500,
638            },
639            pruner: None,
640        };
641
642        assert_matches!(
643            layer.finish(base).unwrap(),
644            ConcurrentConfig {
645                committer: CommitterConfig {
646                    write_concurrency: 5,
647                    collect_interval_ms: 50,
648                    watermark_interval_ms: 500,
649                },
650                pruner: None,
651            },
652        );
653    }
654
655    #[test]
656    fn finish_concurrent_pruner() {
657        let layer = ConcurrentLayer {
658            committer: None,
659            pruner: Some(PrunerLayer {
660                interval_ms: Some(1000),
661                ..Default::default()
662            }),
663        };
664
665        let base = ConcurrentConfig {
666            committer: CommitterConfig {
667                write_concurrency: 5,
668                collect_interval_ms: 50,
669                watermark_interval_ms: 500,
670            },
671            pruner: Some(PrunerConfig {
672                interval_ms: 100,
673                delay_ms: 200,
674                retention: 300,
675                max_chunk_size: 400,
676                prune_concurrency: 1,
677            }),
678        };
679
680        assert_matches!(
681            layer.finish(base).unwrap(),
682            ConcurrentConfig {
683                committer: CommitterConfig {
684                    write_concurrency: 5,
685                    collect_interval_ms: 50,
686                    watermark_interval_ms: 500,
687                },
688                pruner: Some(PrunerConfig {
689                    interval_ms: 1000,
690                    delay_ms: 200,
691                    retention: 300,
692                    max_chunk_size: 400,
693                    prune_concurrency: 1,
694                }),
695            },
696        );
697    }
698
699    #[test]
700    fn detect_unrecognized_fields() {
701        let err = toml::from_str::<IndexerConfig>(
702            r#"
703            i_dont_exist = "foo"
704            "#,
705        )
706        .unwrap_err();
707
708        assert!(
709            err.to_string().contains("i_dont_exist"),
710            "Unexpected error: {err}"
711        );
712    }
713}