1use sui_default_config::DefaultConfig;
5use sui_indexer_alt_framework::ingestion::IngestionConfig;
6use sui_indexer_alt_framework::pipeline::CommitterConfig;
7use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
8use sui_indexer_alt_framework::pipeline::concurrent::PrunerConfig;
9use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
10
11pub trait Merge: Sized {
13 fn merge(self, other: Self) -> anyhow::Result<Self>;
14}
15
16#[DefaultConfig]
17#[derive(Clone, Default, Debug)]
18#[serde(deny_unknown_fields)]
19pub struct IndexerConfig {
20 pub ingestion: IngestionLayer,
22
23 pub committer: CommitterLayer,
26
27 pub pruner: PrunerLayer,
33
34 pub pipeline: PipelineLayer,
36}
37
38#[DefaultConfig]
47#[derive(Clone, Default, Debug)]
48#[serde(deny_unknown_fields)]
49pub struct IngestionLayer {
50 pub checkpoint_buffer_size: Option<usize>,
51 pub ingest_concurrency: Option<usize>,
52 pub retry_interval_ms: Option<u64>,
53 pub streaming_backoff_initial_batch_size: Option<usize>,
54 pub streaming_backoff_max_batch_size: Option<usize>,
55 pub streaming_connection_timeout_ms: Option<u64>,
56 pub streaming_statement_timeout_ms: Option<u64>,
57}
58
59#[DefaultConfig]
60#[derive(Clone, Default, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct SequentialLayer {
63 pub committer: Option<CommitterLayer>,
64 pub checkpoint_lag: Option<u64>,
65}
66
67#[DefaultConfig]
68#[derive(Clone, Default, Debug)]
69#[serde(deny_unknown_fields)]
70pub struct ConcurrentLayer {
71 pub committer: Option<CommitterLayer>,
72 pub pruner: Option<PrunerLayer>,
73}
74
75#[DefaultConfig]
76#[derive(Clone, Default, Debug)]
77#[serde(deny_unknown_fields)]
78pub struct CommitterLayer {
79 pub write_concurrency: Option<usize>,
80 pub collect_interval_ms: Option<u64>,
81 pub watermark_interval_ms: Option<u64>,
82}
83
84#[DefaultConfig]
85#[derive(Clone, Default, Debug)]
86#[serde(deny_unknown_fields)]
87pub struct PrunerLayer {
88 pub interval_ms: Option<u64>,
89 pub delay_ms: Option<u64>,
90 pub retention: Option<u64>,
91 pub max_chunk_size: Option<u64>,
92 pub prune_concurrency: Option<u64>,
93}
94
95#[DefaultConfig]
96#[derive(Clone, Default, Debug)]
97#[serde(rename_all = "snake_case", deny_unknown_fields)]
98pub struct PipelineLayer {
99 pub sum_displays: Option<SequentialLayer>,
101
102 pub cp_bloom_blocks: Option<ConcurrentLayer>,
104 pub cp_blooms: Option<ConcurrentLayer>,
105 pub coin_balance_buckets: Option<ConcurrentLayer>,
106 pub obj_info: Option<ConcurrentLayer>,
107 pub cp_sequence_numbers: Option<ConcurrentLayer>,
108 pub ev_emit_mod: Option<ConcurrentLayer>,
109 pub ev_struct_inst: Option<ConcurrentLayer>,
110 pub kv_checkpoints: Option<ConcurrentLayer>,
111 pub kv_epoch_ends: Option<ConcurrentLayer>,
112 pub kv_epoch_starts: Option<ConcurrentLayer>,
113 pub kv_feature_flags: Option<ConcurrentLayer>,
114 pub kv_objects: Option<ConcurrentLayer>,
115 pub kv_packages: Option<ConcurrentLayer>,
116 pub kv_protocol_configs: Option<ConcurrentLayer>,
117 pub kv_transactions: Option<ConcurrentLayer>,
118 pub obj_versions: Option<ConcurrentLayer>,
119 pub tx_affected_addresses: Option<ConcurrentLayer>,
120 pub tx_affected_objects: Option<ConcurrentLayer>,
121 pub tx_balance_changes: Option<ConcurrentLayer>,
122 pub tx_calls: Option<ConcurrentLayer>,
123 pub tx_digests: Option<ConcurrentLayer>,
124 pub tx_kinds: Option<ConcurrentLayer>,
125}
126
127impl IndexerConfig {
128 pub fn example() -> Self {
131 let mut example: Self = Default::default();
132
133 example.ingestion = IngestionConfig::default().into();
134 example.committer = CommitterConfig::default().into();
135 example.pruner = PrunerConfig::default().into();
136 example.pipeline = PipelineLayer::example();
137
138 example
139 }
140
141 pub fn for_test() -> Self {
145 Self::example()
146 .merge(IndexerConfig {
147 ingestion: IngestionLayer {
148 retry_interval_ms: Some(10),
149 ingest_concurrency: Some(1),
150 ..Default::default()
151 },
152 committer: CommitterLayer {
153 collect_interval_ms: Some(50),
154 watermark_interval_ms: Some(50),
155 write_concurrency: Some(1),
156 },
157 pruner: PrunerLayer {
158 interval_ms: Some(50),
159 delay_ms: Some(0),
160 ..Default::default()
161 },
162 ..Default::default()
163 })
164 .expect("Merge failed for test configuration")
165 }
166}
167
168impl IngestionLayer {
169 pub fn finish(self, base: IngestionConfig) -> anyhow::Result<IngestionConfig> {
170 Ok(IngestionConfig {
171 checkpoint_buffer_size: self
172 .checkpoint_buffer_size
173 .unwrap_or(base.checkpoint_buffer_size),
174 ingest_concurrency: self.ingest_concurrency.unwrap_or(base.ingest_concurrency),
175 retry_interval_ms: self.retry_interval_ms.unwrap_or(base.retry_interval_ms),
176 streaming_backoff_initial_batch_size: self
177 .streaming_backoff_initial_batch_size
178 .unwrap_or(base.streaming_backoff_initial_batch_size),
179 streaming_backoff_max_batch_size: self
180 .streaming_backoff_max_batch_size
181 .unwrap_or(base.streaming_backoff_max_batch_size),
182 streaming_connection_timeout_ms: self
183 .streaming_connection_timeout_ms
184 .unwrap_or(base.streaming_connection_timeout_ms),
185 streaming_statement_timeout_ms: self
186 .streaming_statement_timeout_ms
187 .unwrap_or(base.streaming_statement_timeout_ms),
188 })
189 }
190}
191
192impl SequentialLayer {
193 pub fn finish(self, base: SequentialConfig) -> anyhow::Result<SequentialConfig> {
194 Ok(SequentialConfig {
195 committer: if let Some(committer) = self.committer {
196 committer.finish(base.committer)?
197 } else {
198 base.committer
199 },
200 checkpoint_lag: self.checkpoint_lag.unwrap_or(base.checkpoint_lag),
201 })
202 }
203}
204
205impl ConcurrentLayer {
206 pub fn finish(self, base: ConcurrentConfig) -> anyhow::Result<ConcurrentConfig> {
209 Ok(ConcurrentConfig {
210 committer: if let Some(committer) = self.committer {
211 committer.finish(base.committer)?
212 } else {
213 base.committer
214 },
215 pruner: match (self.pruner, base.pruner) {
216 (None, _) | (_, None) => None,
217 (Some(pruner), Some(base)) => Some(pruner.finish(base)?),
218 },
219 })
220 }
221}
222
223impl CommitterLayer {
224 pub fn finish(self, base: CommitterConfig) -> anyhow::Result<CommitterConfig> {
225 Ok(CommitterConfig {
226 write_concurrency: self.write_concurrency.unwrap_or(base.write_concurrency),
227 collect_interval_ms: self.collect_interval_ms.unwrap_or(base.collect_interval_ms),
228 watermark_interval_ms: self
229 .watermark_interval_ms
230 .unwrap_or(base.watermark_interval_ms),
231 watermark_interval_jitter_ms: 0,
232 })
233 }
234}
235
236impl PrunerLayer {
237 pub fn finish(self, base: PrunerConfig) -> anyhow::Result<PrunerConfig> {
238 Ok(PrunerConfig {
239 interval_ms: self.interval_ms.unwrap_or(base.interval_ms),
240 delay_ms: self.delay_ms.unwrap_or(base.delay_ms),
241 retention: self.retention.unwrap_or(base.retention),
242 max_chunk_size: self.max_chunk_size.unwrap_or(base.max_chunk_size),
243 prune_concurrency: self.prune_concurrency.unwrap_or(base.prune_concurrency),
244 })
245 }
246}
247
248impl PipelineLayer {
249 pub fn example() -> Self {
252 PipelineLayer {
253 cp_blooms: Some(Default::default()),
254 cp_bloom_blocks: Some(Default::default()),
255 coin_balance_buckets: Some(Default::default()),
256 obj_info: Some(Default::default()),
257 sum_displays: Some(Default::default()),
258 cp_sequence_numbers: Some(Default::default()),
259 ev_emit_mod: Some(Default::default()),
260 ev_struct_inst: Some(Default::default()),
261 kv_checkpoints: Some(Default::default()),
262 kv_epoch_ends: Some(Default::default()),
263 kv_epoch_starts: Some(Default::default()),
264 kv_feature_flags: Some(Default::default()),
265 kv_objects: Some(Default::default()),
266 kv_packages: Some(Default::default()),
267 kv_protocol_configs: Some(Default::default()),
268 kv_transactions: Some(Default::default()),
269 obj_versions: Some(Default::default()),
270 tx_affected_addresses: Some(Default::default()),
271 tx_affected_objects: Some(Default::default()),
272 tx_balance_changes: Some(Default::default()),
273 tx_calls: Some(Default::default()),
274 tx_digests: Some(Default::default()),
275 tx_kinds: Some(Default::default()),
276 }
277 }
278}
279
280impl Merge for IndexerConfig {
281 fn merge(self, other: IndexerConfig) -> anyhow::Result<IndexerConfig> {
282 Ok(IndexerConfig {
283 ingestion: self.ingestion.merge(other.ingestion)?,
284 committer: self.committer.merge(other.committer)?,
285 pruner: self.pruner.merge(other.pruner)?,
286 pipeline: self.pipeline.merge(other.pipeline)?,
287 })
288 }
289}
290
291impl Merge for IngestionLayer {
292 fn merge(self, other: IngestionLayer) -> anyhow::Result<IngestionLayer> {
293 Ok(IngestionLayer {
294 checkpoint_buffer_size: other.checkpoint_buffer_size.or(self.checkpoint_buffer_size),
295 ingest_concurrency: other.ingest_concurrency.or(self.ingest_concurrency),
296 retry_interval_ms: other.retry_interval_ms.or(self.retry_interval_ms),
297 streaming_backoff_initial_batch_size: other
298 .streaming_backoff_initial_batch_size
299 .or(self.streaming_backoff_initial_batch_size),
300 streaming_backoff_max_batch_size: other
301 .streaming_backoff_max_batch_size
302 .or(self.streaming_backoff_max_batch_size),
303 streaming_connection_timeout_ms: other
304 .streaming_connection_timeout_ms
305 .or(self.streaming_connection_timeout_ms),
306 streaming_statement_timeout_ms: other
307 .streaming_statement_timeout_ms
308 .or(self.streaming_statement_timeout_ms),
309 })
310 }
311}
312
313impl Merge for SequentialLayer {
314 fn merge(self, other: SequentialLayer) -> anyhow::Result<SequentialLayer> {
315 Ok(SequentialLayer {
316 committer: self.committer.merge(other.committer)?,
317 checkpoint_lag: other.checkpoint_lag.or(self.checkpoint_lag),
318 })
319 }
320}
321
322impl Merge for ConcurrentLayer {
323 fn merge(self, other: ConcurrentLayer) -> anyhow::Result<ConcurrentLayer> {
324 Ok(ConcurrentLayer {
325 committer: self.committer.merge(other.committer)?,
326 pruner: self.pruner.merge(other.pruner)?,
327 })
328 }
329}
330
331impl Merge for CommitterLayer {
332 fn merge(self, other: CommitterLayer) -> anyhow::Result<CommitterLayer> {
333 Ok(CommitterLayer {
334 write_concurrency: other.write_concurrency.or(self.write_concurrency),
335 collect_interval_ms: other.collect_interval_ms.or(self.collect_interval_ms),
336 watermark_interval_ms: other.watermark_interval_ms.or(self.watermark_interval_ms),
337 })
338 }
339}
340
341impl Merge for PrunerLayer {
342 fn merge(self, other: PrunerLayer) -> anyhow::Result<PrunerLayer> {
345 Ok(PrunerLayer {
346 interval_ms: other.interval_ms.or(self.interval_ms),
347 delay_ms: other.delay_ms.or(self.delay_ms),
348 retention: match (other.retention, self.retention) {
349 (Some(a), Some(b)) => Some(a.max(b)),
350 (Some(a), _) | (_, Some(a)) => Some(a),
351 (None, None) => None,
352 },
353 max_chunk_size: other.max_chunk_size.or(self.max_chunk_size),
354 prune_concurrency: other.prune_concurrency.or(self.prune_concurrency),
355 })
356 }
357}
358
359impl Merge for PipelineLayer {
360 fn merge(self, other: PipelineLayer) -> anyhow::Result<PipelineLayer> {
361 Ok(PipelineLayer {
362 cp_blooms: self.cp_blooms.merge(other.cp_blooms)?,
363 cp_bloom_blocks: self.cp_bloom_blocks.merge(other.cp_bloom_blocks)?,
364 coin_balance_buckets: self
365 .coin_balance_buckets
366 .merge(other.coin_balance_buckets)?,
367 obj_info: self.obj_info.merge(other.obj_info)?,
368 sum_displays: self.sum_displays.merge(other.sum_displays)?,
369 cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers)?,
370 ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod)?,
371 ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst)?,
372 kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints)?,
373 kv_epoch_ends: self.kv_epoch_ends.merge(other.kv_epoch_ends)?,
374 kv_epoch_starts: self.kv_epoch_starts.merge(other.kv_epoch_starts)?,
375 kv_feature_flags: self.kv_feature_flags.merge(other.kv_feature_flags)?,
376 kv_objects: self.kv_objects.merge(other.kv_objects)?,
377 kv_packages: self.kv_packages.merge(other.kv_packages)?,
378 kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs)?,
379 kv_transactions: self.kv_transactions.merge(other.kv_transactions)?,
380 obj_versions: self.obj_versions.merge(other.obj_versions)?,
381 tx_affected_addresses: self
382 .tx_affected_addresses
383 .merge(other.tx_affected_addresses)?,
384 tx_affected_objects: self.tx_affected_objects.merge(other.tx_affected_objects)?,
385 tx_balance_changes: self.tx_balance_changes.merge(other.tx_balance_changes)?,
386 tx_calls: self.tx_calls.merge(other.tx_calls)?,
387 tx_digests: self.tx_digests.merge(other.tx_digests)?,
388 tx_kinds: self.tx_kinds.merge(other.tx_kinds)?,
389 })
390 }
391}
392
393impl<T: Merge> Merge for Option<T> {
394 fn merge(self, other: Option<T>) -> anyhow::Result<Option<T>> {
395 Ok(match (self, other) {
396 (Some(a), Some(b)) => Some(a.merge(b)?),
397 (Some(a), _) | (_, Some(a)) => Some(a),
398 (None, None) => None,
399 })
400 }
401}
402
403impl From<IngestionConfig> for IngestionLayer {
404 fn from(config: IngestionConfig) -> Self {
405 Self {
406 checkpoint_buffer_size: Some(config.checkpoint_buffer_size),
407 ingest_concurrency: Some(config.ingest_concurrency),
408 retry_interval_ms: Some(config.retry_interval_ms),
409 streaming_backoff_initial_batch_size: Some(config.streaming_backoff_initial_batch_size),
410 streaming_backoff_max_batch_size: Some(config.streaming_backoff_max_batch_size),
411 streaming_connection_timeout_ms: Some(config.streaming_connection_timeout_ms),
412 streaming_statement_timeout_ms: Some(config.streaming_statement_timeout_ms),
413 }
414 }
415}
416
417impl From<SequentialConfig> for SequentialLayer {
418 fn from(config: SequentialConfig) -> Self {
419 Self {
420 committer: Some(config.committer.into()),
421 checkpoint_lag: Some(config.checkpoint_lag),
422 }
423 }
424}
425
426impl From<ConcurrentConfig> for ConcurrentLayer {
427 fn from(config: ConcurrentConfig) -> Self {
428 Self {
429 committer: Some(config.committer.into()),
430 pruner: config.pruner.map(Into::into),
431 }
432 }
433}
434
435impl From<CommitterConfig> for CommitterLayer {
436 fn from(config: CommitterConfig) -> Self {
437 Self {
438 write_concurrency: Some(config.write_concurrency),
439 collect_interval_ms: Some(config.collect_interval_ms),
440 watermark_interval_ms: Some(config.watermark_interval_ms),
441 }
442 }
443}
444
445impl From<PrunerConfig> for PrunerLayer {
446 fn from(config: PrunerConfig) -> Self {
447 Self {
448 interval_ms: Some(config.interval_ms),
449 delay_ms: Some(config.delay_ms),
450 retention: Some(config.retention),
451 max_chunk_size: Some(config.max_chunk_size),
452 prune_concurrency: Some(config.prune_concurrency),
453 }
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 macro_rules! assert_matches {
462 ($value:expr, $pattern:pat $(,)?) => {
463 let value = $value;
464 assert!(
465 matches!(value, $pattern),
466 "Did not match pattern:\nexpected: {}\nactual: {value:#?}",
467 stringify!($pattern)
468 );
469 };
470 }
471
472 #[test]
473 fn merge_recursive() {
474 let this = PipelineLayer {
475 sum_displays: Some(SequentialLayer {
476 committer: Some(CommitterLayer {
477 write_concurrency: Some(10),
478 collect_interval_ms: Some(1000),
479 watermark_interval_ms: None,
480 }),
481 checkpoint_lag: Some(100),
482 }),
483 ev_emit_mod: Some(ConcurrentLayer {
484 committer: Some(CommitterLayer {
485 write_concurrency: Some(5),
486 collect_interval_ms: Some(500),
487 watermark_interval_ms: None,
488 }),
489 ..Default::default()
490 }),
491 ..Default::default()
492 };
493
494 let that = PipelineLayer {
495 sum_displays: Some(SequentialLayer {
496 committer: Some(CommitterLayer {
497 write_concurrency: Some(5),
498 collect_interval_ms: None,
499 watermark_interval_ms: Some(500),
500 }),
501 checkpoint_lag: Some(200),
502 }),
503 ev_emit_mod: None,
504 ..Default::default()
505 };
506
507 let this_then_that = this.clone().merge(that.clone()).unwrap();
508 let that_then_this = that.clone().merge(this.clone()).unwrap();
509
510 assert_matches!(
511 this_then_that,
512 PipelineLayer {
513 sum_displays: Some(SequentialLayer {
514 committer: Some(CommitterLayer {
515 write_concurrency: Some(5),
516 collect_interval_ms: Some(1000),
517 watermark_interval_ms: Some(500),
518 }),
519 checkpoint_lag: Some(200),
520 }),
521 ev_emit_mod: Some(ConcurrentLayer {
522 committer: Some(CommitterLayer {
523 write_concurrency: Some(5),
524 collect_interval_ms: Some(500),
525 watermark_interval_ms: None,
526 }),
527 pruner: None,
528 }),
529 ..
530 },
531 );
532
533 assert_matches!(
534 that_then_this,
535 PipelineLayer {
536 sum_displays: Some(SequentialLayer {
537 committer: Some(CommitterLayer {
538 write_concurrency: Some(10),
539 collect_interval_ms: Some(1000),
540 watermark_interval_ms: Some(500),
541 }),
542 checkpoint_lag: Some(100),
543 }),
544 ev_emit_mod: Some(ConcurrentLayer {
545 committer: Some(CommitterLayer {
546 write_concurrency: Some(5),
547 collect_interval_ms: Some(500),
548 watermark_interval_ms: None,
549 }),
550 pruner: None,
551 }),
552 ..
553 },
554 );
555 }
556
557 #[test]
558 fn merge_pruner() {
559 let this = PrunerLayer {
560 interval_ms: None,
561 delay_ms: Some(100),
562 retention: Some(200),
563 max_chunk_size: Some(300),
564 prune_concurrency: Some(1),
565 };
566
567 let that = PrunerLayer {
568 interval_ms: Some(400),
569 delay_ms: None,
570 retention: Some(500),
571 max_chunk_size: Some(600),
572 prune_concurrency: Some(2),
573 };
574
575 let this_then_that = this.clone().merge(that.clone()).unwrap();
576 let that_then_this = that.clone().merge(this.clone()).unwrap();
577
578 assert_matches!(
579 this_then_that,
580 PrunerLayer {
581 interval_ms: Some(400),
582 delay_ms: Some(100),
583 retention: Some(500),
584 max_chunk_size: Some(600),
585 prune_concurrency: Some(2),
586 },
587 );
588
589 assert_matches!(
590 that_then_this,
591 PrunerLayer {
592 interval_ms: Some(400),
593 delay_ms: Some(100),
594 retention: Some(500),
595 max_chunk_size: Some(300),
596 prune_concurrency: Some(1),
597 },
598 );
599 }
600
601 #[test]
602 fn finish_concurrent_unpruned_override() {
603 let layer = ConcurrentLayer {
604 committer: None,
605 pruner: None,
606 };
607
608 let base = ConcurrentConfig {
609 committer: CommitterConfig {
610 write_concurrency: 5,
611 collect_interval_ms: 50,
612 watermark_interval_ms: 500,
613 ..Default::default()
614 },
615 pruner: Some(PrunerConfig::default()),
616 };
617
618 assert_matches!(
619 layer.finish(base).unwrap(),
620 ConcurrentConfig {
621 committer: CommitterConfig {
622 write_concurrency: 5,
623 collect_interval_ms: 50,
624 watermark_interval_ms: 500,
625 ..
626 },
627 pruner: None,
628 },
629 );
630 }
631
632 #[test]
633 fn finish_concurrent_no_pruner() {
634 let layer = ConcurrentLayer {
635 committer: None,
636 pruner: None,
637 };
638
639 let base = ConcurrentConfig {
640 committer: CommitterConfig {
641 write_concurrency: 5,
642 collect_interval_ms: 50,
643 watermark_interval_ms: 500,
644 ..Default::default()
645 },
646 pruner: None,
647 };
648
649 assert_matches!(
650 layer.finish(base).unwrap(),
651 ConcurrentConfig {
652 committer: CommitterConfig {
653 write_concurrency: 5,
654 collect_interval_ms: 50,
655 watermark_interval_ms: 500,
656 ..
657 },
658 pruner: None,
659 },
660 );
661 }
662
663 #[test]
664 fn finish_concurrent_pruner() {
665 let layer = ConcurrentLayer {
666 committer: None,
667 pruner: Some(PrunerLayer {
668 interval_ms: Some(1000),
669 ..Default::default()
670 }),
671 };
672
673 let base = ConcurrentConfig {
674 committer: CommitterConfig {
675 write_concurrency: 5,
676 collect_interval_ms: 50,
677 watermark_interval_ms: 500,
678 ..Default::default()
679 },
680 pruner: Some(PrunerConfig {
681 interval_ms: 100,
682 delay_ms: 200,
683 retention: 300,
684 max_chunk_size: 400,
685 prune_concurrency: 1,
686 }),
687 };
688
689 assert_matches!(
690 layer.finish(base).unwrap(),
691 ConcurrentConfig {
692 committer: CommitterConfig {
693 write_concurrency: 5,
694 collect_interval_ms: 50,
695 watermark_interval_ms: 500,
696 ..
697 },
698 pruner: Some(PrunerConfig {
699 interval_ms: 1000,
700 delay_ms: 200,
701 retention: 300,
702 max_chunk_size: 400,
703 prune_concurrency: 1,
704 }),
705 },
706 );
707 }
708
709 #[test]
710 fn detect_unrecognized_fields() {
711 let err = toml::from_str::<IndexerConfig>(
712 r#"
713 i_dont_exist = "foo"
714 "#,
715 )
716 .unwrap_err();
717
718 assert!(
719 err.to_string().contains("i_dont_exist"),
720 "Unexpected error: {err}"
721 );
722 }
723}