1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::{
6 ingestion::IngestionConfig,
7 pipeline::{
8 CommitterConfig,
9 concurrent::{ConcurrentConfig, PrunerConfig},
10 sequential::SequentialConfig,
11 },
12};
13
/// Combines two configuration values of the same type into one.
///
/// `merge` consumes both values. In this file's implementations, a field set on `other` generally
/// takes precedence over the same field on `self`, and fields left unset on `other` are inherited
/// from `self` (see the individual impls for exceptions, e.g. pruner `retention`).
pub trait Merge: Sized {
    /// Merge `other` into `self`, returning the combined value, or an error if the two values
    /// cannot be combined.
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}
18
/// Top-level indexer configuration, assembled from optional "layers".
///
/// Settings inside each layer are `Option`s, so several partial configurations can be combined
/// with [`Merge::merge`] and later resolved against concrete framework configs through each
/// layer's `finish` method. Unknown keys are rejected during deserialization.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// Overrides for checkpoint ingestion.
    pub ingestion: IngestionLayer,

    /// Top-level committer overrides.
    // NOTE(review): presumably the default committer for all pipelines, refined by each
    // pipeline's own `committer` layer — confirm at the call site.
    pub committer: CommitterLayer,

    /// Top-level pruner overrides.
    // NOTE(review): presumably the default pruner for concurrent pipelines — confirm at the
    // call site.
    pub pruner: PrunerLayer,

    /// Per-pipeline overrides, one optional entry per pipeline.
    pub pipeline: PipelineLayer,
}
40
/// Optional overrides for an [`IngestionConfig`].
///
/// Any field left as `None` falls back to the corresponding field of the base config passed to
/// [`IngestionLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    /// Override for `IngestionConfig::checkpoint_buffer_size`.
    pub checkpoint_buffer_size: Option<usize>,
    /// Override for `IngestionConfig::ingest_concurrency`.
    pub ingest_concurrency: Option<usize>,
    /// Override for `IngestionConfig::retry_interval_ms`.
    pub retry_interval_ms: Option<u64>,
}
57
/// Optional overrides for a sequential pipeline's [`SequentialConfig`].
///
/// Unset fields fall back to the base config passed to [`SequentialLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    /// Committer overrides, applied on top of the base config's committer when present.
    pub committer: Option<CommitterLayer>,
    /// Override for `SequentialConfig::checkpoint_lag`.
    pub checkpoint_lag: Option<u64>,
}
65
/// Optional overrides for a concurrent pipeline's [`ConcurrentConfig`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Committer overrides, applied on top of the base config's committer when present.
    pub committer: Option<CommitterLayer>,
    /// Pruner overrides. [`ConcurrentLayer::finish`] keeps a pruner only when both this field and
    /// the base config's pruner are set, so leaving it `None` yields a config with no pruner.
    pub pruner: Option<PrunerLayer>,
}
73
/// Optional overrides for a [`CommitterConfig`].
///
/// Unset fields fall back to the base config passed to [`CommitterLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    /// Override for `CommitterConfig::write_concurrency`.
    pub write_concurrency: Option<usize>,
    /// Override for `CommitterConfig::collect_interval_ms`.
    pub collect_interval_ms: Option<u64>,
    /// Override for `CommitterConfig::watermark_interval_ms`.
    pub watermark_interval_ms: Option<u64>,
}
82
/// Optional overrides for a [`PrunerConfig`].
///
/// Unset fields fall back to the base config passed to [`PrunerLayer::finish`]. When two layers
/// are merged, `retention` keeps the larger of the two values, while every other field prefers
/// the later layer.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    /// Override for `PrunerConfig::interval_ms`.
    pub interval_ms: Option<u64>,
    /// Override for `PrunerConfig::delay_ms`.
    pub delay_ms: Option<u64>,
    /// Override for `PrunerConfig::retention` (merging takes the maximum).
    pub retention: Option<u64>,
    /// Override for `PrunerConfig::max_chunk_size`.
    pub max_chunk_size: Option<u64>,
    /// Override for `PrunerConfig::prune_concurrency`.
    pub prune_concurrency: Option<u64>,
}
93
/// Per-pipeline configuration layers, one optional entry per pipeline.
///
/// `rename_all = "snake_case"` keeps the serialized keys identical to the field (pipeline) names.
// NOTE(review): presumably this overrides a different renaming scheme applied by
// `DefaultConfig` — confirm.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines.
    pub sum_displays: Option<SequentialLayer>,

    // Concurrent pipelines.
    pub coin_balance_buckets: Option<ConcurrentLayer>,
    pub obj_info: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
123
124impl IndexerConfig {
125 pub fn example() -> Self {
128 let mut example: Self = Default::default();
129
130 example.ingestion = IngestionConfig::default().into();
131 example.committer = CommitterConfig::default().into();
132 example.pruner = PrunerConfig::default().into();
133 example.pipeline = PipelineLayer::example();
134
135 example
136 }
137
138 pub fn for_test() -> Self {
142 Self::example()
143 .merge(IndexerConfig {
144 ingestion: IngestionLayer {
145 retry_interval_ms: Some(10),
146 ingest_concurrency: Some(1),
147 ..Default::default()
148 },
149 committer: CommitterLayer {
150 collect_interval_ms: Some(50),
151 watermark_interval_ms: Some(50),
152 write_concurrency: Some(1),
153 },
154 pruner: PrunerLayer {
155 interval_ms: Some(50),
156 delay_ms: Some(0),
157 ..Default::default()
158 },
159 ..Default::default()
160 })
161 .expect("Merge failed for test configuration")
162 }
163}
164
165impl IngestionLayer {
166 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
167 Ok(IngestionConfig {
168 checkpoint_buffer_size: self
169 .checkpoint_buffer_size
170 .unwrap_or(base.checkpoint_buffer_size),
171 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
172 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
173 })
174 }
175}
176
177impl SequentialLayer {
178 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
179 Ok(SequentialConfig {
180 committer: if let Some(committer) = self.committer {
181 committer.finish(base.committer)?
182 } else {
183 base.committer
184 },
185 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
186 })
187 }
188}
189
190impl ConcurrentLayer {
191 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
194 Ok(ConcurrentConfig {
195 committer: if let Some(committer) = self.committer {
196 committer.finish(base.committer)?
197 } else {
198 base.committer
199 },
200 pruner: match (self.pruner, base.pruner) {
201 (None, _) | (_, None) => None,
202 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
203 },
204 })
205 }
206}
207
208impl CommitterLayer {
209 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
210 Ok(CommitterConfig {
211 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
212 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
213 watermark_interval_ms: self
214 .watermark_interval_ms
215 .unwrap_or(base.watermark_interval_ms),
216 })
217 }
218}
219
220impl PrunerLayer {
221 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
222 Ok(PrunerConfig {
223 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
224 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
225 retention: self.retention.unwrap_or(base.retention),
226 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
227 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
228 })
229 }
230}
231
232impl PipelineLayer {
233 pub fn example() -> Self {
236 PipelineLayer {
237 coin_balance_buckets: Some(Default::default()),
238 obj_info: Some(Default::default()),
239 sum_displays: Some(Default::default()),
240 cp_sequence_numbers: Some(Default::default()),
241 ev_emit_mod: Some(Default::default()),
242 ev_struct_inst: Some(Default::default()),
243 kv_checkpoints: Some(Default::default()),
244 kv_epoch_ends: Some(Default::default()),
245 kv_epoch_starts: Some(Default::default()),
246 kv_feature_flags: Some(Default::default()),
247 kv_objects: Some(Default::default()),
248 kv_packages: Some(Default::default()),
249 kv_protocol_configs: Some(Default::default()),
250 kv_transactions: Some(Default::default()),
251 obj_versions: Some(Default::default()),
252 tx_affected_addresses: Some(Default::default()),
253 tx_affected_objects: Some(Default::default()),
254 tx_balance_changes: Some(Default::default()),
255 tx_calls: Some(Default::default()),
256 tx_digests: Some(Default::default()),
257 tx_kinds: Some(Default::default()),
258 }
259 }
260}
261
262impl Merge for IndexerConfig {
263 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
264 Ok(IndexerConfig {
265 ingestion: self.ingestion.merge(other.ingestion)?,
266 committer: self.committer.merge(other.committer)?,
267 pruner: self.pruner.merge(other.pruner)?,
268 pipeline: self.pipeline.merge(other.pipeline)?,
269 })
270 }
271}
272
273impl Merge for IngestionLayer {
274 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
275 Ok(IngestionLayer {
276 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
277 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
278 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
279 })
280 }
281}
282
283impl Merge for SequentialLayer {
284 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
285 Ok(SequentialLayer {
286 committer: self.committer.merge(other.committer)?,
287 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
288 })
289 }
290}
291
292impl Merge for ConcurrentLayer {
293 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
294 Ok(ConcurrentLayer {
295 committer: self.committer.merge(other.committer)?,
296 pruner: self.pruner.merge(other.pruner)?,
297 })
298 }
299}
300
301impl Merge for CommitterLayer {
302 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
303 Ok(CommitterLayer {
304 write_concurrency: other.write_concurrency.or(self.write_concurrency),
305 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
306 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
307 })
308 }
309}
310
311impl Merge for PrunerLayer {
312 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
315 Ok(PrunerLayer {
316 interval_ms: other.interval_ms.or(self.interval_ms),
317 delay_ms: other.delay_ms.or(self.delay_ms),
318 retention: match (other.retention, self.retention) {
319 (Some(a), Some(b)) => Some(a.max(b)),
320 (Some(a), _) | (_, Some(a)) => Some(a),
321 (None, None) => None,
322 },
323 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
324 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
325 })
326 }
327}
328
329impl Merge for PipelineLayer {
330 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
331 Ok(PipelineLayer {
332 coin_balance_buckets: self
333 .coin_balance_buckets
334 .merge(other.coin_balance_buckets)?,
335 obj_info: self.obj_info.merge(other.obj_info)?,
336 sum_displays: self.sum_displays.merge(other.sum_displays)?,
337 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
338 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
339 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
340 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
341 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
342 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
343 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
344 kv_objects: self.kv_objects.merge(other.kv_objects)?,
345 kv_packages: self.kv_packages.merge(other.kv_packages)?,
346 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
347 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
348 obj_versions: self.obj_versions.merge(other.obj_versions)?,
349 tx_affected_addresses: self
350 .tx_affected_addresses
351 .merge(other.tx_affected_addresses)?,
352 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
353 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
354 tx_calls: self.tx_calls.merge(other.tx_calls)?,
355 tx_digests: self.tx_digests.merge(other.tx_digests)?,
356 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
357 })
358 }
359}
360
361impl<T: Merge> Merge for Option<T> {
362 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
363 Ok(match (self, other) {
364 (Some(a), Some(b)) => Some(a.merge(b)?),
365 (Some(a), _) | (_, Some(a)) => Some(a),
366 (None, None) => None,
367 })
368 }
369}
370
371impl From<IngestionConfig> for IngestionLayer {
372 fn from(config: IngestionConfig) -> Self {
373 Self {
374 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
375 ingest_concurrency: Some(config.ingest_concurrency),
376 retry_interval_ms: Some(config.retry_interval_ms),
377 }
378 }
379}
380
381impl From<SequentialConfig> for SequentialLayer {
382 fn from(config: SequentialConfig) -> Self {
383 Self {
384 committer: Some(config.committer.into()),
385 checkpoint_lag: Some(config.checkpoint_lag),
386 }
387 }
388}
389
390impl From<ConcurrentConfig> for ConcurrentLayer {
391 fn from(config: ConcurrentConfig) -> Self {
392 Self {
393 committer: Some(config.committer.into()),
394 pruner: config.pruner.map(Into::into),
395 }
396 }
397}
398
399impl From<CommitterConfig> for CommitterLayer {
400 fn from(config: CommitterConfig) -> Self {
401 Self {
402 write_concurrency: Some(config.write_concurrency),
403 collect_interval_ms: Some(config.collect_interval_ms),
404 watermark_interval_ms: Some(config.watermark_interval_ms),
405 }
406 }
407}
408
409impl From<PrunerConfig> for PrunerLayer {
410 fn from(config: PrunerConfig) -> Self {
411 Self {
412 interval_ms: Some(config.interval_ms),
413 delay_ms: Some(config.delay_ms),
414 retention: Some(config.retention),
415 max_chunk_size: Some(config.max_chunk_size),
416 prune_concurrency: Some(config.prune_concurrency),
417 }
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;

    // Pattern-based assertion. A test can pin only the fields it cares about (skipping the rest
    // with `..`) without needing `PartialEq` on the config types. On failure it reports the
    // expected pattern and the pretty-printed `Debug` form of the actual value.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Merging recurses through nested layers (pipeline -> sequential/concurrent -> committer).
    /// A field set in the later (`other`) layer wins. A field it leaves unset is inherited from
    /// the earlier (`self`) layer. A pipeline absent from one side is kept from the other side.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        // `that` is applied last, so its values win wherever it sets them; `ev_emit_mod` is only
        // configured in `this`, so it survives unchanged.
        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(200),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );

        // Reversed order: `this` now wins on conflicting fields.
        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(100),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );
    }

    /// For pruner layers the later layer wins on every field except `retention`, which takes
    /// the maximum regardless of merge order.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        // `retention` stays at the maximum (500) even though `this` (200) is applied last.
        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// A layer with no pruner disables pruning even when the base config has one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig::default()),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    /// With no pruner on either side, the finished config has no pruner, and the committer comes
    /// straight from the base.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: None,
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    /// When both sides have a pruner, fields set on the layer override the base and the rest
    /// are taken from the base.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
            },
        );
    }

    /// `deny_unknown_fields` on `IndexerConfig`: an unknown top-level key is a deserialization
    /// error, and the error message names the offending key.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}