1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::{
6 ingestion::IngestionConfig,
7 pipeline::{
8 CommitterConfig,
9 concurrent::{ConcurrentConfig, PrunerConfig},
10 sequential::SequentialConfig,
11 },
12};
13
14pub trait Merge: Sized {
16 fn merge(self, other: Self) -> anyhow::Result<Self>;
17}
18
/// Top-level indexer configuration, as read from a TOML file.
///
/// Every field is a *layer*: a set of optional overrides that is later resolved against a base
/// configuration by the layer types' `finish` methods. Layers coming from several sources can be
/// stacked with [`Merge`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
// Unrecognized keys are rejected, so typos in a config file surface as errors instead of being
// silently ignored.
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// Overrides for the checkpoint ingestion service (resolved against an `IngestionConfig`).
    pub ingestion: IngestionLayer,

    /// Committer overrides at the top level of the file.
    /// NOTE(review): presumably these act as defaults for every pipeline's committer — confirm
    /// against the code that consumes this struct.
    pub committer: CommitterLayer,

    /// Pruner overrides at the top level of the file.
    /// NOTE(review): presumably shared defaults for concurrent pipelines' pruners — confirm
    /// against the code that consumes this struct.
    pub pruner: PrunerLayer,

    /// One optional section per pipeline.
    pub pipeline: PipelineLayer,
}
40
/// Optional overrides for [`IngestionConfig`].
///
/// Each field overrides the same-named field of `IngestionConfig`. Fields left as `None` fall back
/// to the base configuration passed to [`IngestionLayer::finish`]. When two layers are merged, the
/// value from the later layer wins.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub checkpoint_buffer_size: Option<usize>,
    pub ingest_concurrency: Option<usize>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,
}
61
/// Optional overrides for a sequential pipeline's [`SequentialConfig`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    /// Committer overrides. When `None`, the base committer config is used unchanged.
    pub committer: Option<CommitterLayer>,
    /// Overrides `SequentialConfig::checkpoint_lag` when set.
    pub checkpoint_lag: Option<u64>,
}
69
/// Optional overrides for a concurrent pipeline's [`ConcurrentConfig`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Committer overrides. When `None`, the base committer config is used unchanged.
    pub committer: Option<CommitterLayer>,
    /// Pruner overrides. Unlike other settings, the finished config only has a pruner when this
    /// is set *and* the base config has one; leaving it `None` produces a config with no pruner.
    pub pruner: Option<PrunerLayer>,
}
77
/// Optional overrides for [`CommitterConfig`].
///
/// Each field overrides the same-named field of `CommitterConfig`. Fields left as `None` fall back
/// to the base configuration passed to [`CommitterLayer::finish`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
86
/// Optional overrides for [`PrunerConfig`].
///
/// Each field overrides the same-named field of `PrunerConfig`. Fields left as `None` fall back to
/// the base configuration passed to [`PrunerLayer::finish`]. When two layers are merged, the later
/// layer wins for every field except `retention`, which keeps the larger of the two values.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
97
/// One optional configuration section per pipeline.
///
/// `sum_displays` is configured as a sequential pipeline; every other pipeline here is configured
/// as a concurrent pipeline.
/// NOTE(review): presumably a section left as `None` means the pipeline is not run — confirm
/// against the code that consumes this struct.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
// Pipeline keys are spelled exactly like the field names below (snake_case).
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // Concurrent pipelines
    pub coin_balance_buckets: Option<ConcurrentLayer>,
    pub obj_info: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
127
128impl IndexerConfig {
129 pub fn example() -> Self {
132 let mut example: Self = Default::default();
133
134 example.ingestion = IngestionConfig::default().into();
135 example.committer = CommitterConfig::default().into();
136 example.pruner = PrunerConfig::default().into();
137 example.pipeline = PipelineLayer::example();
138
139 example
140 }
141
142 pub fn for_test() -> Self {
146 Self::example()
147 .merge(IndexerConfig {
148 ingestion: IngestionLayer {
149 retry_interval_ms: Some(10),
150 ingest_concurrency: Some(1),
151 ..Default::default()
152 },
153 committer: CommitterLayer {
154 collect_interval_ms: Some(50),
155 watermark_interval_ms: Some(50),
156 write_concurrency: Some(1),
157 },
158 pruner: PrunerLayer {
159 interval_ms: Some(50),
160 delay_ms: Some(0),
161 ..Default::default()
162 },
163 ..Default::default()
164 })
165 .expect("Merge failed for test configuration")
166 }
167}
168
169impl IngestionLayer {
170 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
171 Ok(IngestionConfig {
172 checkpoint_buffer_size: self
173 .checkpoint_buffer_size
174 .unwrap_or(base.checkpoint_buffer_size),
175 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
176 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
177 streaming_backoff_initial_batch_size: self
178 .streaming_backoff_initial_batch_size
179 .unwrap_or(base.streaming_backoff_initial_batch_size),
180 streaming_backoff_max_batch_size: self
181 .streaming_backoff_max_batch_size
182 .unwrap_or(base.streaming_backoff_max_batch_size),
183 streaming_connection_timeout_ms: self
184 .streaming_connection_timeout_ms
185 .unwrap_or(base.streaming_connection_timeout_ms),
186 streaming_statement_timeout_ms: self
187 .streaming_statement_timeout_ms
188 .unwrap_or(base.streaming_statement_timeout_ms),
189 })
190 }
191}
192
193impl SequentialLayer {
194 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
195 Ok(SequentialConfig {
196 committer: if let Some(committer) = self.committer {
197 committer.finish(base.committer)?
198 } else {
199 base.committer
200 },
201 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
202 })
203 }
204}
205
206impl ConcurrentLayer {
207 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
210 Ok(ConcurrentConfig {
211 committer: if let Some(committer) = self.committer {
212 committer.finish(base.committer)?
213 } else {
214 base.committer
215 },
216 pruner: match (self.pruner, base.pruner) {
217 (None, _) | (_, None) => None,
218 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
219 },
220 })
221 }
222}
223
224impl CommitterLayer {
225 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
226 Ok(CommitterConfig {
227 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
228 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
229 watermark_interval_ms: self
230 .watermark_interval_ms
231 .unwrap_or(base.watermark_interval_ms),
232 })
233 }
234}
235
236impl PrunerLayer {
237 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
238 Ok(PrunerConfig {
239 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
240 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
241 retention: self.retention.unwrap_or(base.retention),
242 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
243 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
244 })
245 }
246}
247
248impl PipelineLayer {
249 pub fn example() -> Self {
252 PipelineLayer {
253 coin_balance_buckets: Some(Default::default()),
254 obj_info: Some(Default::default()),
255 sum_displays: Some(Default::default()),
256 cp_sequence_numbers: Some(Default::default()),
257 ev_emit_mod: Some(Default::default()),
258 ev_struct_inst: Some(Default::default()),
259 kv_checkpoints: Some(Default::default()),
260 kv_epoch_ends: Some(Default::default()),
261 kv_epoch_starts: Some(Default::default()),
262 kv_feature_flags: Some(Default::default()),
263 kv_objects: Some(Default::default()),
264 kv_packages: Some(Default::default()),
265 kv_protocol_configs: Some(Default::default()),
266 kv_transactions: Some(Default::default()),
267 obj_versions: Some(Default::default()),
268 tx_affected_addresses: Some(Default::default()),
269 tx_affected_objects: Some(Default::default()),
270 tx_balance_changes: Some(Default::default()),
271 tx_calls: Some(Default::default()),
272 tx_digests: Some(Default::default()),
273 tx_kinds: Some(Default::default()),
274 }
275 }
276}
277
278impl Merge for IndexerConfig {
279 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
280 Ok(IndexerConfig {
281 ingestion: self.ingestion.merge(other.ingestion)?,
282 committer: self.committer.merge(other.committer)?,
283 pruner: self.pruner.merge(other.pruner)?,
284 pipeline: self.pipeline.merge(other.pipeline)?,
285 })
286 }
287}
288
289impl Merge for IngestionLayer {
290 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
291 Ok(IngestionLayer {
292 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
293 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
294 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
295 streaming_backoff_initial_batch_size: other
296 .streaming_backoff_initial_batch_size
297 .or(self.streaming_backoff_initial_batch_size),
298 streaming_backoff_max_batch_size: other
299 .streaming_backoff_max_batch_size
300 .or(self.streaming_backoff_max_batch_size),
301 streaming_connection_timeout_ms: other
302 .streaming_connection_timeout_ms
303 .or(self.streaming_connection_timeout_ms),
304 streaming_statement_timeout_ms: other
305 .streaming_statement_timeout_ms
306 .or(self.streaming_statement_timeout_ms),
307 })
308 }
309}
310
311impl Merge for SequentialLayer {
312 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
313 Ok(SequentialLayer {
314 committer: self.committer.merge(other.committer)?,
315 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
316 })
317 }
318}
319
320impl Merge for ConcurrentLayer {
321 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
322 Ok(ConcurrentLayer {
323 committer: self.committer.merge(other.committer)?,
324 pruner: self.pruner.merge(other.pruner)?,
325 })
326 }
327}
328
329impl Merge for CommitterLayer {
330 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
331 Ok(CommitterLayer {
332 write_concurrency: other.write_concurrency.or(self.write_concurrency),
333 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
334 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
335 })
336 }
337}
338
339impl Merge for PrunerLayer {
340 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
343 Ok(PrunerLayer {
344 interval_ms: other.interval_ms.or(self.interval_ms),
345 delay_ms: other.delay_ms.or(self.delay_ms),
346 retention: match (other.retention, self.retention) {
347 (Some(a), Some(b)) => Some(a.max(b)),
348 (Some(a), _) | (_, Some(a)) => Some(a),
349 (None, None) => None,
350 },
351 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
352 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
353 })
354 }
355}
356
357impl Merge for PipelineLayer {
358 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
359 Ok(PipelineLayer {
360 coin_balance_buckets: self
361 .coin_balance_buckets
362 .merge(other.coin_balance_buckets)?,
363 obj_info: self.obj_info.merge(other.obj_info)?,
364 sum_displays: self.sum_displays.merge(other.sum_displays)?,
365 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
366 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
367 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
368 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
369 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
370 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
371 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
372 kv_objects: self.kv_objects.merge(other.kv_objects)?,
373 kv_packages: self.kv_packages.merge(other.kv_packages)?,
374 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
375 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
376 obj_versions: self.obj_versions.merge(other.obj_versions)?,
377 tx_affected_addresses: self
378 .tx_affected_addresses
379 .merge(other.tx_affected_addresses)?,
380 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
381 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
382 tx_calls: self.tx_calls.merge(other.tx_calls)?,
383 tx_digests: self.tx_digests.merge(other.tx_digests)?,
384 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
385 })
386 }
387}
388
389impl<T: Merge> Merge for Option<T> {
390 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
391 Ok(match (self, other) {
392 (Some(a), Some(b)) => Some(a.merge(b)?),
393 (Some(a), _) | (_, Some(a)) => Some(a),
394 (None, None) => None,
395 })
396 }
397}
398
399impl From<IngestionConfig> for IngestionLayer {
400 fn from(config: IngestionConfig) -> Self {
401 Self {
402 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
403 ingest_concurrency: Some(config.ingest_concurrency),
404 retry_interval_ms: Some(config.retry_interval_ms),
405 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
406 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
407 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
408 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
409 }
410 }
411}
412
413impl From<SequentialConfig> for SequentialLayer {
414 fn from(config: SequentialConfig) -> Self {
415 Self {
416 committer: Some(config.committer.into()),
417 checkpoint_lag: Some(config.checkpoint_lag),
418 }
419 }
420}
421
422impl From<ConcurrentConfig> for ConcurrentLayer {
423 fn from(config: ConcurrentConfig) -> Self {
424 Self {
425 committer: Some(config.committer.into()),
426 pruner: config.pruner.map(Into::into),
427 }
428 }
429}
430
431impl From<CommitterConfig> for CommitterLayer {
432 fn from(config: CommitterConfig) -> Self {
433 Self {
434 write_concurrency: Some(config.write_concurrency),
435 collect_interval_ms: Some(config.collect_interval_ms),
436 watermark_interval_ms: Some(config.watermark_interval_ms),
437 }
438 }
439}
440
441impl From<PrunerConfig> for PrunerLayer {
442 fn from(config: PrunerConfig) -> Self {
443 Self {
444 interval_ms: Some(config.interval_ms),
445 delay_ms: Some(config.delay_ms),
446 retention: Some(config.retention),
447 max_chunk_size: Some(config.max_chunk_size),
448 prune_concurrency: Some(config.prune_concurrency),
449 }
450 }
451}
452
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `$value` matches `$pattern`, printing the value's `Debug` form on failure.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Merging recurses into nested layers, and the right-hand layer wins field-by-field.
    #[test]
    fn merge_recursive() {
        let this = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let that = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(200),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );

        assert_matches!(
            that_then_this,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(100),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );
    }

    /// A layer present on only one side of an `Option` merge survives untouched, and two
    /// missing layers stay missing.
    #[test]
    fn merge_option_keeps_one_sided_values() {
        let layer = Some(CommitterLayer {
            write_concurrency: Some(1),
            ..Default::default()
        });

        assert_matches!(
            layer.clone().merge(None).unwrap(),
            Some(CommitterLayer {
                write_concurrency: Some(1),
                collect_interval_ms: None,
                watermark_interval_ms: None,
            }),
        );

        assert_matches!(
            None::<CommitterLayer>.merge(layer).unwrap(),
            Some(CommitterLayer {
                write_concurrency: Some(1),
                collect_interval_ms: None,
                watermark_interval_ms: None,
            }),
        );

        assert_matches!(None::<CommitterLayer>.merge(None).unwrap(), None);
    }

    /// Pruner merges take the latest value for every field except `retention`, which keeps the
    /// maximum regardless of merge order.
    #[test]
    fn merge_pruner() {
        let this = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let that = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let this_then_that = this.clone().merge(that.clone()).unwrap();
        let that_then_this = that.clone().merge(this.clone()).unwrap();

        assert_matches!(
            this_then_that,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            that_then_this,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    /// Unset ingestion fields fall back to the base config; set ones override it.
    #[test]
    fn finish_ingestion() {
        let layer = IngestionLayer {
            ingest_concurrency: Some(7),
            ..Default::default()
        };

        let base = IngestionConfig::default();
        let default_retry_interval_ms = base.retry_interval_ms;
        let default_checkpoint_buffer_size = base.checkpoint_buffer_size;

        let finished = layer.finish(base).unwrap();
        assert_eq!(finished.ingest_concurrency, 7);
        assert_eq!(finished.retry_interval_ms, default_retry_interval_ms);
        assert_eq!(
            finished.checkpoint_buffer_size,
            default_checkpoint_buffer_size
        );
    }

    /// Sequential layers resolve their committer field-by-field and fall back to the base lag.
    #[test]
    fn finish_sequential() {
        let layer = SequentialLayer {
            committer: Some(CommitterLayer {
                write_concurrency: Some(10),
                ..Default::default()
            }),
            checkpoint_lag: None,
        };

        let base = SequentialConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            checkpoint_lag: 100,
        };

        let finished = layer.finish(base).unwrap();
        assert_eq!(finished.committer.write_concurrency, 10);
        assert_eq!(finished.committer.collect_interval_ms, 50);
        assert_eq!(finished.committer.watermark_interval_ms, 500);
        assert_eq!(finished.checkpoint_lag, 100);
    }

    /// A base pruner is dropped when the layer does not also configure one.
    #[test]
    fn finish_concurrent_unpruned_override() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig::default()),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    /// With no pruner on either side, the finished config has no pruner.
    #[test]
    fn finish_concurrent_no_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: None,
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: None,
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    /// With a pruner on both sides, the layer's fields override the base's field-by-field.
    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
        };

        let base = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 5,
                collect_interval_ms: 50,
                watermark_interval_ms: 500,
            },
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
        };

        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
            },
        );
    }

    /// The test config applies its overrides on top of the example config and keeps every
    /// pipeline section from the example.
    #[test]
    fn for_test_applies_overrides() {
        let config = IndexerConfig::for_test();

        assert_eq!(config.ingestion.ingest_concurrency, Some(1));
        assert_eq!(config.ingestion.retry_interval_ms, Some(10));
        assert!(config.ingestion.checkpoint_buffer_size.is_some());
        assert_eq!(config.committer.write_concurrency, Some(1));
        assert_eq!(config.committer.collect_interval_ms, Some(50));
        assert_eq!(config.pruner.interval_ms, Some(50));
        assert_eq!(config.pruner.delay_ms, Some(0));
        assert!(config.pipeline.kv_objects.is_some());
        assert!(config.pipeline.sum_displays.is_some());
    }

    /// Unknown top-level keys are rejected rather than silently ignored.
    #[test]
    fn detect_unrecognized_fields() {
        let err = toml::from_str::<IndexerConfig>(
            r#"
            i_dont_exist = "foo"
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}