sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::ingestion::IngestionConfig;
6use sui_indexer_alt_framework::pipeline::CommitterConfig;
7use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
9use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
10
11/// Trait for merging configuration structs together.
12pub trait Merge: Sized {
13    fn merge(self, other: Self) -> anyhow::Result<Self>;
14}
15
16#[DefaultConfig]
17#[derive(Clone, Default, Debug)]
18#[serde(deny_unknown_fields)]
19pub struct IndexerConfig {
20    /// How checkpoints are read by the indexer.
21    pub ingestion: IngestionLayer,
22
23    /// Default configuration for committers that is shared by all pipelines. Pipelines can
24    /// override individual settings in their own configuration sections.
25    pub committer: CommitterLayer,
26
27    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
28    /// override individual settings in their own configuration sections. Concurrent pipelines
29    /// still need to specify a pruner configuration (although it can be empty) to indicate that
30    /// they want to enable pruning, but when they do, any missing values will be filled in by this
31    /// config.
32    pub pruner: PrunerLayer,
33
34    /// Per-pipeline configurations.
35    pub pipeline: PipelineLayer,
36}
37
38// Configuration layers apply overrides over a base configuration. When reading configs from a
39// file, we read them into layer types, and then apply those layers onto an existing configuration
40// (such as the default configuration) to `finish()` them.
41//
42// Treating configs as layers allows us to support configuration merging, where multiple
43// configuration files can be combined into one final configuration. Having a separate type for
44// reading configs also allows us to detect and warn against unrecognised fields.
45
46#[DefaultConfig]
47#[derive(Clone, Default, Debug)]
48#[serde(deny_unknown_fields)]
49pub struct IngestionLayer {
50    pub checkpoint_buffer_size: Option<usize>,
51    pub ingest_concurrency: Option<usize>,
52    pub retry_interval_ms: Option<u64>,
53    pub streaming_backoff_initial_batch_size: Option<usize>,
54    pub streaming_backoff_max_batch_size: Option<usize>,
55    pub streaming_connection_timeout_ms: Option<u64>,
56    pub streaming_statement_timeout_ms: Option<u64>,
57}
58
59#[DefaultConfig]
60#[derive(Clone, Default, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct SequentialLayer {
63    pub committer: Option<CommitterLayer>,
64    pub checkpoint_lag: Option<u64>,
65    pub fanout: Option<usize>,
66    pub min_eager_rows: Option<usize>,
67    pub max_batch_checkpoints: Option<usize>,
68}
69
70#[DefaultConfig]
71#[derive(Clone, Default, Debug)]
72#[serde(deny_unknown_fields)]
73pub struct ConcurrentLayer {
74    pub committer: Option<CommitterLayer>,
75    pub pruner: Option<PrunerLayer>,
76    pub fanout: Option<usize>,
77    pub min_eager_rows: Option<usize>,
78    pub max_pending_rows: Option<usize>,
79    pub max_watermark_updates: Option<usize>,
80}
81
82#[DefaultConfig]
83#[derive(Clone, Default, Debug)]
84#[serde(deny_unknown_fields)]
85pub struct CommitterLayer {
86    pub write_concurrency: Option<usize>,
87    pub collect_interval_ms: Option<u64>,
88    pub watermark_interval_ms: Option<u64>,
89}
90
91#[DefaultConfig]
92#[derive(Clone, Default, Debug)]
93#[serde(deny_unknown_fields)]
94pub struct PrunerLayer {
95    pub interval_ms: Option<u64>,
96    pub delay_ms: Option<u64>,
97    pub retention: Option<u64>,
98    pub max_chunk_size: Option<u64>,
99    pub prune_concurrency: Option<u64>,
100}
101
102#[DefaultConfig]
103#[derive(Clone, Default, Debug)]
104#[serde(rename_all = "snake_case", deny_unknown_fields)]
105pub struct PipelineLayer {
106    // Sequential pipelines
107    pub sum_displays: Option<SequentialLayer>,
108
109    // All concurrent pipelines
110    pub cp_bloom_blocks: Option<ConcurrentLayer>,
111    pub cp_blooms: Option<ConcurrentLayer>,
112    pub coin_balance_buckets: Option<ConcurrentLayer>,
113    pub obj_info: Option<ConcurrentLayer>,
114    pub cp_sequence_numbers: Option<ConcurrentLayer>,
115    pub ev_emit_mod: Option<ConcurrentLayer>,
116    pub ev_struct_inst: Option<ConcurrentLayer>,
117    pub kv_checkpoints: Option<ConcurrentLayer>,
118    pub kv_epoch_ends: Option<ConcurrentLayer>,
119    pub kv_epoch_starts: Option<ConcurrentLayer>,
120    pub kv_feature_flags: Option<ConcurrentLayer>,
121    pub kv_objects: Option<ConcurrentLayer>,
122    pub kv_packages: Option<ConcurrentLayer>,
123    pub kv_protocol_configs: Option<ConcurrentLayer>,
124    pub kv_transactions: Option<ConcurrentLayer>,
125    pub obj_versions: Option<ConcurrentLayer>,
126    pub tx_affected_addresses: Option<ConcurrentLayer>,
127    pub tx_affected_objects: Option<ConcurrentLayer>,
128    pub tx_balance_changes: Option<ConcurrentLayer>,
129    pub tx_calls: Option<ConcurrentLayer>,
130    pub tx_digests: Option<ConcurrentLayer>,
131    pub tx_kinds: Option<ConcurrentLayer>,
132}
133
134impl IndexerConfig {
135    /// Generate an example configuration, suitable for demonstrating the fields available to
136    /// configure.
137    pub fn example() -> Self {
138        let mut example: Self = Default::default();
139
140        example.ingestion = IngestionConfig::default().into();
141        example.committer = CommitterConfig::default().into();
142        example.pruner = PrunerConfig::default().into();
143        example.pipeline = PipelineLayer::example();
144
145        example
146    }
147
148    /// Generate a configuration suitable for testing. This is the same as the example
149    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
150    /// less time waiting.
151    pub fn for_test() -> Self {
152        Self::example()
153            .merge(IndexerConfig {
154                ingestion: IngestionLayer {
155                    retry_interval_ms: Some(10),
156                    ingest_concurrency: Some(1),
157                    ..Default::default()
158                },
159                committer: CommitterLayer {
160                    collect_interval_ms: Some(50),
161                    watermark_interval_ms: Some(50),
162                    write_concurrency: Some(1),
163                },
164                pruner: PrunerLayer {
165                    interval_ms: Some(50),
166                    delay_ms: Some(0),
167                    ..Default::default()
168                },
169                ..Default::default()
170            })
171            .expect("Merge failed for test configuration")
172    }
173}
174
175impl IngestionLayer {
176    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
177        Ok(IngestionConfig {
178            checkpoint_buffer_size: self
179                .checkpoint_buffer_size
180                .unwrap_or(base.checkpoint_buffer_size),
181            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
182            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
183            streaming_backoff_initial_batch_size: self
184                .streaming_backoff_initial_batch_size
185                .unwrap_or(base.streaming_backoff_initial_batch_size),
186            streaming_backoff_max_batch_size: self
187                .streaming_backoff_max_batch_size
188                .unwrap_or(base.streaming_backoff_max_batch_size),
189            streaming_connection_timeout_ms: self
190                .streaming_connection_timeout_ms
191                .unwrap_or(base.streaming_connection_timeout_ms),
192            streaming_statement_timeout_ms: self
193                .streaming_statement_timeout_ms
194                .unwrap_or(base.streaming_statement_timeout_ms),
195        })
196    }
197}
198
199impl SequentialLayer {
200    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
201        Ok(SequentialConfig {
202            committer: if let Some(committer) = self.committer {
203                committer.finish(base.committer)?
204            } else {
205                base.committer
206            },
207            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
208            fanout: self.fanout.or(base.fanout),
209            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
210            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
211        })
212    }
213}
214
215impl ConcurrentLayer {
216    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
217    /// appear in the layer *and* in the base.
218    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
219        Ok(ConcurrentConfig {
220            committer: if let Some(committer) = self.committer {
221                committer.finish(base.committer)?
222            } else {
223                base.committer
224            },
225            pruner: match (self.pruner, base.pruner) {
226                (None, _) | (_, None) => None,
227                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
228            },
229            fanout: self.fanout.or(base.fanout),
230            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
231            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
232            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
233        })
234    }
235}
236
237impl CommitterLayer {
238    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
239        Ok(CommitterConfig {
240            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
241            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
242            watermark_interval_ms: self
243                .watermark_interval_ms
244                .unwrap_or(base.watermark_interval_ms),
245            watermark_interval_jitter_ms: 0,
246        })
247    }
248}
249
250impl PrunerLayer {
251    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
252        Ok(PrunerConfig {
253            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
254            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
255            retention: self.retention.unwrap_or(base.retention),
256            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
257            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
258        })
259    }
260}
261
262impl PipelineLayer {
263    /// Generate an example configuration, suitable for demonstrating the fields available to
264    /// configure.
265    pub fn example() -> Self {
266        PipelineLayer {
267            cp_blooms: Some(Default::default()),
268            cp_bloom_blocks: Some(Default::default()),
269            coin_balance_buckets: Some(Default::default()),
270            obj_info: Some(Default::default()),
271            sum_displays: Some(Default::default()),
272            cp_sequence_numbers: Some(Default::default()),
273            ev_emit_mod: Some(Default::default()),
274            ev_struct_inst: Some(Default::default()),
275            kv_checkpoints: Some(Default::default()),
276            kv_epoch_ends: Some(Default::default()),
277            kv_epoch_starts: Some(Default::default()),
278            kv_feature_flags: Some(Default::default()),
279            kv_objects: Some(Default::default()),
280            kv_packages: Some(Default::default()),
281            kv_protocol_configs: Some(Default::default()),
282            kv_transactions: Some(Default::default()),
283            obj_versions: Some(Default::default()),
284            tx_affected_addresses: Some(Default::default()),
285            tx_affected_objects: Some(Default::default()),
286            tx_balance_changes: Some(Default::default()),
287            tx_calls: Some(Default::default()),
288            tx_digests: Some(Default::default()),
289            tx_kinds: Some(Default::default()),
290        }
291    }
292}
293
294impl Merge for IndexerConfig {
295    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
296        Ok(IndexerConfig {
297            ingestion: self.ingestion.merge(other.ingestion)?,
298            committer: self.committer.merge(other.committer)?,
299            pruner: self.pruner.merge(other.pruner)?,
300            pipeline: self.pipeline.merge(other.pipeline)?,
301        })
302    }
303}
304
305impl Merge for IngestionLayer {
306    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
307        Ok(IngestionLayer {
308            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
309            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
310            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
311            streaming_backoff_initial_batch_size: other
312                .streaming_backoff_initial_batch_size
313                .or(self.streaming_backoff_initial_batch_size),
314            streaming_backoff_max_batch_size: other
315                .streaming_backoff_max_batch_size
316                .or(self.streaming_backoff_max_batch_size),
317            streaming_connection_timeout_ms: other
318                .streaming_connection_timeout_ms
319                .or(self.streaming_connection_timeout_ms),
320            streaming_statement_timeout_ms: other
321                .streaming_statement_timeout_ms
322                .or(self.streaming_statement_timeout_ms),
323        })
324    }
325}
326
327impl Merge for SequentialLayer {
328    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
329        Ok(SequentialLayer {
330            committer: self.committer.merge(other.committer)?,
331            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
332            fanout: other.fanout.or(self.fanout),
333            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
334            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
335        })
336    }
337}
338
339impl Merge for ConcurrentLayer {
340    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
341        Ok(ConcurrentLayer {
342            committer: self.committer.merge(other.committer)?,
343            pruner: self.pruner.merge(other.pruner)?,
344            fanout: other.fanout.or(self.fanout),
345            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
346            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
347            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
348        })
349    }
350}
351
352impl Merge for CommitterLayer {
353    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
354        Ok(CommitterLayer {
355            write_concurrency: other.write_concurrency.or(self.write_concurrency),
356            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
357            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
358        })
359    }
360}
361
362impl Merge for PrunerLayer {
363    /// Last write takes precedence for all fields except the `retention`, which takes the max of
364    /// all available values.
365    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
366        Ok(PrunerLayer {
367            interval_ms: other.interval_ms.or(self.interval_ms),
368            delay_ms: other.delay_ms.or(self.delay_ms),
369            retention: match (other.retention, self.retention) {
370                (Some(a), Some(b)) => Some(a.max(b)),
371                (Some(a), _) | (_, Some(a)) => Some(a),
372                (None, None) => None,
373            },
374            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
375            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
376        })
377    }
378}
379
380impl Merge for PipelineLayer {
381    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
382        Ok(PipelineLayer {
383            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
384            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
385            coin_balance_buckets: self
386                .coin_balance_buckets
387                .merge(other.coin_balance_buckets)?,
388            obj_info: self.obj_info.merge(other.obj_info)?,
389            sum_displays: self.sum_displays.merge(other.sum_displays)?,
390            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
391            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
392            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
393            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
394            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
395            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
396            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
397            kv_objects: self.kv_objects.merge(other.kv_objects)?,
398            kv_packages: self.kv_packages.merge(other.kv_packages)?,
399            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
400            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
401            obj_versions: self.obj_versions.merge(other.obj_versions)?,
402            tx_affected_addresses: self
403                .tx_affected_addresses
404                .merge(other.tx_affected_addresses)?,
405            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
406            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
407            tx_calls: self.tx_calls.merge(other.tx_calls)?,
408            tx_digests: self.tx_digests.merge(other.tx_digests)?,
409            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
410        })
411    }
412}
413
414impl<T: Merge> Merge for Option<T> {
415    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
416        Ok(match (self, other) {
417            (Some(a), Some(b)) => Some(a.merge(b)?),
418            (Some(a), _) | (_, Some(a)) => Some(a),
419            (None, None) => None,
420        })
421    }
422}
423
424impl From<IngestionConfig> for IngestionLayer {
425    fn from(config: IngestionConfig) -> Self {
426        Self {
427            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
428            ingest_concurrency: Some(config.ingest_concurrency),
429            retry_interval_ms: Some(config.retry_interval_ms),
430            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
431            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
432            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
433            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
434        }
435    }
436}
437
438impl From<SequentialConfig> for SequentialLayer {
439    fn from(config: SequentialConfig) -> Self {
440        Self {
441            committer: Some(config.committer.into()),
442            checkpoint_lag: Some(config.checkpoint_lag),
443            fanout: config.fanout,
444            min_eager_rows: config.min_eager_rows,
445            max_batch_checkpoints: config.max_batch_checkpoints,
446        }
447    }
448}
449
450impl From<ConcurrentConfig> for ConcurrentLayer {
451    fn from(config: ConcurrentConfig) -> Self {
452        Self {
453            committer: Some(config.committer.into()),
454            pruner: config.pruner.map(Into::into),
455            fanout: config.fanout,
456            min_eager_rows: config.min_eager_rows,
457            max_pending_rows: config.max_pending_rows,
458            max_watermark_updates: config.max_watermark_updates,
459        }
460    }
461}
462
463impl From<CommitterConfig> for CommitterLayer {
464    fn from(config: CommitterConfig) -> Self {
465        Self {
466            write_concurrency: Some(config.write_concurrency),
467            collect_interval_ms: Some(config.collect_interval_ms),
468            watermark_interval_ms: Some(config.watermark_interval_ms),
469        }
470    }
471}
472
473impl From<PrunerConfig> for PrunerLayer {
474    fn from(config: PrunerConfig) -> Self {
475        Self {
476            interval_ms: Some(config.interval_ms),
477            delay_ms: Some(config.delay_ms),
478            retention: Some(config.retention),
479            max_chunk_size: Some(config.max_chunk_size),
480            prune_concurrency: Some(config.prune_concurrency),
481        }
482    }
483}
484
485#[cfg(test)]
486mod tests {
487    use super::*;
488
489    macro_rules! assert_matches {
490        ($value:expr, $pattern:pat $(,)?) => {
491            let value = $value;
492            assert!(
493                matches!(value, $pattern),
494                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
495                stringify!($pattern)
496            );
497        };
498    }
499
500    #[test]
501    fn merge_recursive() {
502        let this = PipelineLayer {
503            sum_displays: Some(SequentialLayer {
504                committer: Some(CommitterLayer {
505                    write_concurrency: Some(10),
506                    collect_interval_ms: Some(1000),
507                    watermark_interval_ms: None,
508                }),
509                checkpoint_lag: Some(100),
510                ..Default::default()
511            }),
512            ev_emit_mod: Some(ConcurrentLayer {
513                committer: Some(CommitterLayer {
514                    write_concurrency: Some(5),
515                    collect_interval_ms: Some(500),
516                    watermark_interval_ms: None,
517                }),
518                ..Default::default()
519            }),
520            ..Default::default()
521        };
522
523        let that = PipelineLayer {
524            sum_displays: Some(SequentialLayer {
525                committer: Some(CommitterLayer {
526                    write_concurrency: Some(5),
527                    collect_interval_ms: None,
528                    watermark_interval_ms: Some(500),
529                }),
530                checkpoint_lag: Some(200),
531                ..Default::default()
532            }),
533            ev_emit_mod: None,
534            ..Default::default()
535        };
536
537        let this_then_that = this.clone().merge(that.clone()).unwrap();
538        let that_then_this = that.clone().merge(this.clone()).unwrap();
539
540        assert_matches!(
541            this_then_that,
542            PipelineLayer {
543                sum_displays: Some(SequentialLayer {
544                    committer: Some(CommitterLayer {
545                        write_concurrency: Some(5),
546                        collect_interval_ms: Some(1000),
547                        watermark_interval_ms: Some(500),
548                        ..
549                    }),
550                    checkpoint_lag: Some(200),
551                    ..
552                }),
553                ev_emit_mod: Some(ConcurrentLayer {
554                    committer: Some(CommitterLayer {
555                        write_concurrency: Some(5),
556                        collect_interval_ms: Some(500),
557                        watermark_interval_ms: None,
558                        ..
559                    }),
560                    pruner: None,
561                    ..
562                }),
563                ..
564            },
565        );
566
567        assert_matches!(
568            that_then_this,
569            PipelineLayer {
570                sum_displays: Some(SequentialLayer {
571                    committer: Some(CommitterLayer {
572                        write_concurrency: Some(10),
573                        collect_interval_ms: Some(1000),
574                        watermark_interval_ms: Some(500),
575                        ..
576                    }),
577                    checkpoint_lag: Some(100),
578                    ..
579                }),
580                ev_emit_mod: Some(ConcurrentLayer {
581                    committer: Some(CommitterLayer {
582                        write_concurrency: Some(5),
583                        collect_interval_ms: Some(500),
584                        watermark_interval_ms: None,
585                        ..
586                    }),
587                    pruner: None,
588                    ..
589                }),
590                ..
591            },
592        );
593    }
594
595    #[test]
596    fn merge_pruner() {
597        let this = PrunerLayer {
598            interval_ms: None,
599            delay_ms: Some(100),
600            retention: Some(200),
601            max_chunk_size: Some(300),
602            prune_concurrency: Some(1),
603        };
604
605        let that = PrunerLayer {
606            interval_ms: Some(400),
607            delay_ms: None,
608            retention: Some(500),
609            max_chunk_size: Some(600),
610            prune_concurrency: Some(2),
611        };
612
613        let this_then_that = this.clone().merge(that.clone()).unwrap();
614        let that_then_this = that.clone().merge(this.clone()).unwrap();
615
616        assert_matches!(
617            this_then_that,
618            PrunerLayer {
619                interval_ms: Some(400),
620                delay_ms: Some(100),
621                retention: Some(500),
622                max_chunk_size: Some(600),
623                prune_concurrency: Some(2),
624            },
625        );
626
627        assert_matches!(
628            that_then_this,
629            PrunerLayer {
630                interval_ms: Some(400),
631                delay_ms: Some(100),
632                retention: Some(500),
633                max_chunk_size: Some(300),
634                prune_concurrency: Some(1),
635            },
636        );
637    }
638
639    #[test]
640    fn finish_concurrent_unpruned_override() {
641        let layer = ConcurrentLayer {
642            committer: None,
643            pruner: None,
644            ..Default::default()
645        };
646
647        let base = ConcurrentConfig {
648            committer: CommitterConfig {
649                write_concurrency: 5,
650                collect_interval_ms: 50,
651                watermark_interval_ms: 500,
652                ..Default::default()
653            },
654            pruner: Some(PrunerConfig::default()),
655            ..Default::default()
656        };
657
658        assert_matches!(
659            layer.finish(base).unwrap(),
660            ConcurrentConfig {
661                committer: CommitterConfig {
662                    write_concurrency: 5,
663                    collect_interval_ms: 50,
664                    watermark_interval_ms: 500,
665                    ..
666                },
667                pruner: None,
668                ..
669            },
670        );
671    }
672
673    #[test]
674    fn finish_concurrent_no_pruner() {
675        let layer = ConcurrentLayer {
676            committer: None,
677            pruner: None,
678            ..Default::default()
679        };
680
681        let base = ConcurrentConfig {
682            committer: CommitterConfig {
683                write_concurrency: 5,
684                collect_interval_ms: 50,
685                watermark_interval_ms: 500,
686                ..Default::default()
687            },
688            pruner: None,
689            ..Default::default()
690        };
691
692        assert_matches!(
693            layer.finish(base).unwrap(),
694            ConcurrentConfig {
695                committer: CommitterConfig {
696                    write_concurrency: 5,
697                    collect_interval_ms: 50,
698                    watermark_interval_ms: 500,
699                    ..
700                },
701                pruner: None,
702                ..
703            },
704        );
705    }
706
707    #[test]
708    fn finish_concurrent_pruner() {
709        let layer = ConcurrentLayer {
710            committer: None,
711            pruner: Some(PrunerLayer {
712                interval_ms: Some(1000),
713                ..Default::default()
714            }),
715            ..Default::default()
716        };
717
718        let base = ConcurrentConfig {
719            committer: CommitterConfig {
720                write_concurrency: 5,
721                collect_interval_ms: 50,
722                watermark_interval_ms: 500,
723                ..Default::default()
724            },
725            pruner: Some(PrunerConfig {
726                interval_ms: 100,
727                delay_ms: 200,
728                retention: 300,
729                max_chunk_size: 400,
730                prune_concurrency: 1,
731            }),
732            ..Default::default()
733        };
734
735        assert_matches!(
736            layer.finish(base).unwrap(),
737            ConcurrentConfig {
738                committer: CommitterConfig {
739                    write_concurrency: 5,
740                    collect_interval_ms: 50,
741                    watermark_interval_ms: 500,
742                    ..
743                },
744                pruner: Some(PrunerConfig {
745                    interval_ms: 1000,
746                    delay_ms: 200,
747                    retention: 300,
748                    max_chunk_size: 400,
749                    prune_concurrency: 1,
750                }),
751                ..
752            },
753        );
754    }
755
756    #[test]
757    fn detect_unrecognized_fields() {
758        let err = toml::from_str::<IndexerConfig>(
759            r#"
760            i_dont_exist = "foo"
761            "#,
762        )
763        .unwrap_err();
764
765        assert!(
766            err.to_string().contains("i_dont_exist"),
767            "Unexpected error: {err}"
768        );
769    }
770}