sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::num::NonZeroUsize;
5
6use serde::Deserialize;
7use serde::Serialize;
8use sui_indexer_alt_framework::config::ConcurrencyConfig;
9use sui_indexer_alt_framework::ingestion::IngestionConfig;
10use sui_indexer_alt_framework::pipeline;
11use sui_indexer_alt_framework::pipeline::CommitterConfig;
12use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
13use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
14use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
15use tracing::warn;
16
/// Trait for merging configuration structs together.
///
/// `merge` consumes both operands and yields the combined value. In the implementations in this
/// file, a value set in `other` generally takes precedence over the same value set in `self`
/// (individual implementations document their exceptions).
pub trait Merge: Sized {
    /// Combine `self` with `other`. The `Result` return type lets an implementation report a
    /// merge that cannot be performed.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
21
/// Top-level configuration for the indexer, in the shape it is deserialized from.
///
/// Every section may be omitted when deserializing (`serde(default)`), keys are written in
/// kebab-case, and unrecognised keys are rejected (`deny_unknown_fields`).
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct IndexerConfig {
    /// How checkpoints are read by the indexer.
    pub ingestion: IngestionLayer,

    /// Default configuration for committers that is shared by all pipelines. Pipelines can
    /// override individual settings in their own configuration sections.
    pub committer: CommitterLayer,

    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
    /// override individual settings in their own configuration sections. Concurrent pipelines
    /// still need to specify a pruner configuration (although it can be empty) to indicate that
    /// they want to enable pruning, but when they do, any missing values will be filled in by this
    /// config.
    pub pruner: PrunerLayer,

    /// Per-pipeline configurations.
    pub pipeline: PipelineLayer,
}
42
43// Configuration layers apply overrides over a base configuration. When reading configs from a
44// file, we read them into layer types, and then apply those layers onto an existing configuration
45// (such as the default configuration) to `finish()` them.
46//
// Treating configs as layers allows us to support configuration merging, where multiple
// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields (every layer is marked
// `deny_unknown_fields`).
50
/// Overrides for the indexer-wide [`IngestionConfig`].
///
/// A field that is `Some` replaces the corresponding value of the base configuration when the
/// layer is applied with [`IngestionLayer::finish`]; a field that is `None` leaves the base value
/// as it is.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct IngestionLayer {
    pub ingest_concurrency: Option<ConcurrencyConfig>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<NonZeroUsize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,

    /// Deprecated: accepted (and ignored) so old configs don't fail to parse. Replaced by
    /// per-pipeline `ingestion.subscriber-channel-size`. `finish` only logs a warning when this
    /// is set; the value never reaches the finished configuration.
    pub checkpoint_buffer_size: Option<usize>,
}
65
/// Overrides for the configuration of a sequential pipeline ([`SequentialConfig`]).
///
/// `committer` and `ingestion` are themselves layers that are applied onto the matching section
/// of the base configuration. Every other field, when `None`, falls back to the base's value in
/// [`SequentialLayer::finish`].
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct SequentialLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_batch_checkpoints: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub pipeline_depth: Option<usize>,
}
78
/// Overrides for the configuration of a concurrent pipeline ([`ConcurrentConfig`]).
///
/// `committer`, `ingestion` and `pruner` are themselves layers. Every other field, when `None`,
/// falls back to the base's value in [`ConcurrentLayer::finish`].
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct ConcurrentLayer {
    pub committer: Option<CommitterLayer>,
    pub ingestion: Option<PipelineIngestionLayer>,
    /// The finished configuration only has a pruner when this is `Some` *and* the base
    /// configuration has a pruner as well (see `finish`).
    pub pruner: Option<PrunerLayer>,
    pub fanout: Option<ConcurrencyConfig>,
    pub min_eager_rows: Option<usize>,
    pub max_pending_rows: Option<usize>,
    pub max_watermark_updates: Option<usize>,
    pub processor_channel_size: Option<usize>,
    pub collector_channel_size: Option<usize>,
    pub committer_channel_size: Option<usize>,
}
93
/// Per-pipeline ingestion overrides, applied onto a [`pipeline::IngestionConfig`].
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct PipelineIngestionLayer {
    /// When `None`, the base configuration's `subscriber_channel_size` is kept.
    pub subscriber_channel_size: Option<usize>,
}
99
/// Overrides for a [`CommitterConfig`].
///
/// Used both as the indexer-wide default (`IndexerConfig::committer`) and as the optional
/// `committer` section of an individual pipeline's layer. A field that is `None` keeps the base
/// configuration's value when the layer is finished.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
107
/// Overrides for a [`PrunerConfig`].
///
/// A field that is `None` keeps the base configuration's value when the layer is finished. When
/// two pruner layers are merged, `retention` takes the larger of the two values, while every
/// other field takes the later layer's value if it has one.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
117
/// Per-pipeline configuration layers, one optional entry per pipeline.
///
/// Unlike the other layers in this file, keys here are deserialized in snake_case
/// (`rename_all = "snake_case"`), i.e. exactly as the field names are written below.
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(default, rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // All concurrent pipelines
    pub cp_bloom_blocks: Option<ConcurrentLayer>,
    pub cp_blooms: Option<ConcurrentLayer>,
    pub cp_digests: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
147
148impl IndexerConfig {
149    /// Generate an example configuration, suitable for demonstrating the fields available to
150    /// configure.
151    pub fn example() -> Self {
152        let mut example: Self = Default::default();
153
154        example.ingestion = IngestionConfig::default().into();
155        example.committer = CommitterConfig::default().into();
156        example.pruner = PrunerConfig::default().into();
157        example.pipeline = PipelineLayer::example();
158
159        example
160    }
161
162    /// Generate a configuration suitable for testing. This is the same as the example
163    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
164    /// less time waiting.
165    pub fn for_test() -> Self {
166        Self::example()
167            .merge(IndexerConfig {
168                ingestion: IngestionLayer {
169                    retry_interval_ms: Some(10),
170                    ingest_concurrency: Some(ConcurrencyConfig::Fixed { value: 1 }),
171                    ..Default::default()
172                },
173                committer: CommitterLayer {
174                    collect_interval_ms: Some(50),
175                    watermark_interval_ms: Some(50),
176                    write_concurrency: Some(1),
177                },
178                pruner: PrunerLayer {
179                    interval_ms: Some(50),
180                    delay_ms: Some(0),
181                    ..Default::default()
182                },
183                ..Default::default()
184            })
185            .expect("Merge failed for test configuration")
186    }
187}
188
189impl IngestionLayer {
190    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
191        if self.checkpoint_buffer_size.is_some() {
192            warn!(
193                "Config field `checkpoint-buffer-size` is deprecated and ignored. Remove it from \
194                 your config; set `subscriber-channel-size` under each pipeline's `ingestion` \
195                 section if you need to override the default."
196            );
197        }
198
199        Ok(IngestionConfig {
200            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
201            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
202            streaming_backoff_initial_batch_size: self
203                .streaming_backoff_initial_batch_size
204                .unwrap_or(base.streaming_backoff_initial_batch_size),
205            streaming_backoff_max_batch_size: self
206                .streaming_backoff_max_batch_size
207                .unwrap_or(base.streaming_backoff_max_batch_size),
208            streaming_connection_timeout_ms: self
209                .streaming_connection_timeout_ms
210                .unwrap_or(base.streaming_connection_timeout_ms),
211            streaming_statement_timeout_ms: self
212                .streaming_statement_timeout_ms
213                .unwrap_or(base.streaming_statement_timeout_ms),
214        })
215    }
216}
217
218impl SequentialLayer {
219    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
220        Ok(SequentialConfig {
221            committer: if let Some(committer) = self.committer {
222                committer.finish(base.committer)?
223            } else {
224                base.committer
225            },
226            ingestion: if let Some(ingestion) = self.ingestion {
227                ingestion.finish(base.ingestion)
228            } else {
229                base.ingestion
230            },
231            fanout: self.fanout.or(base.fanout),
232            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
233            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
234            max_batch_checkpoints: self.max_batch_checkpoints.or(base.max_batch_checkpoints),
235            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
236            pipeline_depth: self.pipeline_depth.or(base.pipeline_depth),
237        })
238    }
239}
240
241impl ConcurrentLayer {
242    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
243    /// appear in the layer *and* in the base.
244    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
245        Ok(ConcurrentConfig {
246            committer: if let Some(committer) = self.committer {
247                committer.finish(base.committer)?
248            } else {
249                base.committer
250            },
251            ingestion: if let Some(ingestion) = self.ingestion {
252                ingestion.finish(base.ingestion)
253            } else {
254                base.ingestion
255            },
256            pruner: match (self.pruner, base.pruner) {
257                (None, _) | (_, None) => None,
258                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
259            },
260            fanout: self.fanout.or(base.fanout),
261            min_eager_rows: self.min_eager_rows.or(base.min_eager_rows),
262            max_pending_rows: self.max_pending_rows.or(base.max_pending_rows),
263            max_watermark_updates: self.max_watermark_updates.or(base.max_watermark_updates),
264            processor_channel_size: self.processor_channel_size.or(base.processor_channel_size),
265            collector_channel_size: self.collector_channel_size.or(base.collector_channel_size),
266            committer_channel_size: self.committer_channel_size.or(base.committer_channel_size),
267        })
268    }
269}
270
271impl PipelineIngestionLayer {
272    pub fn finish(self, base: pipeline::IngestionConfig) -> pipeline::IngestionConfig {
273        pipeline::IngestionConfig {
274            subscriber_channel_size: self
275                .subscriber_channel_size
276                .or(base.subscriber_channel_size),
277        }
278    }
279}
280
281impl CommitterLayer {
282    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
283        Ok(CommitterConfig {
284            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
285            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
286            watermark_interval_ms: self
287                .watermark_interval_ms
288                .unwrap_or(base.watermark_interval_ms),
289            watermark_interval_jitter_ms: 0,
290        })
291    }
292}
293
294impl PrunerLayer {
295    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
296        Ok(PrunerConfig {
297            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
298            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
299            retention: self.retention.unwrap_or(base.retention),
300            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
301            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
302        })
303    }
304}
305
306impl PipelineLayer {
307    /// Generate an example configuration, suitable for demonstrating the fields available to
308    /// configure.
309    pub fn example() -> Self {
310        PipelineLayer {
311            cp_blooms: Some(Default::default()),
312            cp_bloom_blocks: Some(Default::default()),
313            cp_digests: Some(Default::default()),
314            sum_displays: Some(Default::default()),
315            cp_sequence_numbers: Some(Default::default()),
316            ev_emit_mod: Some(Default::default()),
317            ev_struct_inst: Some(Default::default()),
318            kv_checkpoints: Some(Default::default()),
319            kv_epoch_ends: Some(Default::default()),
320            kv_epoch_starts: Some(Default::default()),
321            kv_feature_flags: Some(Default::default()),
322            kv_objects: Some(Default::default()),
323            kv_packages: Some(Default::default()),
324            kv_protocol_configs: Some(Default::default()),
325            kv_transactions: Some(Default::default()),
326            obj_versions: Some(Default::default()),
327            tx_affected_addresses: Some(Default::default()),
328            tx_affected_objects: Some(Default::default()),
329            tx_balance_changes: Some(Default::default()),
330            tx_calls: Some(Default::default()),
331            tx_digests: Some(Default::default()),
332            tx_kinds: Some(Default::default()),
333        }
334    }
335}
336
337impl Merge for IndexerConfig {
338    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
339        Ok(IndexerConfig {
340            ingestion: self.ingestion.merge(other.ingestion)?,
341            committer: self.committer.merge(other.committer)?,
342            pruner: self.pruner.merge(other.pruner)?,
343            pipeline: self.pipeline.merge(other.pipeline)?,
344        })
345    }
346}
347
348impl Merge for IngestionLayer {
349    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
350        Ok(IngestionLayer {
351            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
352            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
353            streaming_backoff_initial_batch_size: other
354                .streaming_backoff_initial_batch_size
355                .or(self.streaming_backoff_initial_batch_size),
356            streaming_backoff_max_batch_size: other
357                .streaming_backoff_max_batch_size
358                .or(self.streaming_backoff_max_batch_size),
359            streaming_connection_timeout_ms: other
360                .streaming_connection_timeout_ms
361                .or(self.streaming_connection_timeout_ms),
362            streaming_statement_timeout_ms: other
363                .streaming_statement_timeout_ms
364                .or(self.streaming_statement_timeout_ms),
365            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
366        })
367    }
368}
369
370impl Merge for SequentialLayer {
371    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
372        Ok(SequentialLayer {
373            committer: self.committer.merge(other.committer)?,
374            ingestion: self.ingestion.merge(other.ingestion)?,
375            fanout: other.fanout.or(self.fanout),
376            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
377            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
378            max_batch_checkpoints: other.max_batch_checkpoints.or(self.max_batch_checkpoints),
379            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
380            pipeline_depth: other.pipeline_depth.or(self.pipeline_depth),
381        })
382    }
383}
384
385impl Merge for ConcurrentLayer {
386    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
387        Ok(ConcurrentLayer {
388            committer: self.committer.merge(other.committer)?,
389            ingestion: self.ingestion.merge(other.ingestion)?,
390            pruner: self.pruner.merge(other.pruner)?,
391            fanout: other.fanout.or(self.fanout),
392            min_eager_rows: other.min_eager_rows.or(self.min_eager_rows),
393            max_pending_rows: other.max_pending_rows.or(self.max_pending_rows),
394            max_watermark_updates: other.max_watermark_updates.or(self.max_watermark_updates),
395            processor_channel_size: other.processor_channel_size.or(self.processor_channel_size),
396            collector_channel_size: other.collector_channel_size.or(self.collector_channel_size),
397            committer_channel_size: other.committer_channel_size.or(self.committer_channel_size),
398        })
399    }
400}
401
402impl Merge for PipelineIngestionLayer {
403    fn merge(self, other: PipelineIngestionLayer) -> anyhow::Result<PipelineIngestionLayer> {
404        Ok(PipelineIngestionLayer {
405            subscriber_channel_size: other
406                .subscriber_channel_size
407                .or(self.subscriber_channel_size),
408        })
409    }
410}
411
412impl Merge for CommitterLayer {
413    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
414        Ok(CommitterLayer {
415            write_concurrency: other.write_concurrency.or(self.write_concurrency),
416            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
417            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
418        })
419    }
420}
421
422impl Merge for PrunerLayer {
423    /// Last write takes precedence for all fields except the `retention`, which takes the max of
424    /// all available values.
425    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
426        Ok(PrunerLayer {
427            interval_ms: other.interval_ms.or(self.interval_ms),
428            delay_ms: other.delay_ms.or(self.delay_ms),
429            retention: match (other.retention, self.retention) {
430                (Some(a), Some(b)) => Some(a.max(b)),
431                (Some(a), _) | (_, Some(a)) => Some(a),
432                (None, None) => None,
433            },
434            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
435            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
436        })
437    }
438}
439
440impl Merge for PipelineLayer {
441    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
442        Ok(PipelineLayer {
443            cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
444            cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
445            cp_digests: self.cp_digests.merge(other.cp_digests)?,
446            sum_displays: self.sum_displays.merge(other.sum_displays)?,
447            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
448            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
449            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
450            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
451            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
452            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
453            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
454            kv_objects: self.kv_objects.merge(other.kv_objects)?,
455            kv_packages: self.kv_packages.merge(other.kv_packages)?,
456            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
457            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
458            obj_versions: self.obj_versions.merge(other.obj_versions)?,
459            tx_affected_addresses: self
460                .tx_affected_addresses
461                .merge(other.tx_affected_addresses)?,
462            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
463            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
464            tx_calls: self.tx_calls.merge(other.tx_calls)?,
465            tx_digests: self.tx_digests.merge(other.tx_digests)?,
466            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
467        })
468    }
469}
470
471impl<T: Merge> Merge for Option<T> {
472    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
473        Ok(match (self, other) {
474            (Some(a), Some(b)) => Some(a.merge(b)?),
475            (Some(a), _) | (_, Some(a)) => Some(a),
476            (None, None) => None,
477        })
478    }
479}
480
481impl From<IngestionConfig> for IngestionLayer {
482    fn from(config: IngestionConfig) -> Self {
483        Self {
484            ingest_concurrency: Some(config.ingest_concurrency),
485            retry_interval_ms: Some(config.retry_interval_ms),
486            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
487            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
488            streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
489            streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
490            checkpoint_buffer_size: None,
491        }
492    }
493}
494
495impl From<SequentialConfig> for SequentialLayer {
496    fn from(config: SequentialConfig) -> Self {
497        Self {
498            committer: Some(config.committer.into()),
499            ingestion: Some(config.ingestion.into()),
500            fanout: config.fanout,
501            min_eager_rows: config.min_eager_rows,
502            max_pending_rows: config.max_pending_rows,
503            max_batch_checkpoints: config.max_batch_checkpoints,
504            processor_channel_size: config.processor_channel_size,
505            pipeline_depth: config.pipeline_depth,
506        }
507    }
508}
509
510impl From<ConcurrentConfig> for ConcurrentLayer {
511    fn from(config: ConcurrentConfig) -> Self {
512        Self {
513            committer: Some(config.committer.into()),
514            ingestion: Some(config.ingestion.into()),
515            pruner: config.pruner.map(Into::into),
516            fanout: config.fanout,
517            min_eager_rows: config.min_eager_rows,
518            max_pending_rows: config.max_pending_rows,
519            max_watermark_updates: config.max_watermark_updates,
520            processor_channel_size: config.processor_channel_size,
521            collector_channel_size: config.collector_channel_size,
522            committer_channel_size: config.committer_channel_size,
523        }
524    }
525}
526
527impl From<pipeline::IngestionConfig> for PipelineIngestionLayer {
528    fn from(config: pipeline::IngestionConfig) -> Self {
529        Self {
530            subscriber_channel_size: config.subscriber_channel_size,
531        }
532    }
533}
534
535impl From<CommitterConfig> for CommitterLayer {
536    fn from(config: CommitterConfig) -> Self {
537        Self {
538            write_concurrency: Some(config.write_concurrency),
539            collect_interval_ms: Some(config.collect_interval_ms),
540            watermark_interval_ms: Some(config.watermark_interval_ms),
541        }
542    }
543}
544
545impl From<PrunerConfig> for PrunerLayer {
546    fn from(config: PrunerConfig) -> Self {
547        Self {
548            interval_ms: Some(config.interval_ms),
549            delay_ms: Some(config.delay_ms),
550            retention: Some(config.retention),
551            max_chunk_size: Some(config.max_chunk_size),
552            prune_concurrency: Some(config.prune_concurrency),
553        }
554    }
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that `$value` matches `$pattern`, printing the expected pattern and the pretty
    /// `Debug` form of the actual value when it does not.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            // Evaluate the expression once so it can be both matched and printed.
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Merging descends into nested layers: fields set on only one side survive, and for fields
    /// set on both sides the right-hand operand of `merge` wins.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                min_eager_rows: Some(100),
                ..Default::default()
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                min_eager_rows: Some(200),
                ..Default::default()
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        // `that` is applied last, so its `write_concurrency` and `min_eager_rows` win.
        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(200),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );

        // `this` is applied last, so its `write_concurrency` and `min_eager_rows` win.
        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                        ..
                    }),
                    min_eager_rows: Some(100),
                    ..
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                        ..
                    }),
                    pruner: None,
                    ..
                }),
                ..
            },
        );
    }

    /// Pruner layers merge with last-write-wins semantics, except for `retention`, which is the
    /// maximum of both sides regardless of merge order.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        // `retention` is still 500 even though `this` (200) was applied last.
        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner section yields no pruner, even if the base has one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig::default()),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// With no pruner in either the layer or the base, the finished config has no pruner and the
    /// base committer settings are kept.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: None,
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: None,
                ..
            },
        );
    }

    /// When both the layer and the base have a pruner, the layer's values override the base's
    /// and the remaining values are taken from the base.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
            ..Default::default()
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
                ..Default::default()
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
            ..Default::default()
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                    ..
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
                ..
            },
        );
    }

    /// Unknown top-level keys make deserialization fail, and the error names the offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }

    /// The deprecated `checkpoint-buffer-size` key is still accepted by the parser and its value
    /// is retained on the layer.
    #[test]
    fn deprecated_checkpoint_buffer_size_parses() {
        let config: IndexerConfig = toml::from_str(
            r#"
            [ingestion]
            checkpoint-buffer-size = 5000
            "#,
        )
        .expect("deprecated `checkpoint-buffer-size` must deserialize without error");

        assert_eq!(config.ingestion.checkpoint_buffer_size, Some(5000));
    }
}