sui_indexer_alt/
config.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::{
6    ingestion::IngestionConfig,
7    pipeline::{
8        CommitterConfig,
9        concurrent::{ConcurrentConfig, PrunerConfig},
10        sequential::SequentialConfig,
11    },
12};
13
/// Trait for merging configuration structs together.
///
/// `merge` consumes both `self` and `other` and returns the combined value, or an error if the
/// two cannot be combined. The implementations in this file give `other` precedence over `self`
/// wherever both supply a value, with the exception of the pruner's `retention`, which keeps the
/// larger of the two values.
pub trait Merge: Sized {
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
18
/// Root of the indexer's configuration. Each section is a "layer" whose fields are optional
/// overrides, so several of these can be merged before being finished against a base
/// configuration. Unknown fields are rejected when deserializing.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// How checkpoints are read by the indexer.
    pub ingestion: IngestionLayer,

    /// Default configuration for committers that is shared by all pipelines. Pipelines can
    /// override individual settings in their own configuration sections.
    pub committer: CommitterLayer,

    /// Default configuration for pruners that is shared by all concurrent pipelines. Pipelines can
    /// override individual settings in their own configuration sections. Concurrent pipelines
    /// still need to specify a pruner configuration (although it can be empty) to indicate that
    /// they want to enable pruning, but when they do, any missing values will be filled in by this
    /// config.
    pub pruner: PrunerLayer,

    /// Per-pipeline configurations.
    pub pipeline: PipelineLayer,
}
40
// Configuration layers apply overrides over a base configuration. When reading configs from a
// file, we read them into layer types, and then apply those layers onto an existing configuration
// (such as the default configuration) to `finish()` them.
//
// Treating configs as layers allows us to support configuration merging, where multiple
// configuration files can be combined into one final configuration. Having a separate type for
// reading configs also allows us to detect and reject unrecognised fields.
48
/// Optional overrides for an [`IngestionConfig`]. Any field left as `None` takes its value from
/// the base configuration passed to [`IngestionLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    /// Override for `IngestionConfig::checkpoint_buffer_size`.
    pub checkpoint_buffer_size: Option<usize>,
    /// Override for `IngestionConfig::ingest_concurrency`.
    pub ingest_concurrency: Option<usize>,
    /// Override for `IngestionConfig::retry_interval_ms`.
    pub retry_interval_ms: Option<u64>,
}
57
/// Optional overrides for a [`SequentialConfig`]. Any field left as `None` takes its value from
/// the base configuration passed to [`SequentialLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    /// Committer overrides for this pipeline. When `None`, the base committer configuration is
    /// used unchanged.
    pub committer: Option<CommitterLayer>,
    /// Override for `SequentialConfig::checkpoint_lag`.
    pub checkpoint_lag: Option<u64>,
}
65
/// Optional overrides for a [`ConcurrentConfig`], applied by [`ConcurrentLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Committer overrides for this pipeline. When `None`, the base committer configuration is
    /// used unchanged.
    pub committer: Option<CommitterLayer>,
    /// Pruner overrides for this pipeline. The finished configuration only has a pruner when
    /// this is `Some` (possibly with every field unset) *and* the base configuration also has a
    /// pruner.
    pub pruner: Option<PrunerLayer>,
}
73
/// Optional overrides for a [`CommitterConfig`]. Any field left as `None` takes its value from
/// the base configuration passed to [`CommitterLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    /// Override for `CommitterConfig::write_concurrency`.
    pub write_concurrency: Option<usize>,
    /// Override for `CommitterConfig::collect_interval_ms`.
    pub collect_interval_ms: Option<u64>,
    /// Override for `CommitterConfig::watermark_interval_ms`.
    pub watermark_interval_ms: Option<u64>,
}
82
/// Optional overrides for a [`PrunerConfig`]. Any field left as `None` takes its value from the
/// base configuration passed to [`PrunerLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    /// Override for `PrunerConfig::interval_ms`.
    pub interval_ms: Option<u64>,
    /// Override for `PrunerConfig::delay_ms`.
    pub delay_ms: Option<u64>,
    /// Override for `PrunerConfig::retention`. Unlike the other fields, merging two layers keeps
    /// the larger of the two retention values rather than the most recent one.
    pub retention: Option<u64>,
    /// Override for `PrunerConfig::max_chunk_size`.
    pub max_chunk_size: Option<u64>,
    /// Override for `PrunerConfig::prune_concurrency`.
    pub prune_concurrency: Option<u64>,
}
93
/// Per-pipeline configuration layers, one optional entry per known pipeline. Unknown pipeline
/// names are rejected when deserializing.
///
/// NOTE(review): presumably a pipeline whose entry is `None` is not run at all -- confirm against
/// the code that consumes this struct.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // All concurrent pipelines
    pub coin_balance_buckets: Option<ConcurrentLayer>,
    pub obj_info: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
123
124impl IndexerConfig {
125    /// Generate an example configuration, suitable for demonstrating the fields available to
126    /// configure.
127    pub fn example() -> Self {
128        let mut example: Self = Default::default();
129
130        example.ingestion = IngestionConfig::default().into();
131        example.committer = CommitterConfig::default().into();
132        example.pruner = PrunerConfig::default().into();
133        example.pipeline = PipelineLayer::example();
134
135        example
136    }
137
138    /// Generate a configuration suitable for testing. This is the same as the example
139    /// configuration, but with reduced concurrency and faster polling intervals so tests spend
140    /// less time waiting.
141    pub fn for_test() -> Self {
142        Self::example()
143            .merge(IndexerConfig {
144                ingestion: IngestionLayer {
145                    retry_interval_ms: Some(10),
146                    ingest_concurrency: Some(1),
147                    ..Default::default()
148                },
149                committer: CommitterLayer {
150                    collect_interval_ms: Some(50),
151                    watermark_interval_ms: Some(50),
152                    write_concurrency: Some(1),
153                },
154                pruner: PrunerLayer {
155                    interval_ms: Some(50),
156                    delay_ms: Some(0),
157                    ..Default::default()
158                },
159                ..Default::default()
160            })
161            .expect("Merge failed for test configuration")
162    }
163}
164
165impl IngestionLayer {
166    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
167        Ok(IngestionConfig {
168            checkpoint_buffer_size: self
169                .checkpoint_buffer_size
170                .unwrap_or(base.checkpoint_buffer_size),
171            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
172            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
173        })
174    }
175}
176
177impl SequentialLayer {
178    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
179        Ok(SequentialConfig {
180            committer: if let Some(committer) = self.committer {
181                committer.finish(base.committer)?
182            } else {
183                base.committer
184            },
185            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
186        })
187    }
188}
189
190impl ConcurrentLayer {
191    /// Unlike other parameters, `pruner` will appear in the finished configuration only if they
192    /// appear in the layer *and* in the base.
193    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
194        Ok(ConcurrentConfig {
195            committer: if let Some(committer) = self.committer {
196                committer.finish(base.committer)?
197            } else {
198                base.committer
199            },
200            pruner: match (self.pruner, base.pruner) {
201                (None, _) | (_, None) => None,
202                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
203            },
204        })
205    }
206}
207
208impl CommitterLayer {
209    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
210        Ok(CommitterConfig {
211            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
212            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
213            watermark_interval_ms: self
214                .watermark_interval_ms
215                .unwrap_or(base.watermark_interval_ms),
216        })
217    }
218}
219
220impl PrunerLayer {
221    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
222        Ok(PrunerConfig {
223            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
224            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
225            retention: self.retention.unwrap_or(base.retention),
226            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
227            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
228        })
229    }
230}
231
232impl PipelineLayer {
233    /// Generate an example configuration, suitable for demonstrating the fields available to
234    /// configure.
235    pub fn example() -> Self {
236        PipelineLayer {
237            coin_balance_buckets: Some(Default::default()),
238            obj_info: Some(Default::default()),
239            sum_displays: Some(Default::default()),
240            cp_sequence_numbers: Some(Default::default()),
241            ev_emit_mod: Some(Default::default()),
242            ev_struct_inst: Some(Default::default()),
243            kv_checkpoints: Some(Default::default()),
244            kv_epoch_ends: Some(Default::default()),
245            kv_epoch_starts: Some(Default::default()),
246            kv_feature_flags: Some(Default::default()),
247            kv_objects: Some(Default::default()),
248            kv_packages: Some(Default::default()),
249            kv_protocol_configs: Some(Default::default()),
250            kv_transactions: Some(Default::default()),
251            obj_versions: Some(Default::default()),
252            tx_affected_addresses: Some(Default::default()),
253            tx_affected_objects: Some(Default::default()),
254            tx_balance_changes: Some(Default::default()),
255            tx_calls: Some(Default::default()),
256            tx_digests: Some(Default::default()),
257            tx_kinds: Some(Default::default()),
258        }
259    }
260}
261
262impl Merge for IndexerConfig {
263    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
264        Ok(IndexerConfig {
265            ingestion: self.ingestion.merge(other.ingestion)?,
266            committer: self.committer.merge(other.committer)?,
267            pruner: self.pruner.merge(other.pruner)?,
268            pipeline: self.pipeline.merge(other.pipeline)?,
269        })
270    }
271}
272
273impl Merge for IngestionLayer {
274    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
275        Ok(IngestionLayer {
276            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
277            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
278            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
279        })
280    }
281}
282
283impl Merge for SequentialLayer {
284    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
285        Ok(SequentialLayer {
286            committer: self.committer.merge(other.committer)?,
287            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
288        })
289    }
290}
291
292impl Merge for ConcurrentLayer {
293    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
294        Ok(ConcurrentLayer {
295            committer: self.committer.merge(other.committer)?,
296            pruner: self.pruner.merge(other.pruner)?,
297        })
298    }
299}
300
301impl Merge for CommitterLayer {
302    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
303        Ok(CommitterLayer {
304            write_concurrency: other.write_concurrency.or(self.write_concurrency),
305            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
306            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
307        })
308    }
309}
310
311impl Merge for PrunerLayer {
312    /// Last write takes precedence for all fields except the `retention`, which takes the max of
313    /// all available values.
314    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
315        Ok(PrunerLayer {
316            interval_ms: other.interval_ms.or(self.interval_ms),
317            delay_ms: other.delay_ms.or(self.delay_ms),
318            retention: match (other.retention, self.retention) {
319                (Some(a), Some(b)) => Some(a.max(b)),
320                (Some(a), _) | (_, Some(a)) => Some(a),
321                (None, None) => None,
322            },
323            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
324            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
325        })
326    }
327}
328
329impl Merge for PipelineLayer {
330    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
331        Ok(PipelineLayer {
332            coin_balance_buckets: self
333                .coin_balance_buckets
334                .merge(other.coin_balance_buckets)?,
335            obj_info: self.obj_info.merge(other.obj_info)?,
336            sum_displays: self.sum_displays.merge(other.sum_displays)?,
337            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
338            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
339            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
340            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
341            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
342            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
343            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
344            kv_objects: self.kv_objects.merge(other.kv_objects)?,
345            kv_packages: self.kv_packages.merge(other.kv_packages)?,
346            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
347            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
348            obj_versions: self.obj_versions.merge(other.obj_versions)?,
349            tx_affected_addresses: self
350                .tx_affected_addresses
351                .merge(other.tx_affected_addresses)?,
352            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
353            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
354            tx_calls: self.tx_calls.merge(other.tx_calls)?,
355            tx_digests: self.tx_digests.merge(other.tx_digests)?,
356            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
357        })
358    }
359}
360
361impl<T: Merge> Merge for Option<T> {
362    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
363        Ok(match (self, other) {
364            (Some(a), Some(b)) => Some(a.merge(b)?),
365            (Some(a), _) | (_, Some(a)) => Some(a),
366            (None, None) => None,
367        })
368    }
369}
370
371impl From<IngestionConfig> for IngestionLayer {
372    fn from(config: IngestionConfig) -> Self {
373        Self {
374            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
375            ingest_concurrency: Some(config.ingest_concurrency),
376            retry_interval_ms: Some(config.retry_interval_ms),
377        }
378    }
379}
380
381impl From<SequentialConfig> for SequentialLayer {
382    fn from(config: SequentialConfig) -> Self {
383        Self {
384            committer: Some(config.committer.into()),
385            checkpoint_lag: Some(config.checkpoint_lag),
386        }
387    }
388}
389
390impl From<ConcurrentConfig> for ConcurrentLayer {
391    fn from(config: ConcurrentConfig) -> Self {
392        Self {
393            committer: Some(config.committer.into()),
394            pruner: config.pruner.map(Into::into),
395        }
396    }
397}
398
399impl From<CommitterConfig> for CommitterLayer {
400    fn from(config: CommitterConfig) -> Self {
401        Self {
402            write_concurrency: Some(config.write_concurrency),
403            collect_interval_ms: Some(config.collect_interval_ms),
404            watermark_interval_ms: Some(config.watermark_interval_ms),
405        }
406    }
407}
408
409impl From<PrunerConfig> for PrunerLayer {
410    fn from(config: PrunerConfig) -> Self {
411        Self {
412            interval_ms: Some(config.interval_ms),
413            delay_ms: Some(config.delay_ms),
414            retention: Some(config.retention),
415            max_chunk_size: Some(config.max_chunk_size),
416            prune_concurrency: Some(config.prune_concurrency),
417        }
418    }
419}
420
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that an expression matches a pattern, printing the expected pattern and the
    /// pretty-printed actual value on failure.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Committer settings used as the base in every `finish_concurrent_*` test.
    fn base_committer() -> CommitterConfig {
        CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
        }
    }

    /// Merging recurses into nested layers: fields present on both sides take the value from the
    /// right-hand side, and a pipeline present on only one side is kept unchanged.
    #[test]
    fn merge_recursive() {
        let first = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let second = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let first_then_second = first.clone().merge(second.clone()).unwrap();
        let second_then_first = second.merge(first).unwrap();

        assert_matches!(
            first_then_second,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(200),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );

        assert_matches!(
            second_then_first,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(100),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );
    }

    /// Pruner layers merge with last-write-wins semantics, except `retention`, which is the
    /// maximum of both sides regardless of merge order.
    #[test]
    fn merge_pruner() {
        let first = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let second = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let first_then_second = first.clone().merge(second.clone()).unwrap();
        let second_then_first = second.merge(first).unwrap();

        assert_matches!(
            first_then_second,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            second_then_first,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer without a pruner disables pruning even when the base has one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let overrides = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let defaults = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig::default()),
        };

        let finished = overrides.finish(defaults).unwrap();

        assert_matches!(
            finished,
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    /// With no pruner on either side, the finished config has no pruner and keeps the base
    /// committer.
    #[test]
    fn finish_concurrent_no_pruner() {
        let overrides = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let defaults = ConcurrentConfig {
            committer: base_committer(),
            pruner: None,
        };

        let finished = overrides.finish(defaults).unwrap();

        assert_matches!(
            finished,
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    /// With a pruner on both sides, fields set in the layer override the base and the rest are
    /// inherited from it.
    #[test]
    fn finish_concurrent_pruner() {
        let overrides = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
        };

        let defaults = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
        };

        let finished = overrides.finish(defaults).unwrap();

        assert_matches!(
            finished,
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
            },
        );
    }

    /// Deserializing a config with an unknown key fails, and the error names the offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let input = r#"
            i_dont_exist = "foo"
            "#;

        let err = toml::from_str::<IndexerConfig>(input).unwrap_err();
        let message = err.to_string();

        assert!(message.contains("i_dont_exist"), "Unexpected error: {err}");
    }
}
679        );
680    }
681}