1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::ingestion::IngestionConfig;
6use sui_indexer_alt_framework::pipeline::CommitterConfig;
7use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
9use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
10
11pub trait Merge: Sized {
13 fn merge(self, other: Self) -> anyhow::Result<Self>;
14}
15
/// Top-level indexer configuration, as deserialized from a config file.
///
/// Each section is a layer of optional overrides. Layers can be combined with [`Merge`]. Unknown
/// keys are rejected during deserialization (`deny_unknown_fields`).
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IndexerConfig {
    /// Overrides for checkpoint ingestion, resolved against an [`IngestionConfig`].
    pub ingestion: IngestionLayer,

    /// Committer overrides at the indexer level.
    /// NOTE(review): presumably used as the default for every pipeline's committer — confirm at
    /// the call site.
    pub committer: CommitterLayer,

    /// Pruner overrides at the indexer level.
    /// NOTE(review): presumably applied to every pruned concurrent pipeline — confirm at the call
    /// site.
    pub pruner: PrunerLayer,

    /// Per-pipeline overrides, keyed by pipeline name.
    pub pipeline: PipelineLayer,
}
37
/// Optional overrides for [`IngestionConfig`].
///
/// Every field mirrors the field of the same name in `IngestionConfig`. Fields left as `None`
/// fall back to the base configuration when the layer is finished (`IngestionLayer::finish`).
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct IngestionLayer {
    pub checkpoint_buffer_size: Option<usize>,
    pub ingest_concurrency: Option<usize>,
    pub retry_interval_ms: Option<u64>,
    pub streaming_backoff_initial_batch_size: Option<usize>,
    pub streaming_backoff_max_batch_size: Option<usize>,
    pub streaming_connection_timeout_ms: Option<u64>,
    pub streaming_statement_timeout_ms: Option<u64>,
}
58
/// Optional overrides for a sequential pipeline's [`SequentialConfig`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct SequentialLayer {
    /// Overrides for the pipeline's committer; `None` keeps the base committer unchanged.
    pub committer: Option<CommitterLayer>,
    /// Override for `SequentialConfig::checkpoint_lag`.
    pub checkpoint_lag: Option<u64>,
}
66
/// Optional overrides for a concurrent pipeline's [`ConcurrentConfig`].
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConcurrentLayer {
    /// Overrides for the pipeline's committer; `None` keeps the base committer unchanged.
    pub committer: Option<CommitterLayer>,
    /// Overrides for the pipeline's pruner. When the layer is finished, the result has a pruner
    /// only if both this field and the base configuration's pruner are present.
    pub pruner: Option<PrunerLayer>,
}
74
/// Optional overrides for [`CommitterConfig`]; unset fields fall back to the base configuration
/// when the layer is finished.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct CommitterLayer {
    pub write_concurrency: Option<usize>,
    pub collect_interval_ms: Option<u64>,
    pub watermark_interval_ms: Option<u64>,
}
83
/// Optional overrides for [`PrunerConfig`]; unset fields fall back to the base configuration
/// when the layer is finished.
///
/// When two pruner layers are merged, `retention` keeps the larger value rather than the later
/// one. Every other field takes the later layer's value.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrunerLayer {
    pub interval_ms: Option<u64>,
    pub delay_ms: Option<u64>,
    pub retention: Option<u64>,
    pub max_chunk_size: Option<u64>,
    pub prune_concurrency: Option<u64>,
}
94
/// Per-pipeline configuration layers, one optional field per pipeline.
///
/// `sum_displays` is a sequential pipeline; all others are concurrent pipelines.
/// NOTE(review): whether a `None` field also disables the pipeline is decided by the caller —
/// confirm there.
#[DefaultConfig]
#[derive(Clone, Default, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct PipelineLayer {
    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,

    // Concurrent pipelines
    pub coin_balance_buckets: Option<ConcurrentLayer>,
    pub obj_info: Option<ConcurrentLayer>,
    pub cp_sequence_numbers: Option<ConcurrentLayer>,
    pub ev_emit_mod: Option<ConcurrentLayer>,
    pub ev_struct_inst: Option<ConcurrentLayer>,
    pub kv_checkpoints: Option<ConcurrentLayer>,
    pub kv_epoch_ends: Option<ConcurrentLayer>,
    pub kv_epoch_starts: Option<ConcurrentLayer>,
    pub kv_feature_flags: Option<ConcurrentLayer>,
    pub kv_objects: Option<ConcurrentLayer>,
    pub kv_packages: Option<ConcurrentLayer>,
    pub kv_protocol_configs: Option<ConcurrentLayer>,
    pub kv_transactions: Option<ConcurrentLayer>,
    pub obj_versions: Option<ConcurrentLayer>,
    pub tx_affected_addresses: Option<ConcurrentLayer>,
    pub tx_affected_objects: Option<ConcurrentLayer>,
    pub tx_balance_changes: Option<ConcurrentLayer>,
    pub tx_calls: Option<ConcurrentLayer>,
    pub tx_digests: Option<ConcurrentLayer>,
    pub tx_kinds: Option<ConcurrentLayer>,
}
124
125impl IndexerConfig {
126 pub fn example() -> Self {
129 let mut example: Self = Default::default();
130
131 example.ingestion = IngestionConfig::default().into();
132 example.committer = CommitterConfig::default().into();
133 example.pruner = PrunerConfig::default().into();
134 example.pipeline = PipelineLayer::example();
135
136 example
137 }
138
139 pub fn for_test() -> Self {
143 Self::example()
144 .merge(IndexerConfig {
145 ingestion: IngestionLayer {
146 retry_interval_ms: Some(10),
147 ingest_concurrency: Some(1),
148 ..Default::default()
149 },
150 committer: CommitterLayer {
151 collect_interval_ms: Some(50),
152 watermark_interval_ms: Some(50),
153 write_concurrency: Some(1),
154 },
155 pruner: PrunerLayer {
156 interval_ms: Some(50),
157 delay_ms: Some(0),
158 ..Default::default()
159 },
160 ..Default::default()
161 })
162 .expect("Merge failed for test configuration")
163 }
164}
165
166impl IngestionLayer {
167 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
168 Ok(IngestionConfig {
169 checkpoint_buffer_size: self
170 .checkpoint_buffer_size
171 .unwrap_or(base.checkpoint_buffer_size),
172 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
173 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
174 streaming_backoff_initial_batch_size: self
175 .streaming_backoff_initial_batch_size
176 .unwrap_or(base.streaming_backoff_initial_batch_size),
177 streaming_backoff_max_batch_size: self
178 .streaming_backoff_max_batch_size
179 .unwrap_or(base.streaming_backoff_max_batch_size),
180 streaming_connection_timeout_ms: self
181 .streaming_connection_timeout_ms
182 .unwrap_or(base.streaming_connection_timeout_ms),
183 streaming_statement_timeout_ms: self
184 .streaming_statement_timeout_ms
185 .unwrap_or(base.streaming_statement_timeout_ms),
186 })
187 }
188}
189
190impl SequentialLayer {
191 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
192 Ok(SequentialConfig {
193 committer: if let Some(committer) = self.committer {
194 committer.finish(base.committer)?
195 } else {
196 base.committer
197 },
198 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
199 })
200 }
201}
202
203impl ConcurrentLayer {
204 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
207 Ok(ConcurrentConfig {
208 committer: if let Some(committer) = self.committer {
209 committer.finish(base.committer)?
210 } else {
211 base.committer
212 },
213 pruner: match (self.pruner, base.pruner) {
214 (None, _) | (_, None) => None,
215 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
216 },
217 })
218 }
219}
220
221impl CommitterLayer {
222 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
223 Ok(CommitterConfig {
224 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
225 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
226 watermark_interval_ms: self
227 .watermark_interval_ms
228 .unwrap_or(base.watermark_interval_ms),
229 })
230 }
231}
232
233impl PrunerLayer {
234 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
235 Ok(PrunerConfig {
236 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
237 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
238 retention: self.retention.unwrap_or(base.retention),
239 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
240 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
241 })
242 }
243}
244
245impl PipelineLayer {
246 pub fn example() -> Self {
249 PipelineLayer {
250 coin_balance_buckets: Some(Default::default()),
251 obj_info: Some(Default::default()),
252 sum_displays: Some(Default::default()),
253 cp_sequence_numbers: Some(Default::default()),
254 ev_emit_mod: Some(Default::default()),
255 ev_struct_inst: Some(Default::default()),
256 kv_checkpoints: Some(Default::default()),
257 kv_epoch_ends: Some(Default::default()),
258 kv_epoch_starts: Some(Default::default()),
259 kv_feature_flags: Some(Default::default()),
260 kv_objects: Some(Default::default()),
261 kv_packages: Some(Default::default()),
262 kv_protocol_configs: Some(Default::default()),
263 kv_transactions: Some(Default::default()),
264 obj_versions: Some(Default::default()),
265 tx_affected_addresses: Some(Default::default()),
266 tx_affected_objects: Some(Default::default()),
267 tx_balance_changes: Some(Default::default()),
268 tx_calls: Some(Default::default()),
269 tx_digests: Some(Default::default()),
270 tx_kinds: Some(Default::default()),
271 }
272 }
273}
274
275impl Merge for IndexerConfig {
276 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
277 Ok(IndexerConfig {
278 ingestion: self.ingestion.merge(other.ingestion)?,
279 committer: self.committer.merge(other.committer)?,
280 pruner: self.pruner.merge(other.pruner)?,
281 pipeline: self.pipeline.merge(other.pipeline)?,
282 })
283 }
284}
285
286impl Merge for IngestionLayer {
287 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
288 Ok(IngestionLayer {
289 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
290 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
291 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
292 streaming_backoff_initial_batch_size: other
293 .streaming_backoff_initial_batch_size
294 .or(self.streaming_backoff_initial_batch_size),
295 streaming_backoff_max_batch_size: other
296 .streaming_backoff_max_batch_size
297 .or(self.streaming_backoff_max_batch_size),
298 streaming_connection_timeout_ms: other
299 .streaming_connection_timeout_ms
300 .or(self.streaming_connection_timeout_ms),
301 streaming_statement_timeout_ms: other
302 .streaming_statement_timeout_ms
303 .or(self.streaming_statement_timeout_ms),
304 })
305 }
306}
307
308impl Merge for SequentialLayer {
309 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
310 Ok(SequentialLayer {
311 committer: self.committer.merge(other.committer)?,
312 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
313 })
314 }
315}
316
317impl Merge for ConcurrentLayer {
318 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
319 Ok(ConcurrentLayer {
320 committer: self.committer.merge(other.committer)?,
321 pruner: self.pruner.merge(other.pruner)?,
322 })
323 }
324}
325
326impl Merge for CommitterLayer {
327 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
328 Ok(CommitterLayer {
329 write_concurrency: other.write_concurrency.or(self.write_concurrency),
330 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
331 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
332 })
333 }
334}
335
336impl Merge for PrunerLayer {
337 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
340 Ok(PrunerLayer {
341 interval_ms: other.interval_ms.or(self.interval_ms),
342 delay_ms: other.delay_ms.or(self.delay_ms),
343 retention: match (other.retention, self.retention) {
344 (Some(a), Some(b)) => Some(a.max(b)),
345 (Some(a), _) | (_, Some(a)) => Some(a),
346 (None, None) => None,
347 },
348 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
349 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
350 })
351 }
352}
353
354impl Merge for PipelineLayer {
355 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
356 Ok(PipelineLayer {
357 coin_balance_buckets: self
358 .coin_balance_buckets
359 .merge(other.coin_balance_buckets)?,
360 obj_info: self.obj_info.merge(other.obj_info)?,
361 sum_displays: self.sum_displays.merge(other.sum_displays)?,
362 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
363 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
364 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
365 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
366 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
367 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
368 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
369 kv_objects: self.kv_objects.merge(other.kv_objects)?,
370 kv_packages: self.kv_packages.merge(other.kv_packages)?,
371 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
372 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
373 obj_versions: self.obj_versions.merge(other.obj_versions)?,
374 tx_affected_addresses: self
375 .tx_affected_addresses
376 .merge(other.tx_affected_addresses)?,
377 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
378 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
379 tx_calls: self.tx_calls.merge(other.tx_calls)?,
380 tx_digests: self.tx_digests.merge(other.tx_digests)?,
381 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
382 })
383 }
384}
385
386impl<T: Merge> Merge for Option<T> {
387 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
388 Ok(match (self, other) {
389 (Some(a), Some(b)) => Some(a.merge(b)?),
390 (Some(a), _) | (_, Some(a)) => Some(a),
391 (None, None) => None,
392 })
393 }
394}
395
396impl From<IngestionConfig> for IngestionLayer {
397 fn from(config: IngestionConfig) -> Self {
398 Self {
399 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
400 ingest_concurrency: Some(config.ingest_concurrency),
401 retry_interval_ms: Some(config.retry_interval_ms),
402 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
403 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
404 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
405 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
406 }
407 }
408}
409
410impl From<SequentialConfig> for SequentialLayer {
411 fn from(config: SequentialConfig) -> Self {
412 Self {
413 committer: Some(config.committer.into()),
414 checkpoint_lag: Some(config.checkpoint_lag),
415 }
416 }
417}
418
419impl From<ConcurrentConfig> for ConcurrentLayer {
420 fn from(config: ConcurrentConfig) -> Self {
421 Self {
422 committer: Some(config.committer.into()),
423 pruner: config.pruner.map(Into::into),
424 }
425 }
426}
427
428impl From<CommitterConfig> for CommitterLayer {
429 fn from(config: CommitterConfig) -> Self {
430 Self {
431 write_concurrency: Some(config.write_concurrency),
432 collect_interval_ms: Some(config.collect_interval_ms),
433 watermark_interval_ms: Some(config.watermark_interval_ms),
434 }
435 }
436}
437
438impl From<PrunerConfig> for PrunerLayer {
439 fn from(config: PrunerConfig) -> Self {
440 Self {
441 interval_ms: Some(config.interval_ms),
442 delay_ms: Some(config.delay_ms),
443 retention: Some(config.retention),
444 max_chunk_size: Some(config.max_chunk_size),
445 prune_concurrency: Some(config.prune_concurrency),
446 }
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `$value` matches `$pattern`. On failure, the message shows the expected
    /// pattern and the pretty-printed value.
    macro_rules! assert_matches {
        ($value:expr, $pattern:pat $(,)?) => {
            let value = $value;
            assert!(
                matches!(value, $pattern),
                "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
                stringify!($pattern)
            );
        };
    }

    /// Base committer settings shared by the `finish_concurrent_*` tests.
    fn base_committer() -> CommitterConfig {
        CommitterConfig {
            write_concurrency: 5,
            collect_interval_ms: 50,
            watermark_interval_ms: 500,
        }
    }

    /// A concurrent layer with no committer or pruner overrides.
    fn empty_concurrent_layer() -> ConcurrentLayer {
        ConcurrentLayer {
            committer: None,
            pruner: None,
        }
    }

    #[test]
    fn merge_recursive() {
        let earlier = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(10),
                    collect_interval_ms: Some(1000),
                    watermark_interval_ms: None,
                }),
                checkpoint_lag: Some(100),
            }),
            ev_emit_mod: Some(ConcurrentLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: Some(500),
                    watermark_interval_ms: None,
                }),
                ..Default::default()
            }),
            ..Default::default()
        };

        let later = PipelineLayer {
            sum_displays: Some(SequentialLayer {
                committer: Some(CommitterLayer {
                    write_concurrency: Some(5),
                    collect_interval_ms: None,
                    watermark_interval_ms: Some(500),
                }),
                checkpoint_lag: Some(200),
            }),
            ev_emit_mod: None,
            ..Default::default()
        };

        let forward = earlier.clone().merge(later.clone()).unwrap();
        let backward = later.merge(earlier).unwrap();

        // Values set on both sides come from the right-hand layer, gaps are filled from the
        // left-hand one, and a pipeline present on only one side is carried over unchanged.
        assert_matches!(
            forward,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(200),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );

        assert_matches!(
            backward,
            PipelineLayer {
                sum_displays: Some(SequentialLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(10),
                        collect_interval_ms: Some(1000),
                        watermark_interval_ms: Some(500),
                    }),
                    checkpoint_lag: Some(100),
                }),
                ev_emit_mod: Some(ConcurrentLayer {
                    committer: Some(CommitterLayer {
                        write_concurrency: Some(5),
                        collect_interval_ms: Some(500),
                        watermark_interval_ms: None,
                    }),
                    pruner: None,
                }),
                ..
            },
        );
    }

    #[test]
    fn merge_pruner() {
        let earlier = PrunerLayer {
            interval_ms: None,
            delay_ms: Some(100),
            retention: Some(200),
            max_chunk_size: Some(300),
            prune_concurrency: Some(1),
        };

        let later = PrunerLayer {
            interval_ms: Some(400),
            delay_ms: None,
            retention: Some(500),
            max_chunk_size: Some(600),
            prune_concurrency: Some(2),
        };

        let forward = earlier.clone().merge(later.clone()).unwrap();
        let backward = later.merge(earlier).unwrap();

        // `retention` takes the maximum in both directions; every other field favours the
        // right-hand layer when it is set.
        assert_matches!(
            forward,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(600),
                prune_concurrency: Some(2),
            },
        );

        assert_matches!(
            backward,
            PrunerLayer {
                interval_ms: Some(400),
                delay_ms: Some(100),
                retention: Some(500),
                max_chunk_size: Some(300),
                prune_concurrency: Some(1),
            },
        );
    }

    #[test]
    fn finish_concurrent_unpruned_override() {
        // The base has a pruner but the layer does not, so the finished config has none.
        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig::default()),
        };

        assert_matches!(
            empty_concurrent_layer().finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    #[test]
    fn finish_concurrent_no_pruner() {
        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: None,
        };

        assert_matches!(
            empty_concurrent_layer().finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: None,
            },
        );
    }

    #[test]
    fn finish_concurrent_pruner() {
        let layer = ConcurrentLayer {
            committer: None,
            pruner: Some(PrunerLayer {
                interval_ms: Some(1000),
                ..Default::default()
            }),
        };

        let base = ConcurrentConfig {
            committer: base_committer(),
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 200,
                retention: 300,
                max_chunk_size: 400,
                prune_concurrency: 1,
            }),
        };

        // Only `interval_ms` is overridden; the rest of the pruner comes from the base.
        assert_matches!(
            layer.finish(base).unwrap(),
            ConcurrentConfig {
                committer: CommitterConfig {
                    write_concurrency: 5,
                    collect_interval_ms: 50,
                    watermark_interval_ms: 500,
                },
                pruner: Some(PrunerConfig {
                    interval_ms: 1000,
                    delay_ms: 200,
                    retention: 300,
                    max_chunk_size: 400,
                    prune_concurrency: 1,
                }),
            },
        );
    }

    #[test]
    fn detect_unrecognized_fields() {
        let input = r#"
            i_dont_exist = "foo"
        "#;

        let err = toml::from_str::<IndexerConfig>(input).unwrap_err();

        assert!(
            err.to_string().contains("i_dont_exist"),
            "Unexpected error: {err}"
        );
    }
}