use sui_default_config::DefaultConfig;

use sui_indexer_alt_framework::{
    ingestion::IngestionConfig,
    pipeline::{
        CommitterConfig,
        concurrent::{ConcurrentConfig, PrunerConfig},
        sequential::SequentialConfig,
    },
};

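/// A type that can be merged with another instance of itself to form a combined configuration.
/// By convention, values set in `other` generally take precedence over the ones in `self`.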
pub trait Merge: Sized {
    fn merge(self, other: Self) -> anyhow::Result<Self>;
}

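/// Top-level configuration for the indexer. Every field is a "layer": a struct of optional
/// values that can be merged with other layers (`Merge`) and applied on top of the framework's
/// base configuration (`finish`), with unset fields falling back to the base. The whole struct
/// deserializes from TOML (see the `detect_unrecognized_fields` test); an illustrative fragment,
/// with made-up values:
///
/// ```toml
/// [ingestion]
/// checkpoint_buffer_size = 5000
///
/// [committer]
/// write_concurrency = 10
///
/// [pipeline.kv_objects.committer]
/// write_concurrency = 20
/// ```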
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    pub ingestion: IngestionLayer,

    pub committer: CommitterLayer,

    pub pruner: PrunerLayer,

    pub pipeline: PipelineLayer,
}

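/// Overrides for [`IngestionConfig`]: checkpoint buffering, ingestion concurrency, retry
/// interval, and streaming backoff batch sizes. Any field left as `None` falls back to the base
/// config in [`IngestionLayer::finish`].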
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub checkpoint_buffer_size: Option<usize>,
    pub ingest_concurrency: Option<usize>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
}

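/// Overrides for a sequential pipeline's [`SequentialConfig`]: its committer settings and its
/// checkpoint lag.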
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    pub committer: Option<CommitterLayer>,
    pub checkpoint_lag: Option<u64>,
}

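/// Overrides for a concurrent pipeline's [`ConcurrentConfig`]: its committer settings and an
/// optional pruner. Pruning only ends up enabled if both this layer and the base config define a
/// pruner (see [`ConcurrentLayer::finish`]).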
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    pub committer: Option<CommitterLayer>,
    pub pruner: Option<PrunerLayer>,
}

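/// Overrides for [`CommitterConfig`]: write concurrency and the collect and watermark intervals.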
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}

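/// Overrides for [`PrunerConfig`]: the pruner's interval, delay, retention, maximum chunk size,
/// and concurrency.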
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}

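/// Per-pipeline configuration overrides, keyed by pipeline name. `sum_displays` is configured as
/// a sequential pipeline; every other entry is a concurrent pipeline.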
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    pub sum_displays: Option<SequentialLayer>,

    pub coin_balance_buckets: Option<ConcurrentLayer>,
    pub obj_info: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}

impl IndexerConfig {
    /// Generate an example configuration, with default values filled in for the top-level layers
    /// and an entry for every known pipeline.
    pub fn example() -> Self {
        let mut example: Self = Default::default();

        example.ingestion = IngestionConfig::default().into();
        example.committer = CommitterConfig::default().into();
        example.pruner = PrunerConfig::default().into();
        example.pipeline = PipelineLayer::example();

        example
    }

    /// A configuration suitable for tests: the example configuration with short intervals and
    /// minimal concurrency layered on top, so that tests make progress quickly.
    pub fn for_test() -> Self {
        Self::example()
            .merge(IndexerConfig {
                ingestion: IngestionLayer {
                    retry_interval_ms: Some(10),
                    ingest_concurrency: Some(1),
                    ..Default::default()
                },
                committer: CommitterLayer {
                    collect_interval_ms: Some(50),
                    watermark_interval_ms: Some(50),
                    write_concurrency: Some(1),
                },
                pruner: PrunerLayer {
                    interval_ms: Some(50),
                    delay_ms: Some(0),
                    ..Default::default()
                },
                ..Default::default()
            })
            .expect("Merge failed for test configuration")
    }
}

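// The `finish` methods below apply a layer on top of a `base` config from the framework,
// falling back to the base value for any field that the layer leaves unset.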
impl IngestionLayer {
    pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
        Ok(IngestionConfig {
            checkpoint_buffer_size: self
                .checkpoint_buffer_size
                .unwrap_or(base.checkpoint_buffer_size),
            ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
            retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
            streaming_backoff_initial_batch_size: self
                .streaming_backoff_initial_batch_size
                .unwrap_or(base.streaming_backoff_initial_batch_size),
            streaming_backoff_max_batch_size: self
                .streaming_backoff_max_batch_size
                .unwrap_or(base.streaming_backoff_max_batch_size),
        })
    }
}

impl SequentialLayer {
    pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
        Ok(SequentialConfig {
            committer: if let Some(committer) = self.committer {
                committer.finish(base.committer)?
            } else {
                base.committer
            },
            checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
        })
    }
}

impl ConcurrentLayer {
    pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
        // Unlike other fields, pruning is only enabled if both the layer and the base configure
        // it; a layer without a pruner explicitly disables pruning, even if the base enables it.
        Ok(ConcurrentConfig {
            committer: if let Some(committer) = self.committer {
                committer.finish(base.committer)?
            } else {
                base.committer
            },
            pruner: match (self.pruner, base.pruner) {
                (None, _) | (_, None) => None,
                (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
            },
        })
    }
}

impl CommitterLayer {
    pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
        Ok(CommitterConfig {
            write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
            collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
            watermark_interval_ms: self
                .watermark_interval_ms
                .unwrap_or(base.watermark_interval_ms),
        })
    }
}

impl PrunerLayer {
    pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
        Ok(PrunerConfig {
            interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
            delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
            retention: self.retention.unwrap_or(base.retention),
            max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
            prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
        })
    }
}

impl PipelineLayer {
    /// Generate an example configuration that mentions every pipeline, each with a default
    /// (empty) layer.
    pub fn example() -> Self {
        PipelineLayer {
            coin_balance_buckets: Some(Default::default()),
            obj_info: Some(Default::default()),
            sum_displays: Some(Default::default()),
            cp_sequence_numbers: Some(Default::default()),
            ev_emit_mod: Some(Default::default()),
            ev_struct_inst: Some(Default::default()),
            kv_checkpoints: Some(Default::default()),
            kv_epoch_ends: Some(Default::default()),
            kv_epoch_starts: Some(Default::default()),
            kv_feature_flags: Some(Default::default()),
            kv_objects: Some(Default::default()),
            kv_packages: Some(Default::default()),
            kv_protocol_configs: Some(Default::default()),
            kv_transactions: Some(Default::default()),
            obj_versions: Some(Default::default()),
            tx_affected_addresses: Some(Default::default()),
            tx_affected_objects: Some(Default::default()),
            tx_balance_changes: Some(Default::default()),
            tx_calls: Some(Default::default()),
            tx_digests: Some(Default::default()),
            tx_kinds: Some(Default::default()),
        }
    }
}

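// The `Merge` implementations below combine two layers field by field. For most fields the
// value from `other` wins when both sides are set; `PrunerLayer::retention` is the exception
// (it keeps the larger value).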
impl Merge for IndexerConfig {
    fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
        Ok(IndexerConfig {
            ingestion: self.ingestion.merge(other.ingestion)?,
            committer: self.committer.merge(other.committer)?,
            pruner: self.pruner.merge(other.pruner)?,
            pipeline: self.pipeline.merge(other.pipeline)?,
        })
    }
}

impl Merge for IngestionLayer {
    fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
        Ok(IngestionLayer {
            checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
            ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
            retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
            streaming_backoff_initial_batch_size: other
                .streaming_backoff_initial_batch_size
                .or(self.streaming_backoff_initial_batch_size),
            streaming_backoff_max_batch_size: other
                .streaming_backoff_max_batch_size
                .or(self.streaming_backoff_max_batch_size),
        })
    }
}

impl Merge for SequentialLayer {
    fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
        Ok(SequentialLayer {
            committer: self.committer.merge(other.committer)?,
            checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
        })
    }
}

impl Merge for ConcurrentLayer {
    fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
        Ok(ConcurrentLayer {
            committer: self.committer.merge(other.committer)?,
            pruner: self.pruner.merge(other.pruner)?,
        })
    }
}

impl Merge for CommitterLayer {
    fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
        Ok(CommitterLayer {
            write_concurrency: other.write_concurrency.or(self.write_concurrency),
            collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
            watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
        })
    }
}

impl Merge for PrunerLayer {
    fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
        // Retention is special-cased: when both layers set it, the larger value wins, so merging
        // never shrinks the retention window.
        Ok(PrunerLayer {
            interval_ms: other.interval_ms.or(self.interval_ms),
            delay_ms: other.delay_ms.or(self.delay_ms),
            retention: match (other.retention, self.retention) {
                (Some(a), Some(b)) => Some(a.max(b)),
                (Some(a), _) | (_, Some(a)) => Some(a),
                (None, None) => None,
            },
            max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
            prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
        })
    }
}

impl Merge for PipelineLayer {
    fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
        Ok(PipelineLayer {
            coin_balance_buckets: self
                .coin_balance_buckets
                .merge(other.coin_balance_buckets)?,
            obj_info: self.obj_info.merge(other.obj_info)?,
            sum_displays: self.sum_displays.merge(other.sum_displays)?,
            cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
            ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
            ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
            kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
            kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
            kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
            kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
            kv_objects: self.kv_objects.merge(other.kv_objects)?,
            kv_packages: self.kv_packages.merge(other.kv_packages)?,
            kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
            kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
            obj_versions: self.obj_versions.merge(other.obj_versions)?,
            tx_affected_addresses: self
                .tx_affected_addresses
                .merge(other.tx_affected_addresses)?,
            tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
            tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
            tx_calls: self.tx_calls.merge(other.tx_calls)?,
            tx_digests: self.tx_digests.merge(other.tx_digests)?,
            tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
        })
    }
}

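/// Merging optional layers: if both sides are present their contents are merged recursively,
/// otherwise whichever side is present (if any) is kept.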
impl<T: Merge> Merge for Option<T> {
    fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
        Ok(match (self, other) {
            (Some(a), Some(b)) => Some(a.merge(b)?),
            (Some(a), _) | (_, Some(a)) => Some(a),
            (None, None) => None,
        })
    }
}

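// Conversions from the framework's full configs back into layers, with every field populated.
// These are used by `IndexerConfig::example` to produce a fully spelled-out configuration.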
impl From<IngestionConfig> for IngestionLayer {
    fn from(config: IngestionConfig) -> Self {
        Self {
            checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
            ingest_concurrency: Some(config.ingest_concurrency),
            retry_interval_ms: Some(config.retry_interval_ms),
            streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
            streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
        }
    }
}

impl From<SequentialConfig> for SequentialLayer {
    fn from(config: SequentialConfig) -> Self {
        Self {
            committer: Some(config.committer.into()),
            checkpoint_lag: Some(config.checkpoint_lag),
        }
    }
}

impl From<ConcurrentConfig> for ConcurrentLayer {
    fn from(config: ConcurrentConfig) -> Self {
        Self {
            committer: Some(config.committer.into()),
            pruner: config.pruner.map(Into::into),
        }
    }
}

impl From<CommitterConfig> for CommitterLayer {
    fn from(config: CommitterConfig) -> Self {
        Self {
            write_concurrency: Some(config.write_concurrency),
            collect_interval_ms: Some(config.collect_interval_ms),
            watermark_interval_ms: Some(config.watermark_interval_ms),
        }
    }
}

impl From<PrunerConfig> for PrunerLayer {
    fn from(config: PrunerConfig) -> Self {
        Self {
            interval_ms: Some(config.interval_ms),
            delay_ms: Some(config.delay_ms),
            retention: Some(config.retention),
            max_chunk_size: Some(config.max_chunk_size),
            prune_concurrency: Some(config.prune_concurrency),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(200),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );

        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(100),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );
    }

    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig::default()),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: None,
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
            },
        );
    }

    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}