1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
/// Command-line arguments that control which checkpoints an [`Indexer`] ingests and which of its
/// pipelines run.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Proposed starting checkpoint, passed to the store when each pipeline's watermark is
    /// initialized (treated as 0 when unset). Pipelines that already have a committed watermark
    /// resume after it instead.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Last checkpoint to ingest (inclusive). When unset, ingestion has no upper bound.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Names of the pipelines to run; the flag may be repeated. When no names are given, every
    /// pipeline added to the indexer runs.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional configuration for running the pipelines as a named task.
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Arguments for running an indexer's pipelines as a named task. Clap requires the two flags to
/// be supplied together.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the task. Concurrent pipelines combine it with their own name (via
    /// `pipeline_task`) to form their watermark key; sequential pipelines refuse to run when it
    /// is set.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Reader interval for the task, in milliseconds; converted to a `Duration` in `Task`.
    /// NOTE(review): how the interval is consumed lives in the concurrent pipeline — confirm there.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
/// Drives a set of concurrent and sequential pipelines over checkpoints fed by one shared
/// ingestion service, persisting results to `store`.
pub struct Indexer<S: Store> {
    /// Storage backend; every pipeline receives a clone of it.
    store: S,

    /// Indexer metrics, shared with every pipeline.
    metrics: Arc<IndexerMetrics>,

    /// Ingestion service that each pipeline subscribes to, started by `run` over the computed
    /// checkpoint range.
    ingestion_service: IngestionService,

    /// Value of `--first-checkpoint`, proposed as the start for pipelines' watermarks.
    first_checkpoint: Option<u64>,

    /// Value of `--last-checkpoint`: an optional inclusive upper bound on ingestion.
    last_checkpoint: Option<u64>,

    /// Smallest next checkpoint across all running pipelines, i.e. where ingestion starts.
    /// Starts at `u64::MAX` and is lowered as pipelines are added.
    next_checkpoint: u64,

    /// Smallest next checkpoint across sequential pipelines, or `None` if none were added.
    next_sequential_checkpoint: Option<u64>,

    /// Task configuration, when the pipelines run as a named task.
    task: Option<Task>,

    /// Pipelines selected via `--pipeline`; `None` means every pipeline is enabled.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of all pipelines registered with this indexer (enabled or not), used to reject
    /// duplicates.
    added_pipelines: BTreeSet<&'static str>,

    /// Services for the pipelines that will run; merged into the ingestion service by `run`.
    pipelines: Vec<Service>,
}
163
/// Resolved form of [`TaskArgs`], handed to concurrent pipelines.
#[derive(Clone)]
pub(crate) struct Task {
    /// Task name, combined with each pipeline's name to form its watermark key.
    task: String,

    /// Interval taken from `--reader-interval-ms`.
    reader_interval: Duration,
}
174
175impl TaskArgs {
176 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
177 Self {
178 task: Some(task),
179 reader_interval_ms: Some(reader_interval_ms),
180 }
181 }
182
183 fn into_task(self) -> Option<Task> {
184 Some(Task {
185 task: self.task?,
186 reader_interval: Duration::from_millis(self.reader_interval_ms?),
187 })
188 }
189}
190
191impl<S: Store> Indexer<S> {
192 pub async fn new(
204 store: S,
205 indexer_args: IndexerArgs,
206 client_args: ClientArgs,
207 ingestion_config: IngestionConfig,
208 metrics_prefix: Option<&str>,
209 registry: &Registry,
210 ) -> Result<Self> {
211 let IndexerArgs {
212 first_checkpoint,
213 last_checkpoint,
214 pipeline,
215 task,
216 } = indexer_args;
217
218 let metrics = IndexerMetrics::new(metrics_prefix, registry);
219
220 let ingestion_service =
221 IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
222
223 Ok(Self {
224 store,
225 metrics,
226 ingestion_service,
227 first_checkpoint,
228 last_checkpoint,
229 next_checkpoint: u64::MAX,
230 next_sequential_checkpoint: None,
231 task: task.into_task(),
232 enabled_pipelines: if pipeline.is_empty() {
233 None
234 } else {
235 Some(pipeline.into_iter().collect())
236 },
237 added_pipelines: BTreeSet::new(),
238 pipelines: vec![],
239 })
240 }
241
242 pub fn store(&self) -> &S {
244 &self.store
245 }
246
247 pub fn ingestion_client(&self) -> &IngestionClient {
249 self.ingestion_service.ingestion_client()
250 }
251
252 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
254 &self.metrics
255 }
256
257 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
259 self.ingestion_service.metrics()
260 }
261
262 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
264 self.added_pipelines.iter().copied().filter(|p| {
265 self.enabled_pipelines
266 .as_ref()
267 .is_none_or(|e| e.contains(*p))
268 })
269 }
270
271 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
275 self.next_sequential_checkpoint
276 }
277
278 pub async fn run(self) -> anyhow::Result<Service> {
284 if let Some(enabled_pipelines) = self.enabled_pipelines {
285 ensure!(
286 enabled_pipelines.is_empty(),
287 "Tried to enable pipelines that this indexer does not know about: \
288 {enabled_pipelines:#?}",
289 );
290 }
291
292 let start = self.next_checkpoint;
293 let end = self.last_checkpoint;
294 info!(start, end, "Ingestion range");
295
296 let mut service = self
297 .ingestion_service
298 .run(
299 (
300 Bound::Included(start),
301 end.map_or(Bound::Unbounded, Bound::Included),
302 ),
303 self.next_sequential_checkpoint,
304 )
305 .await
306 .context("Failed to start ingestion service")?;
307
308 for pipeline in self.pipelines {
309 service = service.merge(pipeline);
310 }
311
312 Ok(service)
313 }
314
315 async fn add_pipeline<P: Processor + 'static>(
324 &mut self,
325 pipeline_task: String,
326 ) -> Result<Option<u64>> {
327 ensure!(
328 self.added_pipelines.insert(P::NAME),
329 "Pipeline {:?} already added",
330 P::NAME,
331 );
332
333 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
334 && !enabled_pipelines.remove(P::NAME)
335 {
336 info!(pipeline = P::NAME, "Skipping");
337 return Ok(None);
338 }
339
340 let proposed_next_checkpoint = self.first_checkpoint.unwrap_or(0);
343 let mut conn = self.store.connect().await?;
344 let init_watermark = conn
345 .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
346 .await
347 .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
348
349 let next_checkpoint = if let Some(init_watermark) = init_watermark {
350 if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
351 checkpoint_hi_inclusive + 1
352 } else {
353 0
354 }
355 } else {
356 proposed_next_checkpoint
357 };
358
359 self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);
360
361 Ok(Some(next_checkpoint))
362 }
363}
364
365impl<S: ConcurrentStore> Indexer<S> {
366 pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
373 &mut self,
374 handler: H,
375 config: ConcurrentConfig,
376 ) -> Result<()> {
377 let pipeline_task =
378 pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
379 let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task).await? else {
380 return Ok(());
381 };
382
383 self.pipelines.push(concurrent::pipeline::<H>(
384 handler,
385 next_checkpoint,
386 config,
387 self.store.clone(),
388 self.task.clone(),
389 self.ingestion_service.subscribe().0,
390 self.metrics.clone(),
391 ));
392
393 Ok(())
394 }
395}
396
397impl<T: SequentialStore> Indexer<T> {
398 pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
409 &mut self,
410 handler: H,
411 config: SequentialConfig,
412 ) -> Result<()> {
413 if self.task.is_some() {
414 bail!(
415 "Sequential pipelines do not support pipeline tasks. \
416 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
417 Running the same pipeline under a different task would violate these guarantees."
418 );
419 }
420
421 let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned()).await? else {
422 return Ok(());
423 };
424
425 self.next_sequential_checkpoint = Some(
427 self.next_sequential_checkpoint
428 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
429 );
430
431 let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
432
433 self.pipelines.push(sequential::pipeline::<H>(
434 handler,
435 next_checkpoint,
436 config,
437 self.store.clone(),
438 checkpoint_rx,
439 commit_hi_tx,
440 self.metrics.clone(),
441 ));
442
443 Ok(())
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use std::sync::Arc;
450
451 use async_trait::async_trait;
452 use clap::Parser;
453 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
454 use sui_synthetic_ingestion::synthetic_ingestion;
455 use tokio::sync::watch;
456
457 use crate::FieldCount;
458 use crate::config::ConcurrencyConfig;
459 use crate::ingestion::ingestion_client::IngestionClientArgs;
460 use crate::mocks::store::MockStore;
461 use crate::pipeline::CommitterConfig;
462 use crate::pipeline::Processor;
463 use crate::pipeline::concurrent::ConcurrentConfig;
464 use crate::store::CommitterWatermark;
465 use crate::store::ConcurrentConnection as _;
466 use crate::store::Connection as _;
467
468 use super::*;
469
/// Value emitted by the test processors: the sequence number of the checkpoint it came from.
// NOTE(review): the field is read in the handlers' `commit` (`value.0`), so this `allow` may be
// stale — confirm before removing.
#[allow(dead_code)]
#[derive(Clone, FieldCount)]
struct MockValue(u64);
473
/// Concurrent test handler whose processing of each checkpoint blocks until a watch-channel
/// limit has reached that checkpoint's sequence number.
struct ControllableHandler {
    /// Current limit: checkpoint `n` is processed once the limit is `>= n` (inclusive, despite
    /// the name).
    process_below: watch::Receiver<u64>,
}
479
480 impl ControllableHandler {
481 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
482 let (tx, rx) = watch::channel(limit);
483 (Self { process_below: rx }, tx)
484 }
485 }
486
487 #[async_trait]
488 impl Processor for ControllableHandler {
489 const NAME: &'static str = "controllable";
490 type Value = MockValue;
491
492 async fn process(
493 &self,
494 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
495 ) -> anyhow::Result<Vec<Self::Value>> {
496 let cp_num = checkpoint.summary.sequence_number;
497
498 self.process_below
500 .clone()
501 .wait_for(|&limit| cp_num <= limit)
502 .await
503 .ok();
504
505 Ok(vec![MockValue(cp_num)])
506 }
507 }
508
509 #[async_trait]
510 impl concurrent::Handler for ControllableHandler {
511 type Store = MockStore;
512 type Batch = Vec<MockValue>;
513
514 fn batch(
515 &self,
516 batch: &mut Self::Batch,
517 values: &mut std::vec::IntoIter<Self::Value>,
518 ) -> concurrent::BatchStatus {
519 batch.extend(values);
520 concurrent::BatchStatus::Ready
521 }
522
523 async fn commit<'a>(
524 &self,
525 batch: &Self::Batch,
526 conn: &mut <Self::Store as Store>::Connection<'a>,
527 ) -> anyhow::Result<usize> {
528 for value in batch {
529 conn.0
530 .commit_data(Self::NAME, value.0, vec![value.0])
531 .await?;
532 }
533 Ok(batch.len())
534 }
535 }
536
/// Declares a unit struct `$handler` named `$name` that implements `Processor`,
/// `concurrent::Handler` and `sequential::Handler` against `MockStore`, so the same type can be
/// registered as either kind of pipeline.
///
/// - `process` emits one `MockValue` holding the checkpoint's sequence number.
/// - The concurrent `batch` always returns `BatchStatus::Pending`; its `commit` passes each value
///   to the mock store's `commit_data` and returns the number of values.
/// - The sequential `commit` writes nothing and always returns `Ok(1)`.
macro_rules! test_pipeline {
    ($handler:ident, $name:literal) => {
        struct $handler;

        #[async_trait]
        impl Processor for $handler {
            const NAME: &'static str = $name;
            type Value = MockValue;
            async fn process(
                &self,
                checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                Ok(vec![MockValue(checkpoint.summary.sequence_number)])
            }
        }

        #[async_trait]
        impl crate::pipeline::concurrent::Handler for $handler {
            type Store = MockStore;
            type Batch = Vec<Self::Value>;

            fn batch(
                &self,
                batch: &mut Self::Batch,
                values: &mut std::vec::IntoIter<Self::Value>,
            ) -> crate::pipeline::concurrent::BatchStatus {
                batch.extend(values);
                crate::pipeline::concurrent::BatchStatus::Pending
            }

            async fn commit<'a>(
                &self,
                batch: &Self::Batch,
                conn: &mut <Self::Store as Store>::Connection<'a>,
            ) -> anyhow::Result<usize> {
                // Record every value in the mock store, in batch order.
                for value in batch {
                    conn.0
                        .commit_data(Self::NAME, value.0, vec![value.0])
                        .await?;
                }
                Ok(batch.len())
            }
        }

        #[async_trait]
        impl crate::pipeline::sequential::Handler for $handler {
            type Store = MockStore;
            type Batch = Vec<Self::Value>;

            fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                batch.extend(values);
            }

            // No-op commit: nothing is written to the store.
            async fn commit<'a>(
                &self,
                _batch: &Self::Batch,
                _conn: &mut <Self::Store as Store>::Connection<'a>,
            ) -> anyhow::Result<usize> {
                Ok(1)
            }
        }
    };
}
600
// Module-level test pipelines.
// NOTE(review): none of these are referenced by the tests visible above this point; presumably
// they are used by tests later in this module — confirm.
test_pipeline!(MockHandler, "test_processor");
test_pipeline!(SequentialHandler, "sequential_handler");
test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
604
605 async fn test_init_watermark(
606 first_checkpoint: Option<u64>,
607 is_concurrent: bool,
608 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
609 let registry = Registry::new();
610 let store = MockStore::default();
611
612 test_pipeline!(A, "pipeline_name");
613
614 let mut conn = store.connect().await.unwrap();
615
616 let indexer_args = IndexerArgs {
617 first_checkpoint,
618 ..IndexerArgs::default()
619 };
620 let temp_dir = tempfile::tempdir().unwrap();
621 let client_args = ClientArgs {
622 ingestion: IngestionClientArgs {
623 local_ingestion_path: Some(temp_dir.path().to_owned()),
624 ..Default::default()
625 },
626 ..Default::default()
627 };
628 let ingestion_config = IngestionConfig::default();
629
630 let mut indexer = Indexer::new(
631 store.clone(),
632 indexer_args,
633 client_args,
634 ingestion_config,
635 None,
636 ®istry,
637 )
638 .await
639 .unwrap();
640
641 if is_concurrent {
642 indexer
643 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
644 .await
645 .unwrap();
646 } else {
647 indexer
648 .sequential_pipeline::<A>(A, SequentialConfig::default())
649 .await
650 .unwrap();
651 }
652
653 (
654 conn.committer_watermark(A::NAME).await.unwrap(),
655 conn.pruner_watermark(A::NAME, Duration::ZERO)
656 .await
657 .unwrap(),
658 )
659 }
660
/// Every `IndexerArgs` flag, including the flattened task flags and the repeatable
/// `--pipeline`, parses into the expected field.
#[test]
fn test_arg_parsing() {
    #[derive(Parser)]
    struct Args {
        #[clap(flatten)]
        indexer: IndexerArgs,
    }

    let argv = [
        "cmd",
        "--first-checkpoint",
        "10",
        "--last-checkpoint",
        "100",
        "--pipeline",
        "a",
        "--pipeline",
        "b",
        "--task",
        "t",
        "--reader-interval-ms",
        "5000",
    ];

    let Args { indexer } = Args::try_parse_from(argv).unwrap();
    let IndexerArgs {
        first_checkpoint,
        last_checkpoint,
        pipeline,
        task,
    } = indexer;

    assert_eq!(first_checkpoint, Some(10));
    assert_eq!(last_checkpoint, Some(100));
    assert_eq!(pipeline, vec!["a", "b"]);
    assert_eq!(task.task, Some("t".to_owned()));
    assert_eq!(task.reader_interval_ms, Some(5000));
}
692
693 #[tokio::test]
695 async fn test_next_checkpoint_all_pipelines_have_watermarks() {
696 let registry = Registry::new();
697 let store = MockStore::default();
698
699 test_pipeline!(A, "concurrent_a");
700 test_pipeline!(B, "concurrent_b");
701 test_pipeline!(C, "sequential_c");
702 test_pipeline!(D, "sequential_d");
703
704 let mut conn = store.connect().await.unwrap();
705
706 conn.init_watermark(A::NAME, Some(0)).await.unwrap();
707 conn.set_committer_watermark(
708 A::NAME,
709 CommitterWatermark {
710 checkpoint_hi_inclusive: 100,
711 ..Default::default()
712 },
713 )
714 .await
715 .unwrap();
716
717 conn.init_watermark(B::NAME, Some(0)).await.unwrap();
718 conn.set_committer_watermark(
719 B::NAME,
720 CommitterWatermark {
721 checkpoint_hi_inclusive: 10,
722 ..Default::default()
723 },
724 )
725 .await
726 .unwrap();
727
728 conn.init_watermark(C::NAME, Some(0)).await.unwrap();
729 conn.set_committer_watermark(
730 C::NAME,
731 CommitterWatermark {
732 checkpoint_hi_inclusive: 1,
733 ..Default::default()
734 },
735 )
736 .await
737 .unwrap();
738
739 conn.init_watermark(D::NAME, Some(0)).await.unwrap();
740 conn.set_committer_watermark(
741 D::NAME,
742 CommitterWatermark {
743 checkpoint_hi_inclusive: 50,
744 ..Default::default()
745 },
746 )
747 .await
748 .unwrap();
749
750 let indexer_args = IndexerArgs::default();
751 let temp_dir = tempfile::tempdir().unwrap();
752 let client_args = ClientArgs {
753 ingestion: IngestionClientArgs {
754 local_ingestion_path: Some(temp_dir.path().to_owned()),
755 ..Default::default()
756 },
757 ..Default::default()
758 };
759 let ingestion_config = IngestionConfig::default();
760
761 let mut indexer = Indexer::new(
762 store,
763 indexer_args,
764 client_args,
765 ingestion_config,
766 None,
767 ®istry,
768 )
769 .await
770 .unwrap();
771
772 indexer
773 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
774 .await
775 .unwrap();
776 indexer
777 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
778 .await
779 .unwrap();
780 indexer
781 .sequential_pipeline::<C>(C, SequentialConfig::default())
782 .await
783 .unwrap();
784 indexer
785 .sequential_pipeline::<D>(D, SequentialConfig::default())
786 .await
787 .unwrap();
788
789 assert_eq!(indexer.first_checkpoint, None);
790 assert_eq!(indexer.last_checkpoint, None);
791 assert_eq!(indexer.next_checkpoint, 2);
792 assert_eq!(indexer.next_sequential_checkpoint, Some(2));
793 }
794
795 #[tokio::test]
797 async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
798 let registry = Registry::new();
799 let store = MockStore::default();
800
801 test_pipeline!(A, "concurrent_a");
802 test_pipeline!(B, "concurrent_b");
803 test_pipeline!(C, "sequential_c");
804 test_pipeline!(D, "sequential_d");
805
806 let mut conn = store.connect().await.unwrap();
807 conn.set_committer_watermark(
808 B::NAME,
809 CommitterWatermark {
810 checkpoint_hi_inclusive: 10,
811 ..Default::default()
812 },
813 )
814 .await
815 .unwrap();
816 conn.set_committer_watermark(
817 C::NAME,
818 CommitterWatermark {
819 checkpoint_hi_inclusive: 1,
820 ..Default::default()
821 },
822 )
823 .await
824 .unwrap();
825
826 let indexer_args = IndexerArgs::default();
827 let temp_dir = tempfile::tempdir().unwrap();
828 let client_args = ClientArgs {
829 ingestion: IngestionClientArgs {
830 local_ingestion_path: Some(temp_dir.path().to_owned()),
831 ..Default::default()
832 },
833 ..Default::default()
834 };
835 let ingestion_config = IngestionConfig::default();
836
837 let mut indexer = Indexer::new(
838 store,
839 indexer_args,
840 client_args,
841 ingestion_config,
842 None,
843 ®istry,
844 )
845 .await
846 .unwrap();
847
848 indexer
849 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
850 .await
851 .unwrap();
852 indexer
853 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
854 .await
855 .unwrap();
856 indexer
857 .sequential_pipeline::<C>(C, SequentialConfig::default())
858 .await
859 .unwrap();
860 indexer
861 .sequential_pipeline::<D>(D, SequentialConfig::default())
862 .await
863 .unwrap();
864
865 assert_eq!(indexer.first_checkpoint, None);
866 assert_eq!(indexer.last_checkpoint, None);
867 assert_eq!(indexer.next_checkpoint, 0);
868 assert_eq!(indexer.next_sequential_checkpoint, Some(0));
869 }
870
871 #[tokio::test]
873 async fn test_next_checkpoint_smallest_is_0() {
874 let registry = Registry::new();
875 let store = MockStore::default();
876
877 test_pipeline!(A, "concurrent_a");
878 test_pipeline!(B, "concurrent_b");
879 test_pipeline!(C, "sequential_c");
880 test_pipeline!(D, "sequential_d");
881
882 let mut conn = store.connect().await.unwrap();
883 conn.set_committer_watermark(
884 A::NAME,
885 CommitterWatermark {
886 checkpoint_hi_inclusive: 100,
887 ..Default::default()
888 },
889 )
890 .await
891 .unwrap();
892 conn.set_committer_watermark(
893 B::NAME,
894 CommitterWatermark {
895 checkpoint_hi_inclusive: 10,
896 ..Default::default()
897 },
898 )
899 .await
900 .unwrap();
901 conn.set_committer_watermark(
902 C::NAME,
903 CommitterWatermark {
904 checkpoint_hi_inclusive: 1,
905 ..Default::default()
906 },
907 )
908 .await
909 .unwrap();
910 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
911 .await
912 .unwrap();
913
914 let indexer_args = IndexerArgs::default();
915 let temp_dir = tempfile::tempdir().unwrap();
916 let client_args = ClientArgs {
917 ingestion: IngestionClientArgs {
918 local_ingestion_path: Some(temp_dir.path().to_owned()),
919 ..Default::default()
920 },
921 ..Default::default()
922 };
923 let ingestion_config = IngestionConfig::default();
924
925 let mut indexer = Indexer::new(
926 store,
927 indexer_args,
928 client_args,
929 ingestion_config,
930 None,
931 ®istry,
932 )
933 .await
934 .unwrap();
935
936 indexer
937 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
938 .await
939 .unwrap();
940 indexer
941 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
942 .await
943 .unwrap();
944 indexer
945 .sequential_pipeline::<C>(C, SequentialConfig::default())
946 .await
947 .unwrap();
948 indexer
949 .sequential_pipeline::<D>(D, SequentialConfig::default())
950 .await
951 .unwrap();
952
953 assert_eq!(indexer.next_checkpoint, 1);
954 }
955
956 #[tokio::test]
959 async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
960 let registry = Registry::new();
961 let store = MockStore::default();
962
963 test_pipeline!(A, "concurrent_a");
964 test_pipeline!(B, "concurrent_b");
965 test_pipeline!(C, "sequential_c");
966 test_pipeline!(D, "sequential_d");
967
968 let mut conn = store.connect().await.unwrap();
969 conn.set_committer_watermark(
970 B::NAME,
971 CommitterWatermark {
972 checkpoint_hi_inclusive: 50,
973 ..Default::default()
974 },
975 )
976 .await
977 .unwrap();
978 conn.set_committer_watermark(
979 C::NAME,
980 CommitterWatermark {
981 checkpoint_hi_inclusive: 10,
982 ..Default::default()
983 },
984 )
985 .await
986 .unwrap();
987
988 let indexer_args = IndexerArgs {
989 first_checkpoint: Some(5),
990 ..Default::default()
991 };
992 let temp_dir = tempfile::tempdir().unwrap();
993 let client_args = ClientArgs {
994 ingestion: IngestionClientArgs {
995 local_ingestion_path: Some(temp_dir.path().to_owned()),
996 ..Default::default()
997 },
998 ..Default::default()
999 };
1000 let ingestion_config = IngestionConfig::default();
1001
1002 let mut indexer = Indexer::new(
1003 store,
1004 indexer_args,
1005 client_args,
1006 ingestion_config,
1007 None,
1008 ®istry,
1009 )
1010 .await
1011 .unwrap();
1012
1013 indexer
1014 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1015 .await
1016 .unwrap();
1017 indexer
1018 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1019 .await
1020 .unwrap();
1021 indexer
1022 .sequential_pipeline::<C>(C, SequentialConfig::default())
1023 .await
1024 .unwrap();
1025 indexer
1026 .sequential_pipeline::<D>(D, SequentialConfig::default())
1027 .await
1028 .unwrap();
1029
1030 assert_eq!(indexer.first_checkpoint, Some(5));
1031 assert_eq!(indexer.last_checkpoint, None);
1032 assert_eq!(indexer.next_checkpoint, 5);
1033 assert_eq!(indexer.next_sequential_checkpoint, Some(5));
1034 }
1035
1036 #[tokio::test]
1039 async fn test_next_checkpoint_ignore_first_checkpoint() {
1040 let registry = Registry::new();
1041 let store = MockStore::default();
1042
1043 test_pipeline!(B, "concurrent_b");
1044 test_pipeline!(C, "sequential_c");
1045
1046 let mut conn = store.connect().await.unwrap();
1047 conn.set_committer_watermark(
1048 B::NAME,
1049 CommitterWatermark {
1050 checkpoint_hi_inclusive: 50,
1051 ..Default::default()
1052 },
1053 )
1054 .await
1055 .unwrap();
1056 conn.set_committer_watermark(
1057 C::NAME,
1058 CommitterWatermark {
1059 checkpoint_hi_inclusive: 10,
1060 ..Default::default()
1061 },
1062 )
1063 .await
1064 .unwrap();
1065
1066 let indexer_args = IndexerArgs {
1067 first_checkpoint: Some(5),
1068 ..Default::default()
1069 };
1070 let temp_dir = tempfile::tempdir().unwrap();
1071 let client_args = ClientArgs {
1072 ingestion: IngestionClientArgs {
1073 local_ingestion_path: Some(temp_dir.path().to_owned()),
1074 ..Default::default()
1075 },
1076 ..Default::default()
1077 };
1078 let ingestion_config = IngestionConfig::default();
1079
1080 let mut indexer = Indexer::new(
1081 store,
1082 indexer_args,
1083 client_args,
1084 ingestion_config,
1085 None,
1086 ®istry,
1087 )
1088 .await
1089 .unwrap();
1090
1091 indexer
1092 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1093 .await
1094 .unwrap();
1095 indexer
1096 .sequential_pipeline::<C>(C, SequentialConfig::default())
1097 .await
1098 .unwrap();
1099
1100 assert_eq!(indexer.first_checkpoint, Some(5));
1101 assert_eq!(indexer.last_checkpoint, None);
1102 assert_eq!(indexer.next_checkpoint, 11);
1103 assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1104 }
1105
1106 #[tokio::test]
1110 async fn test_next_checkpoint_large_first_checkpoint() {
1111 let registry = Registry::new();
1112 let store = MockStore::default();
1113
1114 test_pipeline!(A, "concurrent_a");
1115 test_pipeline!(B, "concurrent_b");
1116 test_pipeline!(C, "sequential_c");
1117
1118 let mut conn = store.connect().await.unwrap();
1119 conn.set_committer_watermark(
1120 B::NAME,
1121 CommitterWatermark {
1122 checkpoint_hi_inclusive: 50,
1123 ..Default::default()
1124 },
1125 )
1126 .await
1127 .unwrap();
1128 conn.set_committer_watermark(
1129 C::NAME,
1130 CommitterWatermark {
1131 checkpoint_hi_inclusive: 10,
1132 ..Default::default()
1133 },
1134 )
1135 .await
1136 .unwrap();
1137
1138 let indexer_args = IndexerArgs {
1139 first_checkpoint: Some(24),
1140 ..Default::default()
1141 };
1142 let temp_dir = tempfile::tempdir().unwrap();
1143 let client_args = ClientArgs {
1144 ingestion: IngestionClientArgs {
1145 local_ingestion_path: Some(temp_dir.path().to_owned()),
1146 ..Default::default()
1147 },
1148 ..Default::default()
1149 };
1150 let ingestion_config = IngestionConfig::default();
1151
1152 let mut indexer = Indexer::new(
1153 store,
1154 indexer_args,
1155 client_args,
1156 ingestion_config,
1157 None,
1158 ®istry,
1159 )
1160 .await
1161 .unwrap();
1162
1163 indexer
1164 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1165 .await
1166 .unwrap();
1167
1168 indexer
1169 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1170 .await
1171 .unwrap();
1172
1173 indexer
1174 .sequential_pipeline::<C>(C, SequentialConfig::default())
1175 .await
1176 .unwrap();
1177
1178 assert_eq!(indexer.first_checkpoint, Some(24));
1179 assert_eq!(indexer.last_checkpoint, None);
1180 assert_eq!(indexer.next_checkpoint, 11);
1181 assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1182 }
1183
1184 #[tokio::test]
1186 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1187 let registry = Registry::new();
1188 let store = MockStore::default();
1189
1190 test_pipeline!(A, "concurrent_a");
1191 test_pipeline!(B, "concurrent_b");
1192 test_pipeline!(C, "sequential_c");
1193 test_pipeline!(D, "sequential_d");
1194
1195 let mut conn = store.connect().await.unwrap();
1196 conn.set_committer_watermark(
1197 A::NAME,
1198 CommitterWatermark {
1199 checkpoint_hi_inclusive: 5,
1200 ..Default::default()
1201 },
1202 )
1203 .await
1204 .unwrap();
1205 conn.set_committer_watermark(
1206 B::NAME,
1207 CommitterWatermark {
1208 checkpoint_hi_inclusive: 10,
1209 ..Default::default()
1210 },
1211 )
1212 .await
1213 .unwrap();
1214 conn.set_committer_watermark(
1215 C::NAME,
1216 CommitterWatermark {
1217 checkpoint_hi_inclusive: 15,
1218 ..Default::default()
1219 },
1220 )
1221 .await
1222 .unwrap();
1223 conn.set_committer_watermark(
1224 D::NAME,
1225 CommitterWatermark {
1226 checkpoint_hi_inclusive: 20,
1227 ..Default::default()
1228 },
1229 )
1230 .await
1231 .unwrap();
1232
1233 let temp_dir = tempfile::tempdir().unwrap();
1235 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1236 ingestion_dir: temp_dir.path().to_owned(),
1237 starting_checkpoint: 0,
1238 num_checkpoints: 30,
1239 checkpoint_size: 1,
1240 })
1241 .await;
1242
1243 let indexer_args = IndexerArgs {
1244 last_checkpoint: Some(29),
1245 ..Default::default()
1246 };
1247
1248 let client_args = ClientArgs {
1249 ingestion: IngestionClientArgs {
1250 local_ingestion_path: Some(temp_dir.path().to_owned()),
1251 ..Default::default()
1252 },
1253 ..Default::default()
1254 };
1255
1256 let ingestion_config = IngestionConfig::default();
1257
1258 let mut indexer = Indexer::new(
1259 store.clone(),
1260 indexer_args,
1261 client_args,
1262 ingestion_config,
1263 None,
1264 ®istry,
1265 )
1266 .await
1267 .unwrap();
1268
1269 indexer
1270 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1271 .await
1272 .unwrap();
1273 indexer
1274 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1275 .await
1276 .unwrap();
1277 indexer
1278 .sequential_pipeline::<C>(C, SequentialConfig::default())
1279 .await
1280 .unwrap();
1281 indexer
1282 .sequential_pipeline::<D>(D, SequentialConfig::default())
1283 .await
1284 .unwrap();
1285
1286 let ingestion_metrics = indexer.ingestion_metrics().clone();
1287 let indexer_metrics = indexer.indexer_metrics().clone();
1288
1289 indexer.run().await.unwrap().join().await.unwrap();
1290
1291 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1292 assert_eq!(
1293 indexer_metrics
1294 .total_watermarks_out_of_order
1295 .get_metric_with_label_values(&[A::NAME])
1296 .unwrap()
1297 .get(),
1298 0
1299 );
1300 assert_eq!(
1301 indexer_metrics
1302 .total_watermarks_out_of_order
1303 .get_metric_with_label_values(&[B::NAME])
1304 .unwrap()
1305 .get(),
1306 5
1307 );
1308 assert_eq!(
1309 indexer_metrics
1310 .total_watermarks_out_of_order
1311 .get_metric_with_label_values(&[C::NAME])
1312 .unwrap()
1313 .get(),
1314 10
1315 );
1316 assert_eq!(
1317 indexer_metrics
1318 .total_watermarks_out_of_order
1319 .get_metric_with_label_values(&[D::NAME])
1320 .unwrap()
1321 .get(),
1322 15
1323 );
1324 }
1325
1326 #[tokio::test]
1328 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1329 let registry = Registry::new();
1330 let store = MockStore::default();
1331
1332 test_pipeline!(A, "concurrent_a");
1333 test_pipeline!(B, "concurrent_b");
1334 test_pipeline!(C, "sequential_c");
1335 test_pipeline!(D, "sequential_d");
1336
1337 let mut conn = store.connect().await.unwrap();
1338 conn.set_committer_watermark(
1339 A::NAME,
1340 CommitterWatermark {
1341 checkpoint_hi_inclusive: 5,
1342 ..Default::default()
1343 },
1344 )
1345 .await
1346 .unwrap();
1347 conn.set_committer_watermark(
1348 B::NAME,
1349 CommitterWatermark {
1350 checkpoint_hi_inclusive: 10,
1351 ..Default::default()
1352 },
1353 )
1354 .await
1355 .unwrap();
1356 conn.set_committer_watermark(
1357 C::NAME,
1358 CommitterWatermark {
1359 checkpoint_hi_inclusive: 15,
1360 ..Default::default()
1361 },
1362 )
1363 .await
1364 .unwrap();
1365 conn.set_committer_watermark(
1366 D::NAME,
1367 CommitterWatermark {
1368 checkpoint_hi_inclusive: 20,
1369 ..Default::default()
1370 },
1371 )
1372 .await
1373 .unwrap();
1374
1375 let temp_dir = tempfile::tempdir().unwrap();
1377 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1378 ingestion_dir: temp_dir.path().to_owned(),
1379 starting_checkpoint: 0,
1380 num_checkpoints: 30,
1381 checkpoint_size: 1,
1382 })
1383 .await;
1384
1385 let indexer_args = IndexerArgs {
1386 first_checkpoint: Some(3),
1387 last_checkpoint: Some(29),
1388 ..Default::default()
1389 };
1390
1391 let client_args = ClientArgs {
1392 ingestion: IngestionClientArgs {
1393 local_ingestion_path: Some(temp_dir.path().to_owned()),
1394 ..Default::default()
1395 },
1396 ..Default::default()
1397 };
1398
1399 let ingestion_config = IngestionConfig::default();
1400
1401 let mut indexer = Indexer::new(
1402 store.clone(),
1403 indexer_args,
1404 client_args,
1405 ingestion_config,
1406 None,
1407 ®istry,
1408 )
1409 .await
1410 .unwrap();
1411
1412 indexer
1413 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1414 .await
1415 .unwrap();
1416 indexer
1417 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1418 .await
1419 .unwrap();
1420 indexer
1421 .sequential_pipeline::<C>(C, SequentialConfig::default())
1422 .await
1423 .unwrap();
1424 indexer
1425 .sequential_pipeline::<D>(D, SequentialConfig::default())
1426 .await
1427 .unwrap();
1428
1429 let ingestion_metrics = indexer.ingestion_metrics().clone();
1430 let metrics = indexer.indexer_metrics().clone();
1431 indexer.run().await.unwrap().join().await.unwrap();
1432
1433 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1434 assert_eq!(
1435 metrics
1436 .total_watermarks_out_of_order
1437 .get_metric_with_label_values(&[A::NAME])
1438 .unwrap()
1439 .get(),
1440 0
1441 );
1442 assert_eq!(
1443 metrics
1444 .total_watermarks_out_of_order
1445 .get_metric_with_label_values(&[B::NAME])
1446 .unwrap()
1447 .get(),
1448 5
1449 );
1450 assert_eq!(
1451 metrics
1452 .total_watermarks_out_of_order
1453 .get_metric_with_label_values(&[C::NAME])
1454 .unwrap()
1455 .get(),
1456 10
1457 );
1458 assert_eq!(
1459 metrics
1460 .total_watermarks_out_of_order
1461 .get_metric_with_label_values(&[D::NAME])
1462 .unwrap()
1463 .get(),
1464 15
1465 );
1466 }
1467
1468 #[tokio::test]
1470 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1471 let registry = Registry::new();
1472 let store = MockStore::default();
1473
1474 test_pipeline!(A, "concurrent_a");
1475 test_pipeline!(B, "concurrent_b");
1476 test_pipeline!(C, "sequential_c");
1477 test_pipeline!(D, "sequential_d");
1478
1479 let mut conn = store.connect().await.unwrap();
1480 conn.set_committer_watermark(
1481 B::NAME,
1482 CommitterWatermark {
1483 checkpoint_hi_inclusive: 10,
1484 ..Default::default()
1485 },
1486 )
1487 .await
1488 .unwrap();
1489 conn.set_committer_watermark(
1490 C::NAME,
1491 CommitterWatermark {
1492 checkpoint_hi_inclusive: 15,
1493 ..Default::default()
1494 },
1495 )
1496 .await
1497 .unwrap();
1498 conn.set_committer_watermark(
1499 D::NAME,
1500 CommitterWatermark {
1501 checkpoint_hi_inclusive: 20,
1502 ..Default::default()
1503 },
1504 )
1505 .await
1506 .unwrap();
1507
1508 let temp_dir = tempfile::tempdir().unwrap();
1510 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1511 ingestion_dir: temp_dir.path().to_owned(),
1512 starting_checkpoint: 0,
1513 num_checkpoints: 30,
1514 checkpoint_size: 1,
1515 })
1516 .await;
1517
1518 let indexer_args = IndexerArgs {
1519 last_checkpoint: Some(29),
1520 ..Default::default()
1521 };
1522
1523 let client_args = ClientArgs {
1524 ingestion: IngestionClientArgs {
1525 local_ingestion_path: Some(temp_dir.path().to_owned()),
1526 ..Default::default()
1527 },
1528 ..Default::default()
1529 };
1530
1531 let ingestion_config = IngestionConfig::default();
1532
1533 let mut indexer = Indexer::new(
1534 store.clone(),
1535 indexer_args,
1536 client_args,
1537 ingestion_config,
1538 None,
1539 ®istry,
1540 )
1541 .await
1542 .unwrap();
1543
1544 indexer
1545 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1546 .await
1547 .unwrap();
1548 indexer
1549 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1550 .await
1551 .unwrap();
1552 indexer
1553 .sequential_pipeline::<C>(C, SequentialConfig::default())
1554 .await
1555 .unwrap();
1556 indexer
1557 .sequential_pipeline::<D>(D, SequentialConfig::default())
1558 .await
1559 .unwrap();
1560
1561 let ingestion_metrics = indexer.ingestion_metrics().clone();
1562 let metrics = indexer.indexer_metrics().clone();
1563 indexer.run().await.unwrap().join().await.unwrap();
1564
1565 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1566 assert_eq!(
1567 metrics
1568 .total_watermarks_out_of_order
1569 .get_metric_with_label_values(&[A::NAME])
1570 .unwrap()
1571 .get(),
1572 0
1573 );
1574 assert_eq!(
1575 metrics
1576 .total_watermarks_out_of_order
1577 .get_metric_with_label_values(&[B::NAME])
1578 .unwrap()
1579 .get(),
1580 11
1581 );
1582 assert_eq!(
1583 metrics
1584 .total_watermarks_out_of_order
1585 .get_metric_with_label_values(&[C::NAME])
1586 .unwrap()
1587 .get(),
1588 16
1589 );
1590 assert_eq!(
1591 metrics
1592 .total_watermarks_out_of_order
1593 .get_metric_with_label_values(&[D::NAME])
1594 .unwrap()
1595 .get(),
1596 21
1597 );
1598 }
1599
1600 #[tokio::test]
1602 async fn test_indexer_ingestion_use_first_checkpoint() {
1603 let registry = Registry::new();
1604 let store = MockStore::default();
1605
1606 test_pipeline!(A, "concurrent_a");
1607 test_pipeline!(B, "concurrent_b");
1608 test_pipeline!(C, "sequential_c");
1609 test_pipeline!(D, "sequential_d");
1610
1611 let mut conn = store.connect().await.unwrap();
1612 conn.set_committer_watermark(
1613 B::NAME,
1614 CommitterWatermark {
1615 checkpoint_hi_inclusive: 10,
1616 ..Default::default()
1617 },
1618 )
1619 .await
1620 .unwrap();
1621 conn.set_committer_watermark(
1622 C::NAME,
1623 CommitterWatermark {
1624 checkpoint_hi_inclusive: 15,
1625 ..Default::default()
1626 },
1627 )
1628 .await
1629 .unwrap();
1630 conn.set_committer_watermark(
1631 D::NAME,
1632 CommitterWatermark {
1633 checkpoint_hi_inclusive: 20,
1634 ..Default::default()
1635 },
1636 )
1637 .await
1638 .unwrap();
1639
1640 let temp_dir = tempfile::tempdir().unwrap();
1642 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1643 ingestion_dir: temp_dir.path().to_owned(),
1644 starting_checkpoint: 0,
1645 num_checkpoints: 30,
1646 checkpoint_size: 1,
1647 })
1648 .await;
1649
1650 let indexer_args = IndexerArgs {
1651 first_checkpoint: Some(10),
1652 last_checkpoint: Some(29),
1653 ..Default::default()
1654 };
1655
1656 let client_args = ClientArgs {
1657 ingestion: IngestionClientArgs {
1658 local_ingestion_path: Some(temp_dir.path().to_owned()),
1659 ..Default::default()
1660 },
1661 ..Default::default()
1662 };
1663
1664 let ingestion_config = IngestionConfig::default();
1665
1666 let mut indexer = Indexer::new(
1667 store.clone(),
1668 indexer_args,
1669 client_args,
1670 ingestion_config,
1671 None,
1672 ®istry,
1673 )
1674 .await
1675 .unwrap();
1676
1677 indexer
1678 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1679 .await
1680 .unwrap();
1681 indexer
1682 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1683 .await
1684 .unwrap();
1685 indexer
1686 .sequential_pipeline::<C>(C, SequentialConfig::default())
1687 .await
1688 .unwrap();
1689 indexer
1690 .sequential_pipeline::<D>(D, SequentialConfig::default())
1691 .await
1692 .unwrap();
1693
1694 let ingestion_metrics = indexer.ingestion_metrics().clone();
1695 let metrics = indexer.indexer_metrics().clone();
1696 indexer.run().await.unwrap().join().await.unwrap();
1697
1698 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1699 assert_eq!(
1700 metrics
1701 .total_watermarks_out_of_order
1702 .get_metric_with_label_values(&[A::NAME])
1703 .unwrap()
1704 .get(),
1705 0
1706 );
1707 assert_eq!(
1708 metrics
1709 .total_watermarks_out_of_order
1710 .get_metric_with_label_values(&[B::NAME])
1711 .unwrap()
1712 .get(),
1713 1
1714 );
1715 assert_eq!(
1716 metrics
1717 .total_watermarks_out_of_order
1718 .get_metric_with_label_values(&[C::NAME])
1719 .unwrap()
1720 .get(),
1721 6
1722 );
1723 assert_eq!(
1724 metrics
1725 .total_watermarks_out_of_order
1726 .get_metric_with_label_values(&[D::NAME])
1727 .unwrap()
1728 .get(),
1729 11
1730 );
1731 }
1732
1733 #[tokio::test]
1734 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1735 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1736 assert_eq!(committer_watermark, None);
1737 assert_eq!(pruner_watermark, None);
1738 }
1739
1740 #[tokio::test]
1741 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1742 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1743 assert_eq!(committer_watermark, None);
1744 assert_eq!(pruner_watermark, None);
1745 }
1746
1747 #[tokio::test]
1748 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1749 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1750
1751 let committer_watermark = committer_watermark.unwrap();
1752 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1753
1754 let pruner_watermark = pruner_watermark.unwrap();
1755 assert_eq!(pruner_watermark.reader_lo, 1);
1756 assert_eq!(pruner_watermark.pruner_hi, 1);
1757 }
1758
1759 #[tokio::test]
1760 async fn test_init_watermark_sequential() {
1761 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1762
1763 let committer_watermark = committer_watermark.unwrap();
1764 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1765
1766 let pruner_watermark = pruner_watermark.unwrap();
1767 assert_eq!(pruner_watermark.reader_lo, 1);
1768 assert_eq!(pruner_watermark.pruner_hi, 1);
1769 }
1770
1771 #[tokio::test]
1772 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1773 let registry = Registry::new();
1774 let store = MockStore::default();
1775
1776 let mut conn = store.connect().await.unwrap();
1778
1779 conn.set_committer_watermark(
1781 MockHandler::NAME,
1782 CommitterWatermark {
1783 epoch_hi_inclusive: 0,
1784 checkpoint_hi_inclusive: 10,
1785 tx_hi: 20,
1786 timestamp_ms_hi_inclusive: 10000,
1787 },
1788 )
1789 .await
1790 .unwrap();
1791
1792 conn.set_committer_watermark(
1794 SequentialHandler::NAME,
1795 CommitterWatermark {
1796 epoch_hi_inclusive: 0,
1797 checkpoint_hi_inclusive: 5,
1798 tx_hi: 10,
1799 timestamp_ms_hi_inclusive: 5000,
1800 },
1801 )
1802 .await
1803 .unwrap();
1804
1805 let temp_dir = tempfile::tempdir().unwrap();
1807 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1808 ingestion_dir: temp_dir.path().to_owned(),
1809 starting_checkpoint: 0,
1810 num_checkpoints: 20,
1811 checkpoint_size: 2,
1812 })
1813 .await;
1814
1815 let indexer_args = IndexerArgs {
1816 first_checkpoint: None,
1817 last_checkpoint: Some(19),
1818 pipeline: vec![],
1819 ..Default::default()
1820 };
1821
1822 let client_args = ClientArgs {
1823 ingestion: IngestionClientArgs {
1824 local_ingestion_path: Some(temp_dir.path().to_owned()),
1825 ..Default::default()
1826 },
1827 ..Default::default()
1828 };
1829
1830 let ingestion_config = IngestionConfig::default();
1831
1832 let mut indexer = Indexer::new(
1833 store.clone(),
1834 indexer_args,
1835 client_args,
1836 ingestion_config,
1837 None,
1838 ®istry,
1839 )
1840 .await
1841 .unwrap();
1842
1843 indexer
1845 .sequential_pipeline(MockHandler, SequentialConfig::default())
1846 .await
1847 .unwrap();
1848
1849 assert_eq!(
1851 indexer.next_sequential_checkpoint(),
1852 Some(11),
1853 "next_sequential_checkpoint should be 11"
1854 );
1855
1856 indexer
1858 .sequential_pipeline(SequentialHandler, SequentialConfig::default())
1859 .await
1860 .unwrap();
1861
1862 assert_eq!(
1864 indexer.next_sequential_checkpoint(),
1865 Some(6),
1866 "next_sequential_checkpoint should still be 6"
1867 );
1868
1869 indexer.run().await.unwrap().join().await.unwrap();
1871
1872 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1874 let watermark2 = conn
1875 .committer_watermark(SequentialHandler::NAME)
1876 .await
1877 .unwrap();
1878
1879 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1880 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1881 }
1882
1883 #[tokio::test]
1887 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1888 let registry = Registry::new();
1889 let store = MockStore::default();
1890
1891 let mut conn = store.connect().await.unwrap();
1894 conn.set_committer_watermark(
1895 MockCheckpointSequenceNumberHandler::NAME,
1896 CommitterWatermark {
1897 checkpoint_hi_inclusive: 10,
1898 ..Default::default()
1899 },
1900 )
1901 .await
1902 .unwrap();
1903 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1904 .await
1905 .unwrap();
1906
1907 let indexer_args = IndexerArgs {
1910 first_checkpoint: Some(0),
1911 last_checkpoint: Some(15),
1912 task: TaskArgs::tasked("task".to_string(), 10),
1913 ..Default::default()
1914 };
1915 let temp_dir = tempfile::tempdir().unwrap();
1916 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1917 ingestion_dir: temp_dir.path().to_owned(),
1918 starting_checkpoint: 0,
1919 num_checkpoints: 16,
1920 checkpoint_size: 2,
1921 })
1922 .await;
1923
1924 let client_args = ClientArgs {
1925 ingestion: IngestionClientArgs {
1926 local_ingestion_path: Some(temp_dir.path().to_owned()),
1927 ..Default::default()
1928 },
1929 ..Default::default()
1930 };
1931
1932 let ingestion_config = IngestionConfig::default();
1933
1934 let mut tasked_indexer = Indexer::new(
1935 store.clone(),
1936 indexer_args,
1937 client_args,
1938 ingestion_config,
1939 None,
1940 ®istry,
1941 )
1942 .await
1943 .unwrap();
1944
1945 let _ = tasked_indexer
1946 .concurrent_pipeline(
1947 MockCheckpointSequenceNumberHandler,
1948 ConcurrentConfig::default(),
1949 )
1950 .await;
1951
1952 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1953 let metrics = tasked_indexer.indexer_metrics().clone();
1954
1955 tasked_indexer.run().await.unwrap().join().await.unwrap();
1956
1957 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1958 assert_eq!(
1959 metrics
1960 .total_collector_skipped_checkpoints
1961 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1962 .unwrap()
1963 .get(),
1964 7
1965 );
1966 let data = store
1967 .data
1968 .get(MockCheckpointSequenceNumberHandler::NAME)
1969 .unwrap();
1970 assert_eq!(data.len(), 9);
1971 for i in 0..7 {
1972 assert!(data.get(&i).is_none());
1973 }
1974 for i in 7..16 {
1975 assert!(data.get(&i).is_some());
1976 }
1977 }
1978
1979 #[tokio::test]
1981 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1982 let registry = Registry::new();
1983 let store = MockStore::default();
1984
1985 let mut conn = store.connect().await.unwrap();
1986 conn.set_committer_watermark(
1987 "test",
1988 CommitterWatermark {
1989 checkpoint_hi_inclusive: 10,
1990 ..Default::default()
1991 },
1992 )
1993 .await
1994 .unwrap();
1995 conn.set_reader_watermark("test", 5).await.unwrap();
1996
1997 let indexer_args = IndexerArgs {
2000 first_checkpoint: Some(9),
2001 last_checkpoint: Some(25),
2002 task: TaskArgs::tasked("task".to_string(), 10),
2003 ..Default::default()
2004 };
2005 let temp_dir = tempfile::tempdir().unwrap();
2006 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2007 ingestion_dir: temp_dir.path().to_owned(),
2008 starting_checkpoint: 0,
2009 num_checkpoints: 26,
2010 checkpoint_size: 2,
2011 })
2012 .await;
2013
2014 let client_args = ClientArgs {
2015 ingestion: IngestionClientArgs {
2016 local_ingestion_path: Some(temp_dir.path().to_owned()),
2017 ..Default::default()
2018 },
2019 ..Default::default()
2020 };
2021
2022 let ingestion_config = IngestionConfig::default();
2023
2024 let mut tasked_indexer = Indexer::new(
2025 store.clone(),
2026 indexer_args,
2027 client_args,
2028 ingestion_config,
2029 None,
2030 ®istry,
2031 )
2032 .await
2033 .unwrap();
2034
2035 let _ = tasked_indexer
2036 .concurrent_pipeline(
2037 MockCheckpointSequenceNumberHandler,
2038 ConcurrentConfig::default(),
2039 )
2040 .await;
2041
2042 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2043 let metrics = tasked_indexer.indexer_metrics().clone();
2044
2045 tasked_indexer.run().await.unwrap().join().await.unwrap();
2046
2047 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2048 assert_eq!(
2049 metrics
2050 .total_watermarks_out_of_order
2051 .get_metric_with_label_values(&["test"])
2052 .unwrap()
2053 .get(),
2054 0
2055 );
2056 assert_eq!(
2057 metrics
2058 .total_collector_skipped_checkpoints
2059 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2060 .unwrap()
2061 .get(),
2062 0
2063 );
2064
2065 let data = store.data.get("test").unwrap();
2066 assert_eq!(data.len(), 17);
2067 for i in 0..9 {
2068 assert!(data.get(&i).is_none());
2069 }
2070 for i in 9..26 {
2071 assert!(data.get(&i).is_some());
2072 }
2073 let main_pipeline_watermark = store.watermark("test").unwrap();
2074 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
2076 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2077 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2078 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
2079 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2080 }
2081
2082 #[tokio::test]
2085 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2086 let registry = Registry::new();
2087 let store = MockStore::default();
2088 let mut conn = store.connect().await.unwrap();
2089 conn.set_committer_watermark(
2091 ControllableHandler::NAME,
2092 CommitterWatermark {
2093 checkpoint_hi_inclusive: 11,
2094 ..Default::default()
2095 },
2096 )
2097 .await
2098 .unwrap();
2099
2100 let temp_dir = tempfile::tempdir().unwrap();
2102 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2103 ingestion_dir: temp_dir.path().to_owned(),
2104 starting_checkpoint: 0,
2105 num_checkpoints: 501,
2106 checkpoint_size: 2,
2107 })
2108 .await;
2109 let indexer_args = IndexerArgs {
2110 first_checkpoint: Some(0),
2111 last_checkpoint: Some(500),
2112 task: TaskArgs::tasked("task".to_string(), 10 ),
2113 ..Default::default()
2114 };
2115 let client_args = ClientArgs {
2116 ingestion: IngestionClientArgs {
2117 local_ingestion_path: Some(temp_dir.path().to_owned()),
2118 ..Default::default()
2119 },
2120 ..Default::default()
2121 };
2122 let ingestion_config = IngestionConfig::default();
2123 let mut tasked_indexer = Indexer::new(
2124 store.clone(),
2125 indexer_args,
2126 client_args,
2127 ingestion_config,
2128 None,
2129 ®istry,
2130 )
2131 .await
2132 .unwrap();
2133 let mut allow_process = 10;
2134 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2136 let _ = tasked_indexer
2137 .concurrent_pipeline(
2138 controllable_handler,
2139 ConcurrentConfig {
2140 committer: CommitterConfig {
2141 collect_interval_ms: 10,
2142 watermark_interval_ms: 10,
2143 ..Default::default()
2144 },
2145 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
2148 ..Default::default()
2149 },
2150 )
2151 .await;
2152 let metrics = tasked_indexer.indexer_metrics().clone();
2153
2154 let mut s_indexer = tasked_indexer.run().await.unwrap();
2155
2156 store
2160 .wait_for_watermark(
2161 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2162 10,
2163 Duration::from_secs(10),
2164 )
2165 .await;
2166
2167 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2172 .await
2173 .unwrap();
2174
2175 let reader_lo = metrics
2176 .collector_reader_lo
2177 .with_label_values(&[ControllableHandler::NAME]);
2178
2179 let mut interval = tokio::time::interval(Duration::from_millis(10));
2183 while reader_lo.get() != 250 {
2184 interval.tick().await;
2185 allow_process += 1;
2187 assert!(
2188 allow_process <= 500,
2189 "Released all checkpoints but collector never observed new reader_lo"
2190 );
2191 process_below.send(allow_process).ok();
2192 }
2193
2194 process_below.send(500).ok();
2201
2202 s_indexer.join().await.unwrap();
2203
2204 let data = store.data.get(ControllableHandler::NAME).unwrap();
2205
2206 for chkpt in (allow_process + 1)..250 {
2208 assert!(
2209 data.get(&chkpt).is_none(),
2210 "Checkpoint {chkpt} should have been skipped"
2211 );
2212 }
2213
2214 for chkpt in 250..=500 {
2216 assert!(
2217 data.get(&chkpt).is_some(),
2218 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2219 );
2220 }
2221
2222 for chkpt in 0..=10 {
2224 assert!(
2225 data.get(&chkpt).is_some(),
2226 "Checkpoint {chkpt} should have been committed (baseline)"
2227 );
2228 }
2229 }
2230}