sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::config::ConcurrencyConfig;
6use sui_indexer_alt_framework::ingestion::IngestionConfig;
7use sui_indexer_alt_framework::pipeline::CommitterConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
9use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
10use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
11
12/// Trait for merging configuration structs together.
13pub trait Merge: Sized {
14    fn merge(self, other: Self) -> anyhow::Result<Self>;
15}
16
17#[DefaultConfig]
18#[derive(Clone, Default, Debug)]
19#[serde(deny_unknown_fields)]
20pub struct IndexerConfig {
21    /// How checkpoints are read by the indexer.
22    pub ingestion: IngestionLayer,
23
24    /// Default configuration for committers that is shared by all pipelines. Pipelines can
25    /// override individual settings in their own configuration sections.
26    pub committer: CommitterLayer,
27
28    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
29    /// override individual settings in their own configuration sections. Concurrent pipelines
30    /// still need to specify a pruner configuration (although it can be empty) to indicate that
31    /// they want to enable pruning, but when they do, any missing values will be filled in by this
32    /// config.
33    pub pruner: PrunerLayer,
34
35    /// Per-pipeline configurations.
36    pub pipeline: PipelineLayer,
37}
38
39// Configuration layers apply overrides over a base configuration. When reading configs from a
40// file, we read them into layer types, and then apply those layers onto an existing configuration
41// (such as the default configuration) to `finish()` them.
42//
43// Treating configs as layers allows us to support configuration merging, where multiple
44// configuration files can be combined into one final configuration. Having a separate type for
45// reading configs also allows us to detect and warn against unrecognised fields.
46
47#[DefaultConfig]
48#[derive(Clone, Default, Debug)]
49#[serde(deny_unknown_fields)]
50pub struct IngestionLayer {
51    pub checkpoint_buffer_size: Option<usize>,
52    pub ingest_concurrency: Option<ConcurrencyConfig>,
53    pub retry_interval_ms: Option<u64>,
54    pub streaming_backoff_initial_batch_size: Option<usize>,
55    pub streaming_backoff_max_batch_size: Option<usize>,
56    pub streaming_connection_timeout_ms: Option<u64>,
57    pub streaming_statement_timeout_ms: Option<u64>,
58}
59
60#[DefaultConfig]
61#[derive(Clone, Default, Debug)]
62#[serde(deny_unknown_fields)]
63pub struct SequentialLayer {
64    pub committer: Option<CommitterLayer>,
65    pub checkpoint_lag: Option<u64>,
66    pub fanout: Option<ConcurrencyConfig>,
67    pub min_eager_rows: Option<usize>,
68    pub max_batch_checkpoints: Option<usize>,
69    pub processor_channel_size: Option<usize>,
70}
71
72#[DefaultConfig]
73#[derive(Clone, Default, Debug)]
74#[serde(deny_unknown_fields)]
75pub struct ConcurrentLayer {
76    pub committer: Option<CommitterLayer>,
77    pub pruner: Option<PrunerLayer>,
78    pub fanout: Option<ConcurrencyConfig>,
79    pub min_eager_rows: Option<usize>,
80    pub max_pending_rows: Option<usize>,
81    pub max_watermark_updates: Option<usize>,
82    pub processor_channel_size: Option<usize>,
83    pub collector_channel_size: Option<usize>,
84    pub committer_channel_size: Option<usize>,
85}
86
87#[DefaultConfig]
88#[derive(Clone, Default, Debug)]
89#[serde(deny_unknown_fields)]
90pub struct CommitterLayer {
91    pub write_concurrency: Option<usize>,
92    pub collect_interval_ms: Option<u64>,
93    pub watermark_interval_ms: Option<u64>,
94}
95
96#[DefaultConfig]
97#[derive(Clone, Default, Debug)]
98#[serde(deny_unknown_fields)]
99pub struct PrunerLayer {
100    pub interval_ms: Option<u64>,
101    pub delay_ms: Option<u64>,
102    pub retention: Option<u64>,
103    pub max_chunk_size: Option<u64>,
104    pub prune_concurrency: Option<u64>,
105}
106
107#[DefaultConfig]
108#[derive(Clone, Default, Debug)]
109#[serde(rename_all = "snake_case", deny_unknown_fields)]
110pub struct PipelineLayer {
111    // Sequential pipelines
112    pub sum_displays: Option<SequentialLayer>,
113
114    // All concurrent pipelines
115    pub cp_bloom_blocks: Option<ConcurrentLayer>,
116    pub cp_blooms: Option<ConcurrentLayer>,
117    pub cp_sequence_numbers: Option<ConcurrentLayer>,
118    pub ev_emit_mod: Option<ConcurrentLayer>,
119    pub ev_struct_inst: Option<ConcurrentLayer>,
120    pub kv_checkpoints: Option<ConcurrentLayer>,
121    pub kv_epoch_ends: Option<ConcurrentLayer>,
122    pub kv_epoch_starts: Option<ConcurrentLayer>,
123    pub kv_feature_flags: Option<ConcurrentLayer>,
124    pub kv_objects: Option<ConcurrentLayer>,
125    pub kv_packages: Option<ConcurrentLayer>,
126    pub kv_protocol_configs: Option<ConcurrentLayer>,
127    pub kv_transactions: Option<ConcurrentLayer>,
128    pub obj_versions: Option<ConcurrentLayer>,
129    pub tx_affected_addresses: Option<ConcurrentLayer>,
130    pub tx_affected_objects: Option<ConcurrentLayer>,
131    pub tx_balance_changes: Option<ConcurrentLayer>,
132    pub tx_calls: Option<ConcurrentLayer>,
133    pub tx_digests: Option<ConcurrentLayer>,
134    pub tx_kinds: Option<ConcurrentLayer>,
135}
136
137impl IndexerConfig {
138    /// Generate an example configuration, suitable for demonstrating the fields available to
139    /// configure.
140    pub fn example() -> Self {
141        let mut example: Self = Default::default();
142
143        example.ingestion = IngestionConfig::default().into();
144        example.committer = CommitterConfig::default().into();
145        example.pruner = PrunerConfig::default().into();
146        example.pipeline = PipelineLayer::example();
147
148        example
149    }
150
151    /// Generate a configuration suitable for testing. This is the same as the example
152    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
153    /// less time waiting.
154    pub fn for_test() -> Self {
155        Self::example()
156            .merge(IndexerConfig {
157                ingestion: IngestionLayer {
158                    retry_interval_ms: Some(10),
159                    ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
160                    ..Default::default()
161                },
162                committer: CommitterLayer {
163                    collect_interval_ms: Some(50),
164                    watermark_interval_ms: Some(50),
165                    write_concurrency: Some(1),
166                },
167                pruner: PrunerLayer {
168                    interval_ms: Some(50),
169                    delay_ms: Some(0),
170                    ..Default::default()
171                },
172                ..Default::default()
173            })
174            .expect("Merge failed for test configuration")
175    }
176}
177
178impl IngestionLayer {
179    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
180        Ok(IngestionConfig {
181            checkpoint_buffer_size: self
182                .checkpoint_buffer_size
183                .unwrap_or(base.checkpoint_buffer_size),
184            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
185            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
186            streaming_backoff_initial_batch_size: self
187                .streaming_backoff_initial_batch_size
188                .unwrap_or(base.streaming_backoff_initial_batch_size),
189            streaming_backoff_max_batch_size: self
190                .streaming_backoff_max_batch_size
191                .unwrap_or(base.streaming_backoff_max_batch_size),
192            streaming_connection_timeout_ms: self
193                .streaming_connection_timeout_ms
194                .unwrap_or(base.streaming_connection_timeout_ms),
195            streaming_statement_timeout_ms: self
196                .streaming_statement_timeout_ms
197                .unwrap_or(base.streaming_statement_timeout_ms),
198        })
199    }
200}
201
202impl SequentialLayer {
203    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
204        Ok(SequentialConfig {
205            committer: if let Some(committer) = self.committer {
206                committer.finish(base.committer)?
207            } else {
208                base.committer
209            },
210            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
211            fanout: self.fanout.or(base.fanout),
212            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
213            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
214            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
215        })
216    }
217}
218
219impl ConcurrentLayer {
220    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
221    /// appear in the layer *and* in the base.
222    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
223        Ok(ConcurrentConfig {
224            committer: if let Some(committer) = self.committer {
225                committer.finish(base.committer)?
226            } else {
227                base.committer
228            },
229            pruner: match (self.pruner, base.pruner) {
230                (None, _) | (_, None) => None,
231                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
232            },
233            fanout: self.fanout.or(base.fanout),
234            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
235            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
236            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
237            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
238            collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
239            committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
240        })
241    }
242}
243
244impl CommitterLayer {
245    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
246        Ok(CommitterConfig {
247            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
248            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
249            watermark_interval_ms: self
250                .watermark_interval_ms
251                .unwrap_or(base.watermark_interval_ms),
252            watermark_interval_jitter_ms: 0,
253        })
254    }
255}
256
257impl PrunerLayer {
258    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
259        Ok(PrunerConfig {
260            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
261            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
262            retention: self.retention.unwrap_or(base.retention),
263            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
264            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
265        })
266    }
267}
268
269impl PipelineLayer {
270    /// Generate an example configuration, suitable for demonstrating the fields available to
271    /// configure.
272    pub fn example() -> Self {
273        PipelineLayer {
274            cp_blooms: Some(Default::default()),
275            cp_bloom_blocks: Some(Default::default()),
276            sum_displays: Some(Default::default()),
277            cp_sequence_numbers: Some(Default::default()),
278            ev_emit_mod: Some(Default::default()),
279            ev_struct_inst: Some(Default::default()),
280            kv_checkpoints: Some(Default::default()),
281            kv_epoch_ends: Some(Default::default()),
282            kv_epoch_starts: Some(Default::default()),
283            kv_feature_flags: Some(Default::default()),
284            kv_objects: Some(Default::default()),
285            kv_packages: Some(Default::default()),
286            kv_protocol_configs: Some(Default::default()),
287            kv_transactions: Some(Default::default()),
288            obj_versions: Some(Default::default()),
289            tx_affected_addresses: Some(Default::default()),
290            tx_affected_objects: Some(Default::default()),
291            tx_balance_changes: Some(Default::default()),
292            tx_calls: Some(Default::default()),
293            tx_digests: Some(Default::default()),
294            tx_kinds: Some(Default::default()),
295        }
296    }
297}
298
299impl Merge for IndexerConfig {
300    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
301        Ok(IndexerConfig {
302            ingestion: self.ingestion.merge(other.ingestion)?,
303            committer: self.committer.merge(other.committer)?,
304            pruner: self.pruner.merge(other.pruner)?,
305            pipeline: self.pipeline.merge(other.pipeline)?,
306        })
307    }
308}
309
310impl Merge for IngestionLayer {
311    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
312        Ok(IngestionLayer {
313            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
314            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
315            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
316            streaming_backoff_initial_batch_size: other
317                .streaming_backoff_initial_batch_size
318                .or(self.streaming_backoff_initial_batch_size),
319            streaming_backoff_max_batch_size: other
320                .streaming_backoff_max_batch_size
321                .or(self.streaming_backoff_max_batch_size),
322            streaming_connection_timeout_ms: other
323                .streaming_connection_timeout_ms
324                .or(self.streaming_connection_timeout_ms),
325            streaming_statement_timeout_ms: other
326                .streaming_statement_timeout_ms
327                .or(self.streaming_statement_timeout_ms),
328        })
329    }
330}
331
332impl Merge for SequentialLayer {
333    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
334        Ok(SequentialLayer {
335            committer: self.committer.merge(other.committer)?,
336            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
337            fanout: other.fanout.or(self.fanout),
338            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
339            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
340            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
341        })
342    }
343}
344
345impl Merge for ConcurrentLayer {
346    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
347        Ok(ConcurrentLayer {
348            committer: self.committer.merge(other.committer)?,
349            pruner: self.pruner.merge(other.pruner)?,
350            fanout: other.fanout.or(self.fanout),
351            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
352            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
353            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
354            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
355            collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
356            committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
357        })
358    }
359}
360
361impl Merge for CommitterLayer {
362    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
363        Ok(CommitterLayer {
364            write_concurrency: other.write_concurrency.or(self.write_concurrency),
365            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
366            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
367        })
368    }
369}
370
371impl Merge for PrunerLayer {
372    /// Last write takes precedence for all fields except the `retention`, which takes the max of
373    /// all available values.
374    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
375        Ok(PrunerLayer {
376            interval_ms: other.interval_ms.or(self.interval_ms),
377            delay_ms: other.delay_ms.or(self.delay_ms),
378            retention: match (other.retention, self.retention) {
379                (Some(a), Some(b)) => Some(a.max(b)),
380                (Some(a), _) | (_, Some(a)) => Some(a),
381                (None, None) => None,
382            },
383            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
384            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
385        })
386    }
387}
388
389impl Merge for PipelineLayer {
390    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
391        Ok(PipelineLayer {
392            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
393            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
394            sum_displays: self.sum_displays.merge(other.sum_displays)?,
395            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
396            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
397            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
398            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
399            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
400            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
401            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
402            kv_objects: self.kv_objects.merge(other.kv_objects)?,
403            kv_packages: self.kv_packages.merge(other.kv_packages)?,
404            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
405            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
406            obj_versions: self.obj_versions.merge(other.obj_versions)?,
407            tx_affected_addresses: self
408                .tx_affected_addresses
409                .merge(other.tx_affected_addresses)?,
410            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
411            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
412            tx_calls: self.tx_calls.merge(other.tx_calls)?,
413            tx_digests: self.tx_digests.merge(other.tx_digests)?,
414            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
415        })
416    }
417}
418
419impl<T: Merge> Merge for Option<T> {
420    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
421        Ok(match (self, other) {
422            (Some(a), Some(b)) => Some(a.merge(b)?),
423            (Some(a), _) | (_, Some(a)) => Some(a),
424            (None, None) => None,
425        })
426    }
427}
428
429impl From<IngestionConfig> for IngestionLayer {
430    fn from(config: IngestionConfig) -> Self {
431        Self {
432            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
433            ingest_concurrency: Some(config.ingest_concurrency),
434            retry_interval_ms: Some(config.retry_interval_ms),
435            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
436            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
437            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
438            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
439        }
440    }
441}
442
443impl From<SequentialConfig> for SequentialLayer {
444    fn from(config: SequentialConfig) -> Self {
445        Self {
446            committer: Some(config.committer.into()),
447            checkpoint_lag: Some(config.checkpoint_lag),
448            fanout: config.fanout,
449            min_eager_rows: config.min_eager_rows,
450            max_batch_checkpoints: config.max_batch_checkpoints,
451            processor_channel_size: config.processor_channel_size,
452        }
453    }
454}
455
456impl From<ConcurrentConfig> for ConcurrentLayer {
457    fn from(config: ConcurrentConfig) -> Self {
458        Self {
459            committer: Some(config.committer.into()),
460            pruner: config.pruner.map(Into::into),
461            fanout: config.fanout,
462            min_eager_rows: config.min_eager_rows,
463            max_pending_rows: config.max_pending_rows,
464            max_watermark_updates: config.max_watermark_updates,
465            processor_channel_size: config.processor_channel_size,
466            collector_channel_size: config.collector_channel_size,
467            committer_channel_size: config.committer_channel_size,
468        }
469    }
470}
471
472impl From<CommitterConfig> for CommitterLayer {
473    fn from(config: CommitterConfig) -> Self {
474        Self {
475            write_concurrency: Some(config.write_concurrency),
476            collect_interval_ms: Some(config.collect_interval_ms),
477            watermark_interval_ms: Some(config.watermark_interval_ms),
478        }
479    }
480}
481
482impl From<PrunerConfig> for PrunerLayer {
483    fn from(config: PrunerConfig) -> Self {
484        Self {
485            interval_ms: Some(config.interval_ms),
486            delay_ms: Some(config.delay_ms),
487            retention: Some(config.retention),
488            max_chunk_size: Some(config.max_chunk_size),
489            prune_concurrency: Some(config.prune_concurrency),
490        }
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497
498    macro_rules! assert_matches {
499        ($value:expr, $pattern:pat $(,)?) => {
500            let value = $value;
501            assert!(
502                matches!(value, $pattern),
503                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
504                stringify!($pattern)
505            );
506        };
507    }
508
509    #[test]
510    fn merge_recursive() {
511        let this = PipelineLayer {
512            sum_displays: Some(SequentialLayer {
513                committer: Some(CommitterLayer {
514                    write_concurrency: Some(10),
515                    collect_interval_ms: Some(1000),
516                    watermark_interval_ms: None,
517                }),
518                checkpoint_lag: Some(100),
519                ..Default::default()
520            }),
521            ev_emit_mod: Some(ConcurrentLayer {
522                committer: Some(CommitterLayer {
523                    write_concurrency: Some(5),
524                    collect_interval_ms: Some(500),
525                    watermark_interval_ms: None,
526                }),
527                ..Default::default()
528            }),
529            ..Default::default()
530        };
531
532        let that = PipelineLayer {
533            sum_displays: Some(SequentialLayer {
534                committer: Some(CommitterLayer {
535                    write_concurrency: Some(5),
536                    collect_interval_ms: None,
537                    watermark_interval_ms: Some(500),
538                }),
539                checkpoint_lag: Some(200),
540                ..Default::default()
541            }),
542            ev_emit_mod: None,
543            ..Default::default()
544        };
545
546        let this_then_that = this.clone().merge(that.clone()).unwrap();
547        let that_then_this = that.clone().merge(this.clone()).unwrap();
548
549        assert_matches!(
550            this_then_that,
551            PipelineLayer {
552                sum_displays: Some(SequentialLayer {
553                    committer: Some(CommitterLayer {
554                        write_concurrency: Some(5),
555                        collect_interval_ms: Some(1000),
556                        watermark_interval_ms: Some(500),
557                        ..
558                    }),
559                    checkpoint_lag: Some(200),
560                    ..
561                }),
562                ev_emit_mod: Some(ConcurrentLayer {
563                    committer: Some(CommitterLayer {
564                        write_concurrency: Some(5),
565                        collect_interval_ms: Some(500),
566                        watermark_interval_ms: None,
567                        ..
568                    }),
569                    pruner: None,
570                    ..
571                }),
572                ..
573            },
574        );
575
576        assert_matches!(
577            that_then_this,
578            PipelineLayer {
579                sum_displays: Some(SequentialLayer {
580                    committer: Some(CommitterLayer {
581                        write_concurrency: Some(10),
582                        collect_interval_ms: Some(1000),
583                        watermark_interval_ms: Some(500),
584                        ..
585                    }),
586                    checkpoint_lag: Some(100),
587                    ..
588                }),
589                ev_emit_mod: Some(ConcurrentLayer {
590                    committer: Some(CommitterLayer {
591                        write_concurrency: Some(5),
592                        collect_interval_ms: Some(500),
593                        watermark_interval_ms: None,
594                        ..
595                    }),
596                    pruner: None,
597                    ..
598                }),
599                ..
600            },
601        );
602    }
603
604    #[test]
605    fn merge_pruner() {
606        let this = PrunerLayer {
607            interval_ms: None,
608            delay_ms: Some(100),
609            retention: Some(200),
610            max_chunk_size: Some(300),
611            prune_concurrency: Some(1),
612        };
613
614        let that = PrunerLayer {
615            interval_ms: Some(400),
616            delay_ms: None,
617            retention: Some(500),
618            max_chunk_size: Some(600),
619            prune_concurrency: Some(2),
620        };
621
622        let this_then_that = this.clone().merge(that.clone()).unwrap();
623        let that_then_this = that.clone().merge(this.clone()).unwrap();
624
625        assert_matches!(
626            this_then_that,
627            PrunerLayer {
628                interval_ms: Some(400),
629                delay_ms: Some(100),
630                retention: Some(500),
631                max_chunk_size: Some(600),
632                prune_concurrency: Some(2),
633            },
634        );
635
636        assert_matches!(
637            that_then_this,
638            PrunerLayer {
639                interval_ms: Some(400),
640                delay_ms: Some(100),
641                retention: Some(500),
642                max_chunk_size: Some(300),
643                prune_concurrency: Some(1),
644            },
645        );
646    }
647
648    #[test]
649    fn finish_concurrent_unpruned_override() {
650        let layer = ConcurrentLayer {
651            committer: None,
652            pruner: None,
653            ..Default::default()
654        };
655
656        let base = ConcurrentConfig {
657            committer: CommitterConfig {
658                write_concurrency: 5,
659                collect_interval_ms: 50,
660                watermark_interval_ms: 500,
661                ..Default::default()
662            },
663            pruner: Some(PrunerConfig::default()),
664            ..Default::default()
665        };
666
667        assert_matches!(
668            layer.finish(base).unwrap(),
669            ConcurrentConfig {
670                committer: CommitterConfig {
671                    write_concurrency: 5,
672                    collect_interval_ms: 50,
673                    watermark_interval_ms: 500,
674                    ..
675                },
676                pruner: None,
677                ..
678            },
679        );
680    }
681
682    #[test]
683    fn finish_concurrent_no_pruner() {
684        let layer = ConcurrentLayer {
685            committer: None,
686            pruner: None,
687            ..Default::default()
688        };
689
690        let base = ConcurrentConfig {
691            committer: CommitterConfig {
692                write_concurrency: 5,
693                collect_interval_ms: 50,
694                watermark_interval_ms: 500,
695                ..Default::default()
696            },
697            pruner: None,
698            ..Default::default()
699        };
700
701        assert_matches!(
702            layer.finish(base).unwrap(),
703            ConcurrentConfig {
704                committer: CommitterConfig {
705                    write_concurrency: 5,
706                    collect_interval_ms: 50,
707                    watermark_interval_ms: 500,
708                    ..
709                },
710                pruner: None,
711                ..
712            },
713        );
714    }
715
716    #[test]
717    fn finish_concurrent_pruner() {
718        let layer = ConcurrentLayer {
719            committer: None,
720            pruner: Some(PrunerLayer {
721                interval_ms: Some(1000),
722                ..Default::default()
723            }),
724            ..Default::default()
725        };
726
727        let base = ConcurrentConfig {
728            committer: CommitterConfig {
729                write_concurrency: 5,
730                collect_interval_ms: 50,
731                watermark_interval_ms: 500,
732                ..Default::default()
733            },
734            pruner: Some(PrunerConfig {
735                interval_ms: 100,
736                delay_ms: 200,
737                retention: 300,
738                max_chunk_size: 400,
739                prune_concurrency: 1,
740            }),
741            ..Default::default()
742        };
743
744        assert_matches!(
745            layer.finish(base).unwrap(),
746            ConcurrentConfig {
747                committer: CommitterConfig {
748                    write_concurrency: 5,
749                    collect_interval_ms: 50,
750                    watermark_interval_ms: 500,
751                    ..
752                },
753                pruner: Some(PrunerConfig {
754                    interval_ms: 1000,
755                    delay_ms: 200,
756                    retention: 300,
757                    max_chunk_size: 400,
758                    prune_concurrency: 1,
759                }),
760                ..
761            },
762        );
763    }
764
765    #[test]
766    fn detect_unrecognized_fields() {
767        let err = toml::from_str::<IndexerConfig>(
768            r#"
769            i_dont_exist = "foo"
770            "#,
771        )
772        .unwrap_err();
773
774        assert!(
775            err.to_string().contains("i_dont_exist"),
776            "Unexpected error: {err}"
777        );
778    }
779}