1use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod ingestion;
42pub mod metrics;
43pub mod pipeline;
44#[cfg(feature = "postgres")]
45pub mod postgres;
46
47#[cfg(test)]
48pub mod mocks;
49
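/// Command-line arguments controlling the range of checkpoints to ingest, which pipelines to
/// enable, and an optional task to run the indexer under.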
50#[derive(clap::Args, Default, Debug, Clone)]
52pub struct IndexerArgs {
53 #[arg(long)]
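    /// Override for the checkpoint to start ingestion from. Pipelines that already have a
    /// committer watermark resume from that watermark instead; pipelines without one start from
    /// this checkpoint (defaults to 0).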
60 pub first_checkpoint: Option<u64>,
61
62 #[arg(long)]
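    /// Override for the last checkpoint to ingest (inclusive). If omitted, ingestion continues
    /// indefinitely.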
65 pub last_checkpoint: Option<u64>,
66
67 #[arg(long, action = clap::ArgAction::Append)]
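    /// Names of pipelines to enable. If none are supplied, every added pipeline is enabled. May
    /// be passed multiple times.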
70 pub pipeline: Vec<String>,
71
72 #[clap(flatten)]
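    /// Optional task settings. When set, the indexer's pipelines run under a named task whose
    /// watermarks are tracked separately from the main pipelines'.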
74 pub task: TaskArgs,
75}
76
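/// Optional arguments for running the indexer's pipelines under a named task. Both arguments
/// must be supplied together.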
77#[derive(clap::Parser, Default, Debug, Clone)]
79pub struct TaskArgs {
80 #[arg(long, requires = "reader_interval_ms")]
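    /// Name of the task to run pipelines under. Requires `--reader-interval-ms`.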
92 task: Option<String>,
93
94 #[arg(long, requires = "task")]
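    /// How often, in milliseconds, tasked pipelines re-read the main pipeline's reader
    /// watermark. Requires `--task`.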
105 reader_interval_ms: Option<u64>,
106}
107
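/// An indexer reads checkpoints from an ingestion source and feeds them to the pipelines
/// registered against it, tracking a watermark per pipeline in its store.
///
/// A rough usage sketch: every name other than this crate's types is a placeholder, and the
/// block is not compiled as a doc-test.
///
/// ```ignore
/// let mut indexer = Indexer::new(store, indexer_args, client_args, ingestion_config, None, &registry).await?;
/// indexer.concurrent_pipeline(MyHandler, ConcurrentConfig::default()).await?;
/// indexer.run().await?.join().await?;
/// ```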
108pub struct Indexer<S: Store> {
109 store: S,
113
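    /// Metrics shared by all of this indexer's pipelines.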
114 metrics: Arc<IndexerMetrics>,
116
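    /// Service that fetches checkpoints and broadcasts them to subscribed pipelines.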
117 ingestion_service: IngestionService,
119
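    /// Checkpoint that pipelines without a committer watermark start from (derived from
    /// `--first-checkpoint`, defaulting to 0).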
120 default_next_checkpoint: u64,
127
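    /// Optional inclusive upper bound on ingestion, from `--last-checkpoint`.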
128 last_checkpoint: Option<u64>,
131
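    /// Optional task that this indexer's pipelines run under.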
132 task: Option<Task>,
144
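    /// Pipelines explicitly enabled on the command line (`None` means all added pipelines run).
    /// Names are removed from this set as pipelines are added, so anything left over when the
    /// indexer runs is a pipeline it does not know about.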
145 enabled_pipelines: Option<BTreeSet<String>>,
149
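    /// Names of the pipelines added so far, used to reject duplicates.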
150 added_pipelines: BTreeSet<&'static str>,
153
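    /// Lowest checkpoint that any enabled pipeline still needs; ingestion starts here. Starts at
    /// `u64::MAX` and is lowered as pipelines are added.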
154 first_ingestion_checkpoint: u64,
158
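    /// Smallest next checkpoint across the sequential pipelines added so far, if any.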
159 next_sequential_checkpoint: Option<u64>,
162
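    /// Services for the pipelines added so far, merged into the ingestion service when the
    /// indexer runs.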
163 pipelines: Vec<Service>,
165}
166
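/// A named task, along with how often its pipelines re-read the main pipeline's reader
/// watermark.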
167#[derive(Clone)]
169pub(crate) struct Task {
170 task: String,
173 reader_interval: Duration,
176}
177
178impl TaskArgs {
179 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
180 Self {
181 task: Some(task),
182 reader_interval_ms: Some(reader_interval_ms),
183 }
184 }
185
186 fn into_task(self) -> Option<Task> {
187 Some(Task {
188 task: self.task?,
189 reader_interval: Duration::from_millis(self.reader_interval_ms?),
190 })
191 }
192}
193
194impl<S: Store> Indexer<S> {
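    /// Create a new indexer writing to `store`. `indexer_args` selects the checkpoint range,
    /// enabled pipelines, and optional task; metrics are registered with `registry`, optionally
    /// under `metrics_prefix`. Pipelines are added separately, before calling [`Self::run`].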
195 pub async fn new(
207 store: S,
208 indexer_args: IndexerArgs,
209 client_args: ClientArgs,
210 ingestion_config: IngestionConfig,
211 metrics_prefix: Option<&str>,
212 registry: &Registry,
213 ) -> Result<Self> {
214 let IndexerArgs {
215 first_checkpoint,
216 last_checkpoint,
217 pipeline,
218 task,
219 } = indexer_args;
220
221 let metrics = IndexerMetrics::new(metrics_prefix, registry);
222
223 let ingestion_service =
224 IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
225
226 Ok(Self {
227 store,
228 metrics,
229 ingestion_service,
230 default_next_checkpoint: first_checkpoint.unwrap_or_default(),
231 last_checkpoint,
232 task: task.into_task(),
233 enabled_pipelines: if pipeline.is_empty() {
234 None
235 } else {
236 Some(pipeline.into_iter().collect())
237 },
238 added_pipelines: BTreeSet::new(),
239 first_ingestion_checkpoint: u64::MAX,
240 next_sequential_checkpoint: None,
241 pipelines: vec![],
242 })
243 }
244
245 pub fn store(&self) -> &S {
247 &self.store
248 }
249
250 pub fn ingestion_client(&self) -> &IngestionClient {
252 self.ingestion_service.ingestion_client()
253 }
254
255 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
257 &self.metrics
258 }
259
260 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
262 self.ingestion_service.metrics()
263 }
264
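    /// Names of the added pipelines that are enabled (all of them if no `--pipeline` filter was
    /// given).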
265 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
267 self.added_pipelines.iter().copied().filter(|p| {
268 self.enabled_pipelines
269 .as_ref()
270 .is_none_or(|e| e.contains(*p))
271 })
272 }
273
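    /// The smallest next checkpoint across the sequential pipelines added so far, or `None` if
    /// none have been added.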
274 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
278 self.next_sequential_checkpoint
279 }
280
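    /// Add a concurrent pipeline driven by `handler`. If the pipeline is not enabled this is a
    /// no-op; otherwise it is set up to resume from just after its committer watermark, or from
    /// the configured first checkpoint if it has none.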
281 pub async fn concurrent_pipeline<H>(
288 &mut self,
289 handler: H,
290 config: ConcurrentConfig,
291 ) -> Result<()>
292 where
293 H: concurrent::Handler<Store = S> + Send + Sync + 'static,
294 {
295 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
296 return Ok(());
297 };
298
299 self.pipelines.push(concurrent::pipeline::<H>(
300 handler,
301 next_checkpoint,
302 config,
303 self.store.clone(),
304 self.task.clone(),
305 self.ingestion_service.subscribe().0,
306 self.metrics.clone(),
307 ));
308
309 Ok(())
310 }
311
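    /// Consume the indexer and start the ingestion service along with every added pipeline,
    /// returning the combined service. Fails if a pipeline named with `--pipeline` was never
    /// added, or if the ingestion service fails to start.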
312 pub async fn run(self) -> Result<Service> {
318 if let Some(enabled_pipelines) = self.enabled_pipelines {
319 ensure!(
320 enabled_pipelines.is_empty(),
321 "Tried to enable pipelines that this indexer does not know about: \
322 {enabled_pipelines:#?}",
323 );
324 }
325
326 let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
327
328 info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
329
330 let mut service = self
331 .ingestion_service
332 .run(
333 self.first_ingestion_checkpoint..=last_checkpoint,
334 self.next_sequential_checkpoint,
335 )
336 .await
337 .context("Failed to start ingestion service")?;
338
339 for pipeline in self.pipelines {
340 service = service.merge(pipeline);
341 }
342
343 Ok(service)
344 }
345
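    /// Shared set-up for adding a pipeline: rejects duplicates, skips pipelines that are not
    /// enabled (returning `Ok(None)`), reads or initializes the pipeline's watermark, lowers
    /// `first_ingestion_checkpoint` accordingly, and returns the checkpoint the pipeline should
    /// process next.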
346 async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
355 ensure!(
356 self.added_pipelines.insert(P::NAME),
357 "Pipeline {:?} already added",
358 P::NAME,
359 );
360
361 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
362 && !enabled_pipelines.remove(P::NAME)
363 {
364 info!(pipeline = P::NAME, "Skipping");
365 return Ok(None);
366 }
367
368 let mut conn = self
369 .store
370 .connect()
371 .await
372 .context("Failed to establish connection to store")?;
373
374 let pipeline_task =
375 pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
376
377 let checkpoint_hi_inclusive = conn
378 .init_watermark(&pipeline_task, self.default_next_checkpoint)
379 .await
380 .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
381
382 let next_checkpoint =
383 checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
384
385 self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
386
387 Ok(Some(next_checkpoint))
388 }
389}
390
391impl<T: TransactionalStore> Indexer<T> {
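    /// Add a sequential pipeline driven by `handler`. Sequential pipelines commit each
    /// checkpoint exactly once and in order against a transactional store, so they cannot be
    /// combined with a pipeline task. The smallest next checkpoint across sequential pipelines
    /// is also recorded and handed to the ingestion service when the indexer runs.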
392 pub async fn sequential_pipeline<H>(
403 &mut self,
404 handler: H,
405 config: SequentialConfig,
406 ) -> Result<()>
407 where
408 H: Handler<Store = T> + Send + Sync + 'static,
409 {
410 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
411 return Ok(());
412 };
413
414 if self.task.is_some() {
415 bail!(
416 "Sequential pipelines do not support pipeline tasks. \
417 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
418 Running the same pipeline under a different task would violate these guarantees."
419 );
420 }
421
422 self.next_sequential_checkpoint = Some(
424 self.next_sequential_checkpoint
425 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
426 );
427
428 let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
429
430 self.pipelines.push(sequential::pipeline::<H>(
431 handler,
432 next_checkpoint,
433 config,
434 self.store.clone(),
435 checkpoint_rx,
436 watermark_tx,
437 self.metrics.clone(),
438 ));
439
440 Ok(())
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use std::sync::Arc;
447
448 use async_trait::async_trait;
449 use clap::Parser;
450 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
451 use sui_synthetic_ingestion::synthetic_ingestion;
452 use tokio::sync::watch;
453
454 use crate::FieldCount;
455 use crate::ingestion::ingestion_client::IngestionClientArgs;
456 use crate::mocks::store::MockStore;
457 use crate::pipeline::CommitterConfig;
458 use crate::pipeline::Processor;
459 use crate::pipeline::concurrent::ConcurrentConfig;
460 use crate::store::CommitterWatermark;
461
462 use super::*;
463
464 #[allow(dead_code)]
465 #[derive(Clone, FieldCount)]
466 struct MockValue(u64);
467
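    /// A handler whose `process` waits until the checkpoint's sequence number is at or below the
    /// limit published on a watch channel, letting tests control how far processing may advance.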
468 struct ControllableHandler {
470 process_below: watch::Receiver<u64>,
472 }
473
474 impl ControllableHandler {
475 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
476 let (tx, rx) = watch::channel(limit);
477 (Self { process_below: rx }, tx)
478 }
479 }
480
481 #[async_trait]
482 impl Processor for ControllableHandler {
483 const NAME: &'static str = "controllable";
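        // Very large fan-out so processing concurrency never limits these tests (501 is also the
        // number of checkpoints used in the tasked-pipeline test below).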
484 const FANOUT: usize = 501;
489 type Value = MockValue;
490
491 async fn process(
492 &self,
493 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
494 ) -> anyhow::Result<Vec<Self::Value>> {
495 let cp_num = checkpoint.summary.sequence_number;
496
497 self.process_below
499 .clone()
500 .wait_for(|&limit| cp_num <= limit)
501 .await
502 .ok();
503
504 Ok(vec![MockValue(cp_num)])
505 }
506 }
507
508 #[async_trait]
509 impl crate::pipeline::concurrent::Handler for ControllableHandler {
510 type Store = MockStore;
511 type Batch = Vec<MockValue>;
512
513 fn batch(
514 &self,
515 batch: &mut Self::Batch,
516 values: &mut std::vec::IntoIter<Self::Value>,
517 ) -> crate::pipeline::concurrent::BatchStatus {
518 batch.extend(values);
519 crate::pipeline::concurrent::BatchStatus::Ready
520 }
521
522 async fn commit<'a>(
523 &self,
524 batch: &Self::Batch,
525 conn: &mut <Self::Store as Store>::Connection<'a>,
526 ) -> anyhow::Result<usize> {
527 for value in batch {
528 conn.0
529 .commit_data(Self::NAME, value.0, vec![value.0])
530 .await?;
531 }
532 Ok(batch.len())
533 }
534 }
535
536 macro_rules! test_pipeline {
537 ($handler:ident, $name:literal) => {
538 struct $handler;
539
540 #[async_trait]
541 impl Processor for $handler {
542 const NAME: &'static str = $name;
543 type Value = MockValue;
544 async fn process(
545 &self,
546 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
547 ) -> anyhow::Result<Vec<Self::Value>> {
548 Ok(vec![MockValue(checkpoint.summary.sequence_number)])
549 }
550 }
551
552 #[async_trait]
553 impl crate::pipeline::concurrent::Handler for $handler {
554 type Store = MockStore;
555 type Batch = Vec<Self::Value>;
556
557 fn batch(
558 &self,
559 batch: &mut Self::Batch,
560 values: &mut std::vec::IntoIter<Self::Value>,
561 ) -> crate::pipeline::concurrent::BatchStatus {
562 batch.extend(values);
563 crate::pipeline::concurrent::BatchStatus::Pending
564 }
565
566 async fn commit<'a>(
567 &self,
568 batch: &Self::Batch,
569 conn: &mut <Self::Store as Store>::Connection<'a>,
570 ) -> anyhow::Result<usize> {
571 for value in batch {
572 conn.0
573 .commit_data(Self::NAME, value.0, vec![value.0])
574 .await?;
575 }
576 Ok(batch.len())
577 }
578 }
579
580 #[async_trait]
581 impl crate::pipeline::sequential::Handler for $handler {
582 type Store = MockStore;
583 type Batch = Vec<Self::Value>;
584
585 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
586 batch.extend(values);
587 }
588
589 async fn commit<'a>(
590 &self,
591 _batch: &Self::Batch,
592 _conn: &mut <Self::Store as Store>::Connection<'a>,
593 ) -> anyhow::Result<usize> {
594 Ok(1)
595 }
596 }
597 };
598 }
599
600 test_pipeline!(MockHandler, "test_processor");
601 test_pipeline!(SequentialHandler, "sequential_handler");
602 test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
603
604 async fn test_init_watermark(
605 first_checkpoint: Option<u64>,
606 is_concurrent: bool,
607 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
608 let registry = Registry::new();
609 let store = MockStore::default();
610
611 test_pipeline!(A, "pipeline_name");
612
613 let mut conn = store.connect().await.unwrap();
614
615 let indexer_args = IndexerArgs {
616 first_checkpoint,
617 ..IndexerArgs::default()
618 };
619 let temp_dir = tempfile::tempdir().unwrap();
620 let client_args = ClientArgs {
621 ingestion: IngestionClientArgs {
622 local_ingestion_path: Some(temp_dir.path().to_owned()),
623 ..Default::default()
624 },
625 ..Default::default()
626 };
627 let ingestion_config = IngestionConfig::default();
628
629 let mut indexer = Indexer::new(
630 store.clone(),
631 indexer_args,
632 client_args,
633 ingestion_config,
634 None,
635 &registry,

636 )
637 .await
638 .unwrap();
639
640 if is_concurrent {
641 indexer
642 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
643 .await
644 .unwrap();
645 } else {
646 indexer
647 .sequential_pipeline::<A>(A, SequentialConfig::default())
648 .await
649 .unwrap();
650 }
651
652 (
653 conn.committer_watermark(A::NAME).await.unwrap(),
654 conn.pruner_watermark(A::NAME, Duration::ZERO)
655 .await
656 .unwrap(),
657 )
658 }
659
660 #[test]
661 fn test_arg_parsing() {
662 #[derive(Parser)]
663 struct Args {
664 #[clap(flatten)]
665 indexer: IndexerArgs,
666 }
667
668 let args = Args::try_parse_from([
669 "cmd",
670 "--first-checkpoint",
671 "10",
672 "--last-checkpoint",
673 "100",
674 "--pipeline",
675 "a",
676 "--pipeline",
677 "b",
678 "--task",
679 "t",
680 "--reader-interval-ms",
681 "5000",
682 ])
683 .unwrap();
684
685 assert_eq!(args.indexer.first_checkpoint, Some(10));
686 assert_eq!(args.indexer.last_checkpoint, Some(100));
687 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
688 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
689 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
690 }
691
692 #[tokio::test]
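    // Every pipeline has a committer watermark, so ingestion starts just after the smallest one
    // (C's 1).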
694 async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
695 let registry = Registry::new();
696 let store = MockStore::default();
697
698 test_pipeline!(A, "concurrent_a");
699 test_pipeline!(B, "concurrent_b");
700 test_pipeline!(C, "sequential_c");
701 test_pipeline!(D, "sequential_d");
702
703 let mut conn = store.connect().await.unwrap();
704 conn.set_committer_watermark(
705 A::NAME,
706 CommitterWatermark {
707 checkpoint_hi_inclusive: 100,
708 ..Default::default()
709 },
710 )
711 .await
712 .unwrap();
713 conn.set_committer_watermark(
714 B::NAME,
715 CommitterWatermark {
716 checkpoint_hi_inclusive: 10,
717 ..Default::default()
718 },
719 )
720 .await
721 .unwrap();
722 conn.set_committer_watermark(
723 C::NAME,
724 CommitterWatermark {
725 checkpoint_hi_inclusive: 1,
726 ..Default::default()
727 },
728 )
729 .await
730 .unwrap();
731 conn.set_committer_watermark(
732 D::NAME,
733 CommitterWatermark {
734 checkpoint_hi_inclusive: 50,
735 ..Default::default()
736 },
737 )
738 .await
739 .unwrap();
740
741 let indexer_args = IndexerArgs::default();
742 let temp_dir = tempfile::tempdir().unwrap();
743 let client_args = ClientArgs {
744 ingestion: IngestionClientArgs {
745 local_ingestion_path: Some(temp_dir.path().to_owned()),
746 ..Default::default()
747 },
748 ..Default::default()
749 };
750 let ingestion_config = IngestionConfig::default();
751
752 let mut indexer = Indexer::new(
753 store,
754 indexer_args,
755 client_args,
756 ingestion_config,
757 None,
758 &registry,
759 )
760 .await
761 .unwrap();
762
763 indexer
764 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
765 .await
766 .unwrap();
767 indexer
768 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
769 .await
770 .unwrap();
771 indexer
772 .sequential_pipeline::<C>(C, SequentialConfig::default())
773 .await
774 .unwrap();
775 indexer
776 .sequential_pipeline::<D>(D, SequentialConfig::default())
777 .await
778 .unwrap();
779
780 assert_eq!(indexer.first_ingestion_checkpoint, 2);
781 }
782
783 #[tokio::test]
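    // Some pipelines have no watermark, so ingestion has to start from the default first
    // checkpoint (0).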
785 async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
786 let registry = Registry::new();
787 let store = MockStore::default();
788
789 test_pipeline!(A, "concurrent_a");
790 test_pipeline!(B, "concurrent_b");
791 test_pipeline!(C, "sequential_c");
792 test_pipeline!(D, "sequential_d");
793
794 let mut conn = store.connect().await.unwrap();
795 conn.set_committer_watermark(
796 B::NAME,
797 CommitterWatermark {
798 checkpoint_hi_inclusive: 10,
799 ..Default::default()
800 },
801 )
802 .await
803 .unwrap();
804 conn.set_committer_watermark(
805 C::NAME,
806 CommitterWatermark {
807 checkpoint_hi_inclusive: 1,
808 ..Default::default()
809 },
810 )
811 .await
812 .unwrap();
813
814 let indexer_args = IndexerArgs::default();
815 let temp_dir = tempfile::tempdir().unwrap();
816 let client_args = ClientArgs {
817 ingestion: IngestionClientArgs {
818 local_ingestion_path: Some(temp_dir.path().to_owned()),
819 ..Default::default()
820 },
821 ..Default::default()
822 };
823 let ingestion_config = IngestionConfig::default();
824
825 let mut indexer = Indexer::new(
826 store,
827 indexer_args,
828 client_args,
829 ingestion_config,
830 None,
831 &registry,
832 )
833 .await
834 .unwrap();
835
836 indexer
837 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
838 .await
839 .unwrap();
840 indexer
841 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
842 .await
843 .unwrap();
844 indexer
845 .sequential_pipeline::<C>(C, SequentialConfig::default())
846 .await
847 .unwrap();
848 indexer
849 .sequential_pipeline::<D>(D, SequentialConfig::default())
850 .await
851 .unwrap();
852
853 assert_eq!(indexer.first_ingestion_checkpoint, 0);
854 }
855
856 #[tokio::test]
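    // The smallest committer watermark is 0 (D's default), so ingestion starts at checkpoint 1.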
858 async fn test_first_ingestion_checkpoint_smallest_is_0() {
859 let registry = Registry::new();
860 let store = MockStore::default();
861
862 test_pipeline!(A, "concurrent_a");
863 test_pipeline!(B, "concurrent_b");
864 test_pipeline!(C, "sequential_c");
865 test_pipeline!(D, "sequential_d");
866
867 let mut conn = store.connect().await.unwrap();
868 conn.set_committer_watermark(
869 A::NAME,
870 CommitterWatermark {
871 checkpoint_hi_inclusive: 100,
872 ..Default::default()
873 },
874 )
875 .await
876 .unwrap();
877 conn.set_committer_watermark(
878 B::NAME,
879 CommitterWatermark {
880 checkpoint_hi_inclusive: 10,
881 ..Default::default()
882 },
883 )
884 .await
885 .unwrap();
886 conn.set_committer_watermark(
887 C::NAME,
888 CommitterWatermark {
889 checkpoint_hi_inclusive: 1,
890 ..Default::default()
891 },
892 )
893 .await
894 .unwrap();
895 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
896 .await
897 .unwrap();
898
899 let indexer_args = IndexerArgs::default();
900 let temp_dir = tempfile::tempdir().unwrap();
901 let client_args = ClientArgs {
902 ingestion: IngestionClientArgs {
903 local_ingestion_path: Some(temp_dir.path().to_owned()),
904 ..Default::default()
905 },
906 ..Default::default()
907 };
908 let ingestion_config = IngestionConfig::default();
909
910 let mut indexer = Indexer::new(
911 store,
912 indexer_args,
913 client_args,
914 ingestion_config,
915 None,
916 &registry,
917 )
918 .await
919 .unwrap();
920
921 indexer
922 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
923 .await
924 .unwrap();
925 indexer
926 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
927 .await
928 .unwrap();
929 indexer
930 .sequential_pipeline::<C>(C, SequentialConfig::default())
931 .await
932 .unwrap();
933 indexer
934 .sequential_pipeline::<D>(D, SequentialConfig::default())
935 .await
936 .unwrap();
937
938 assert_eq!(indexer.first_ingestion_checkpoint, 1);
939 }
940
941 #[tokio::test]
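    // Pipelines without a watermark start from --first-checkpoint (5), which is below every
    // resumed pipeline, so ingestion starts at 5.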
944 async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
945 let registry = Registry::new();
946 let store = MockStore::default();
947
948 test_pipeline!(A, "concurrent_a");
949 test_pipeline!(B, "concurrent_b");
950 test_pipeline!(C, "sequential_c");
951 test_pipeline!(D, "sequential_d");
952
953 let mut conn = store.connect().await.unwrap();
954 conn.set_committer_watermark(
955 B::NAME,
956 CommitterWatermark {
957 checkpoint_hi_inclusive: 50,
958 ..Default::default()
959 },
960 )
961 .await
962 .unwrap();
963 conn.set_committer_watermark(
964 C::NAME,
965 CommitterWatermark {
966 checkpoint_hi_inclusive: 10,
967 ..Default::default()
968 },
969 )
970 .await
971 .unwrap();
972
973 let indexer_args = IndexerArgs {
974 first_checkpoint: Some(5),
975 ..Default::default()
976 };
977 let temp_dir = tempfile::tempdir().unwrap();
978 let client_args = ClientArgs {
979 ingestion: IngestionClientArgs {
980 local_ingestion_path: Some(temp_dir.path().to_owned()),
981 ..Default::default()
982 },
983 ..Default::default()
984 };
985 let ingestion_config = IngestionConfig::default();
986
987 let mut indexer = Indexer::new(
988 store,
989 indexer_args,
990 client_args,
991 ingestion_config,
992 None,
993 &registry,
994 )
995 .await
996 .unwrap();
997
998 indexer
999 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1000 .await
1001 .unwrap();
1002 indexer
1003 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1004 .await
1005 .unwrap();
1006 indexer
1007 .sequential_pipeline::<C>(C, SequentialConfig::default())
1008 .await
1009 .unwrap();
1010 indexer
1011 .sequential_pipeline::<D>(D, SequentialConfig::default())
1012 .await
1013 .unwrap();
1014
1015 assert_eq!(indexer.first_ingestion_checkpoint, 5);
1016 }
1017
1018 #[tokio::test]
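    // Every pipeline has a watermark, so --first-checkpoint is ignored and ingestion resumes
    // just after the smallest watermark (C's 10).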
1021 async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1022 let registry = Registry::new();
1023 let store = MockStore::default();
1024
1025 test_pipeline!(B, "concurrent_b");
1026 test_pipeline!(C, "sequential_c");
1027
1028 let mut conn = store.connect().await.unwrap();
1029 conn.set_committer_watermark(
1030 B::NAME,
1031 CommitterWatermark {
1032 checkpoint_hi_inclusive: 50,
1033 ..Default::default()
1034 },
1035 )
1036 .await
1037 .unwrap();
1038 conn.set_committer_watermark(
1039 C::NAME,
1040 CommitterWatermark {
1041 checkpoint_hi_inclusive: 10,
1042 ..Default::default()
1043 },
1044 )
1045 .await
1046 .unwrap();
1047
1048 let indexer_args = IndexerArgs {
1049 first_checkpoint: Some(5),
1050 ..Default::default()
1051 };
1052 let temp_dir = tempfile::tempdir().unwrap();
1053 let client_args = ClientArgs {
1054 ingestion: IngestionClientArgs {
1055 local_ingestion_path: Some(temp_dir.path().to_owned()),
1056 ..Default::default()
1057 },
1058 ..Default::default()
1059 };
1060 let ingestion_config = IngestionConfig::default();
1061
1062 let mut indexer = Indexer::new(
1063 store,
1064 indexer_args,
1065 client_args,
1066 ingestion_config,
1067 None,
1068 &registry,
1069 )
1070 .await
1071 .unwrap();
1072
1073 indexer
1074 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1075 .await
1076 .unwrap();
1077 indexer
1078 .sequential_pipeline::<C>(C, SequentialConfig::default())
1079 .await
1080 .unwrap();
1081
1082 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1083 }
1084
1085 #[tokio::test]
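    // A has no watermark and would start at --first-checkpoint (24), but C resumes from 11, so
    // ingestion still starts at 11.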
1089 async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1090 let registry = Registry::new();
1091 let store = MockStore::default();
1092
1093 test_pipeline!(A, "concurrent_a");
1094 test_pipeline!(B, "concurrent_b");
1095 test_pipeline!(C, "sequential_c");
1096
1097 let mut conn = store.connect().await.unwrap();
1098 conn.set_committer_watermark(
1099 B::NAME,
1100 CommitterWatermark {
1101 checkpoint_hi_inclusive: 50,
1102 ..Default::default()
1103 },
1104 )
1105 .await
1106 .unwrap();
1107 conn.set_committer_watermark(
1108 C::NAME,
1109 CommitterWatermark {
1110 checkpoint_hi_inclusive: 10,
1111 ..Default::default()
1112 },
1113 )
1114 .await
1115 .unwrap();
1116
1117 let indexer_args = IndexerArgs {
1118 first_checkpoint: Some(24),
1119 ..Default::default()
1120 };
1121 let temp_dir = tempfile::tempdir().unwrap();
1122 let client_args = ClientArgs {
1123 ingestion: IngestionClientArgs {
1124 local_ingestion_path: Some(temp_dir.path().to_owned()),
1125 ..Default::default()
1126 },
1127 ..Default::default()
1128 };
1129 let ingestion_config = IngestionConfig::default();
1130
1131 let mut indexer = Indexer::new(
1132 store,
1133 indexer_args,
1134 client_args,
1135 ingestion_config,
1136 None,
1137 &registry,
1138 )
1139 .await
1140 .unwrap();
1141
1142 indexer
1143 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1144 .await
1145 .unwrap();
1146
1147 indexer
1148 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1149 .await
1150 .unwrap();
1151
1152 indexer
1153 .sequential_pipeline::<C>(C, SequentialConfig::default())
1154 .await
1155 .unwrap();
1156
1157 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1158 }
1159
1160 #[tokio::test]
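    // All pipelines resume from their watermarks; ingestion starts after the smallest (A's 5)
    // and runs to 29, so 24 checkpoints are ingested.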
1162 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1163 let registry = Registry::new();
1164 let store = MockStore::default();
1165
1166 test_pipeline!(A, "concurrent_a");
1167 test_pipeline!(B, "concurrent_b");
1168 test_pipeline!(C, "sequential_c");
1169 test_pipeline!(D, "sequential_d");
1170
1171 let mut conn = store.connect().await.unwrap();
1172 conn.set_committer_watermark(
1173 A::NAME,
1174 CommitterWatermark {
1175 checkpoint_hi_inclusive: 5,
1176 ..Default::default()
1177 },
1178 )
1179 .await
1180 .unwrap();
1181 conn.set_committer_watermark(
1182 B::NAME,
1183 CommitterWatermark {
1184 checkpoint_hi_inclusive: 10,
1185 ..Default::default()
1186 },
1187 )
1188 .await
1189 .unwrap();
1190 conn.set_committer_watermark(
1191 C::NAME,
1192 CommitterWatermark {
1193 checkpoint_hi_inclusive: 15,
1194 ..Default::default()
1195 },
1196 )
1197 .await
1198 .unwrap();
1199 conn.set_committer_watermark(
1200 D::NAME,
1201 CommitterWatermark {
1202 checkpoint_hi_inclusive: 20,
1203 ..Default::default()
1204 },
1205 )
1206 .await
1207 .unwrap();
1208
1209 let temp_dir = tempfile::tempdir().unwrap();
1211 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1212 ingestion_dir: temp_dir.path().to_owned(),
1213 starting_checkpoint: 5,
1214 num_checkpoints: 25,
1215 checkpoint_size: 1,
1216 })
1217 .await;
1218
1219 let indexer_args = IndexerArgs {
1220 last_checkpoint: Some(29),
1221 ..Default::default()
1222 };
1223
1224 let client_args = ClientArgs {
1225 ingestion: IngestionClientArgs {
1226 local_ingestion_path: Some(temp_dir.path().to_owned()),
1227 ..Default::default()
1228 },
1229 ..Default::default()
1230 };
1231
1232 let ingestion_config = IngestionConfig::default();
1233
1234 let mut indexer = Indexer::new(
1235 store.clone(),
1236 indexer_args,
1237 client_args,
1238 ingestion_config,
1239 None,
1240 &registry,
1241 )
1242 .await
1243 .unwrap();
1244
1245 indexer
1246 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1247 .await
1248 .unwrap();
1249 indexer
1250 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1251 .await
1252 .unwrap();
1253 indexer
1254 .sequential_pipeline::<C>(C, SequentialConfig::default())
1255 .await
1256 .unwrap();
1257 indexer
1258 .sequential_pipeline::<D>(D, SequentialConfig::default())
1259 .await
1260 .unwrap();
1261
1262 let ingestion_metrics = indexer.ingestion_metrics().clone();
1263 let indexer_metrics = indexer.indexer_metrics().clone();
1264
1265 indexer.run().await.unwrap().join().await.unwrap();
1266
1267 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1268 assert_eq!(
1269 indexer_metrics
1270 .total_watermarks_out_of_order
1271 .get_metric_with_label_values(&[A::NAME])
1272 .unwrap()
1273 .get(),
1274 0
1275 );
1276 assert_eq!(
1277 indexer_metrics
1278 .total_watermarks_out_of_order
1279 .get_metric_with_label_values(&[B::NAME])
1280 .unwrap()
1281 .get(),
1282 5
1283 );
1284 assert_eq!(
1285 indexer_metrics
1286 .total_watermarks_out_of_order
1287 .get_metric_with_label_values(&[C::NAME])
1288 .unwrap()
1289 .get(),
1290 10
1291 );
1292 assert_eq!(
1293 indexer_metrics
1294 .total_watermarks_out_of_order
1295 .get_metric_with_label_values(&[D::NAME])
1296 .unwrap()
1297 .get(),
1298 15
1299 );
1300 }
1301
1302 #[tokio::test]
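    // Same as above, except --first-checkpoint (3) is ignored because every pipeline already has
    // a watermark.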
1304 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1305 let registry = Registry::new();
1306 let store = MockStore::default();
1307
1308 test_pipeline!(A, "concurrent_a");
1309 test_pipeline!(B, "concurrent_b");
1310 test_pipeline!(C, "sequential_c");
1311 test_pipeline!(D, "sequential_d");
1312
1313 let mut conn = store.connect().await.unwrap();
1314 conn.set_committer_watermark(
1315 A::NAME,
1316 CommitterWatermark {
1317 checkpoint_hi_inclusive: 5,
1318 ..Default::default()
1319 },
1320 )
1321 .await
1322 .unwrap();
1323 conn.set_committer_watermark(
1324 B::NAME,
1325 CommitterWatermark {
1326 checkpoint_hi_inclusive: 10,
1327 ..Default::default()
1328 },
1329 )
1330 .await
1331 .unwrap();
1332 conn.set_committer_watermark(
1333 C::NAME,
1334 CommitterWatermark {
1335 checkpoint_hi_inclusive: 15,
1336 ..Default::default()
1337 },
1338 )
1339 .await
1340 .unwrap();
1341 conn.set_committer_watermark(
1342 D::NAME,
1343 CommitterWatermark {
1344 checkpoint_hi_inclusive: 20,
1345 ..Default::default()
1346 },
1347 )
1348 .await
1349 .unwrap();
1350
1351 let temp_dir = tempfile::tempdir().unwrap();
1353 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1354 ingestion_dir: temp_dir.path().to_owned(),
1355 starting_checkpoint: 5,
1356 num_checkpoints: 25,
1357 checkpoint_size: 1,
1358 })
1359 .await;
1360
1361 let indexer_args = IndexerArgs {
1362 first_checkpoint: Some(3),
1363 last_checkpoint: Some(29),
1364 ..Default::default()
1365 };
1366
1367 let client_args = ClientArgs {
1368 ingestion: IngestionClientArgs {
1369 local_ingestion_path: Some(temp_dir.path().to_owned()),
1370 ..Default::default()
1371 },
1372 ..Default::default()
1373 };
1374
1375 let ingestion_config = IngestionConfig::default();
1376
1377 let mut indexer = Indexer::new(
1378 store.clone(),
1379 indexer_args,
1380 client_args,
1381 ingestion_config,
1382 None,
1383 &registry,
1384 )
1385 .await
1386 .unwrap();
1387
1388 indexer
1389 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1390 .await
1391 .unwrap();
1392 indexer
1393 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1394 .await
1395 .unwrap();
1396 indexer
1397 .sequential_pipeline::<C>(C, SequentialConfig::default())
1398 .await
1399 .unwrap();
1400 indexer
1401 .sequential_pipeline::<D>(D, SequentialConfig::default())
1402 .await
1403 .unwrap();
1404
1405 let ingestion_metrics = indexer.ingestion_metrics().clone();
1406 let metrics = indexer.indexer_metrics().clone();
1407 indexer.run().await.unwrap().join().await.unwrap();
1408
1409 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1410 assert_eq!(
1411 metrics
1412 .total_watermarks_out_of_order
1413 .get_metric_with_label_values(&[A::NAME])
1414 .unwrap()
1415 .get(),
1416 0
1417 );
1418 assert_eq!(
1419 metrics
1420 .total_watermarks_out_of_order
1421 .get_metric_with_label_values(&[B::NAME])
1422 .unwrap()
1423 .get(),
1424 5
1425 );
1426 assert_eq!(
1427 metrics
1428 .total_watermarks_out_of_order
1429 .get_metric_with_label_values(&[C::NAME])
1430 .unwrap()
1431 .get(),
1432 10
1433 );
1434 assert_eq!(
1435 metrics
1436 .total_watermarks_out_of_order
1437 .get_metric_with_label_values(&[D::NAME])
1438 .unwrap()
1439 .get(),
1440 15
1441 );
1442 }
1443
1444 #[tokio::test]
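    // A has no watermark, so ingestion starts from checkpoint 0 and all 30 checkpoints are
    // ingested.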
1446 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1447 let registry = Registry::new();
1448 let store = MockStore::default();
1449
1450 test_pipeline!(A, "concurrent_a");
1451 test_pipeline!(B, "concurrent_b");
1452 test_pipeline!(C, "sequential_c");
1453 test_pipeline!(D, "sequential_d");
1454
1455 let mut conn = store.connect().await.unwrap();
1456 conn.set_committer_watermark(
1457 B::NAME,
1458 CommitterWatermark {
1459 checkpoint_hi_inclusive: 10,
1460 ..Default::default()
1461 },
1462 )
1463 .await
1464 .unwrap();
1465 conn.set_committer_watermark(
1466 C::NAME,
1467 CommitterWatermark {
1468 checkpoint_hi_inclusive: 15,
1469 ..Default::default()
1470 },
1471 )
1472 .await
1473 .unwrap();
1474 conn.set_committer_watermark(
1475 D::NAME,
1476 CommitterWatermark {
1477 checkpoint_hi_inclusive: 20,
1478 ..Default::default()
1479 },
1480 )
1481 .await
1482 .unwrap();
1483
1484 let temp_dir = tempfile::tempdir().unwrap();
1486 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1487 ingestion_dir: temp_dir.path().to_owned(),
1488 starting_checkpoint: 0,
1489 num_checkpoints: 30,
1490 checkpoint_size: 1,
1491 })
1492 .await;
1493
1494 let indexer_args = IndexerArgs {
1495 last_checkpoint: Some(29),
1496 ..Default::default()
1497 };
1498
1499 let client_args = ClientArgs {
1500 ingestion: IngestionClientArgs {
1501 local_ingestion_path: Some(temp_dir.path().to_owned()),
1502 ..Default::default()
1503 },
1504 ..Default::default()
1505 };
1506
1507 let ingestion_config = IngestionConfig::default();
1508
1509 let mut indexer = Indexer::new(
1510 store.clone(),
1511 indexer_args,
1512 client_args,
1513 ingestion_config,
1514 None,
1515 &registry,
1516 )
1517 .await
1518 .unwrap();
1519
1520 indexer
1521 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1522 .await
1523 .unwrap();
1524 indexer
1525 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1526 .await
1527 .unwrap();
1528 indexer
1529 .sequential_pipeline::<C>(C, SequentialConfig::default())
1530 .await
1531 .unwrap();
1532 indexer
1533 .sequential_pipeline::<D>(D, SequentialConfig::default())
1534 .await
1535 .unwrap();
1536
1537 let ingestion_metrics = indexer.ingestion_metrics().clone();
1538 let metrics = indexer.indexer_metrics().clone();
1539 indexer.run().await.unwrap().join().await.unwrap();
1540
1541 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1542 assert_eq!(
1543 metrics
1544 .total_watermarks_out_of_order
1545 .get_metric_with_label_values(&[A::NAME])
1546 .unwrap()
1547 .get(),
1548 0
1549 );
1550 assert_eq!(
1551 metrics
1552 .total_watermarks_out_of_order
1553 .get_metric_with_label_values(&[B::NAME])
1554 .unwrap()
1555 .get(),
1556 11
1557 );
1558 assert_eq!(
1559 metrics
1560 .total_watermarks_out_of_order
1561 .get_metric_with_label_values(&[C::NAME])
1562 .unwrap()
1563 .get(),
1564 16
1565 );
1566 assert_eq!(
1567 metrics
1568 .total_watermarks_out_of_order
1569 .get_metric_with_label_values(&[D::NAME])
1570 .unwrap()
1571 .get(),
1572 21
1573 );
1574 }
1575
1576 #[tokio::test]
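    // A has no watermark and starts from --first-checkpoint (10), which is below B's resume
    // point (11), so ingestion starts at 10 and 20 checkpoints are ingested.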
1578 async fn test_indexer_ingestion_use_first_checkpoint() {
1579 let registry = Registry::new();
1580 let store = MockStore::default();
1581
1582 test_pipeline!(A, "concurrent_a");
1583 test_pipeline!(B, "concurrent_b");
1584 test_pipeline!(C, "sequential_c");
1585 test_pipeline!(D, "sequential_d");
1586
1587 let mut conn = store.connect().await.unwrap();
1588 conn.set_committer_watermark(
1589 B::NAME,
1590 CommitterWatermark {
1591 checkpoint_hi_inclusive: 10,
1592 ..Default::default()
1593 },
1594 )
1595 .await
1596 .unwrap();
1597 conn.set_committer_watermark(
1598 C::NAME,
1599 CommitterWatermark {
1600 checkpoint_hi_inclusive: 15,
1601 ..Default::default()
1602 },
1603 )
1604 .await
1605 .unwrap();
1606 conn.set_committer_watermark(
1607 D::NAME,
1608 CommitterWatermark {
1609 checkpoint_hi_inclusive: 20,
1610 ..Default::default()
1611 },
1612 )
1613 .await
1614 .unwrap();
1615
1616 let temp_dir = tempfile::tempdir().unwrap();
1618 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1619 ingestion_dir: temp_dir.path().to_owned(),
1620 starting_checkpoint: 5,
1621 num_checkpoints: 25,
1622 checkpoint_size: 1,
1623 })
1624 .await;
1625
1626 let indexer_args = IndexerArgs {
1627 first_checkpoint: Some(10),
1628 last_checkpoint: Some(29),
1629 ..Default::default()
1630 };
1631
1632 let client_args = ClientArgs {
1633 ingestion: IngestionClientArgs {
1634 local_ingestion_path: Some(temp_dir.path().to_owned()),
1635 ..Default::default()
1636 },
1637 ..Default::default()
1638 };
1639
1640 let ingestion_config = IngestionConfig::default();
1641
1642 let mut indexer = Indexer::new(
1643 store.clone(),
1644 indexer_args,
1645 client_args,
1646 ingestion_config,
1647 None,
1648 &registry,
1649 )
1650 .await
1651 .unwrap();
1652
1653 indexer
1654 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1655 .await
1656 .unwrap();
1657 indexer
1658 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1659 .await
1660 .unwrap();
1661 indexer
1662 .sequential_pipeline::<C>(C, SequentialConfig::default())
1663 .await
1664 .unwrap();
1665 indexer
1666 .sequential_pipeline::<D>(D, SequentialConfig::default())
1667 .await
1668 .unwrap();
1669
1670 let ingestion_metrics = indexer.ingestion_metrics().clone();
1671 let metrics = indexer.indexer_metrics().clone();
1672 indexer.run().await.unwrap().join().await.unwrap();
1673
1674 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1675 assert_eq!(
1676 metrics
1677 .total_watermarks_out_of_order
1678 .get_metric_with_label_values(&[A::NAME])
1679 .unwrap()
1680 .get(),
1681 0
1682 );
1683 assert_eq!(
1684 metrics
1685 .total_watermarks_out_of_order
1686 .get_metric_with_label_values(&[B::NAME])
1687 .unwrap()
1688 .get(),
1689 1
1690 );
1691 assert_eq!(
1692 metrics
1693 .total_watermarks_out_of_order
1694 .get_metric_with_label_values(&[C::NAME])
1695 .unwrap()
1696 .get(),
1697 6
1698 );
1699 assert_eq!(
1700 metrics
1701 .total_watermarks_out_of_order
1702 .get_metric_with_label_values(&[D::NAME])
1703 .unwrap()
1704 .get(),
1705 11
1706 );
1707 }
1708
1709 #[tokio::test]
1710 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1711 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1712 assert_eq!(committer_watermark, None);
1714 assert_eq!(pruner_watermark, None);
1715 }
1716
1717 #[tokio::test]
1718 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1719 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1720 assert_eq!(committer_watermark, None);
1722 assert_eq!(pruner_watermark, None);
1723 }
1724
1725 #[tokio::test]
1726 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1727 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1728
1729 let committer_watermark = committer_watermark.unwrap();
1730 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1731
1732 let pruner_watermark = pruner_watermark.unwrap();
1733 assert_eq!(pruner_watermark.reader_lo, 1);
1734 assert_eq!(pruner_watermark.pruner_hi, 1);
1735 }
1736
1737 #[tokio::test]
1738 async fn test_init_watermark_sequential() {
1739 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1740
1741 let committer_watermark = committer_watermark.unwrap();
1742 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1743
1744 let pruner_watermark = pruner_watermark.unwrap();
1745 assert_eq!(pruner_watermark.reader_lo, 1);
1746 assert_eq!(pruner_watermark.pruner_hi, 1);
1747 }
1748
1749 #[tokio::test]
1750 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1751 let registry = Registry::new();
1752 let store = MockStore::default();
1753
1754 let mut conn = store.connect().await.unwrap();
1756
1757 conn.set_committer_watermark(
1759 MockHandler::NAME,
1760 CommitterWatermark {
1761 epoch_hi_inclusive: 0,
1762 checkpoint_hi_inclusive: 10,
1763 tx_hi: 20,
1764 timestamp_ms_hi_inclusive: 10000,
1765 },
1766 )
1767 .await
1768 .unwrap();
1769
1770 conn.set_committer_watermark(
1772 SequentialHandler::NAME,
1773 CommitterWatermark {
1774 epoch_hi_inclusive: 0,
1775 checkpoint_hi_inclusive: 5,
1776 tx_hi: 10,
1777 timestamp_ms_hi_inclusive: 5000,
1778 },
1779 )
1780 .await
1781 .unwrap();
1782
1783 let temp_dir = tempfile::tempdir().unwrap();
1785 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1786 ingestion_dir: temp_dir.path().to_owned(),
1787 starting_checkpoint: 0,
1788 num_checkpoints: 20,
1789 checkpoint_size: 2,
1790 })
1791 .await;
1792
1793 let indexer_args = IndexerArgs {
1794 first_checkpoint: None,
1795 last_checkpoint: Some(19),
1796 pipeline: vec![],
1797 ..Default::default()
1798 };
1799
1800 let client_args = ClientArgs {
1801 ingestion: IngestionClientArgs {
1802 local_ingestion_path: Some(temp_dir.path().to_owned()),
1803 ..Default::default()
1804 },
1805 ..Default::default()
1806 };
1807
1808 let ingestion_config = IngestionConfig::default();
1809
1810 let mut indexer = Indexer::new(
1811 store.clone(),
1812 indexer_args,
1813 client_args,
1814 ingestion_config,
1815 None,
1816 &registry,
1817 )
1818 .await
1819 .unwrap();
1820
1821 indexer
1823 .sequential_pipeline(
1824 MockHandler,
1825 pipeline::sequential::SequentialConfig::default(),
1826 )
1827 .await
1828 .unwrap();
1829
1830 assert_eq!(
1832 indexer.next_sequential_checkpoint(),
1833 Some(11),
1834 "next_sequential_checkpoint should be 11"
1835 );
1836
1837 indexer
1839 .sequential_pipeline(
1840 SequentialHandler,
1841 pipeline::sequential::SequentialConfig::default(),
1842 )
1843 .await
1844 .unwrap();
1845
1846 assert_eq!(
1848 indexer.next_sequential_checkpoint(),
1849 Some(6),
1850 "next_sequential_checkpoint should still be 6"
1851 );
1852
1853 indexer.run().await.unwrap().join().await.unwrap();
1855
1856 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1858 let watermark2 = conn
1859 .committer_watermark(SequentialHandler::NAME)
1860 .await
1861 .unwrap();
1862
1863 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1864 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1865 }
1866
1867 #[tokio::test]
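    // A tasked pipeline ingests from --first-checkpoint, but must not commit checkpoints below
    // the main pipeline's reader watermark (7).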
1871 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1872 let registry = Registry::new();
1873 let store = MockStore::default();
1874
1875 let mut conn = store.connect().await.unwrap();
1878 conn.set_committer_watermark(
1879 MockCheckpointSequenceNumberHandler::NAME,
1880 CommitterWatermark {
1881 checkpoint_hi_inclusive: 10,
1882 ..Default::default()
1883 },
1884 )
1885 .await
1886 .unwrap();
1887 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1888 .await
1889 .unwrap();
1890
1891 let indexer_args = IndexerArgs {
1894 first_checkpoint: Some(0),
1895 last_checkpoint: Some(15),
1896 pipeline: vec![],
1897 task: TaskArgs::tasked("task".to_string(), 10),
1898 };
1899 let temp_dir = tempfile::tempdir().unwrap();
1900 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1901 ingestion_dir: temp_dir.path().to_owned(),
1902 starting_checkpoint: 0,
1903 num_checkpoints: 16,
1904 checkpoint_size: 2,
1905 })
1906 .await;
1907
1908 let client_args = ClientArgs {
1909 ingestion: IngestionClientArgs {
1910 local_ingestion_path: Some(temp_dir.path().to_owned()),
1911 ..Default::default()
1912 },
1913 ..Default::default()
1914 };
1915
1916 let ingestion_config = IngestionConfig::default();
1917
1918 let mut tasked_indexer = Indexer::new(
1919 store.clone(),
1920 indexer_args,
1921 client_args,
1922 ingestion_config,
1923 None,
1924 &registry,
1925 )
1926 .await
1927 .unwrap();
1928
1929 let _ = tasked_indexer
1930 .concurrent_pipeline(
1931 MockCheckpointSequenceNumberHandler,
1932 ConcurrentConfig::default(),
1933 )
1934 .await;
1935
1936 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1937 let metrics = tasked_indexer.indexer_metrics().clone();
1938
1939 tasked_indexer.run().await.unwrap().join().await.unwrap();
1940
1941 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1942 assert_eq!(
1943 metrics
1944 .total_collector_skipped_checkpoints
1945 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1946 .unwrap()
1947 .get(),
1948 7
1949 );
1950 let data = store
1951 .data
1952 .get(MockCheckpointSequenceNumberHandler::NAME)
1953 .unwrap();
1954 assert_eq!(data.len(), 9);
1955 for i in 0..7 {
1956 assert!(data.get(&i).is_none());
1957 }
1958 for i in 7..16 {
1959 assert!(data.get(&i).is_some());
1960 }
1961 }
1962
1963 #[tokio::test]
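    // A tasked pipeline may commit past the main pipeline's committer watermark; its watermarks
    // are tracked separately and the main pipeline's are left untouched.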
1965 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1966 let registry = Registry::new();
1967 let store = MockStore::default();
1968
1969 let mut conn = store.connect().await.unwrap();
1970 conn.set_committer_watermark(
1971 "test",
1972 CommitterWatermark {
1973 checkpoint_hi_inclusive: 10,
1974 ..Default::default()
1975 },
1976 )
1977 .await
1978 .unwrap();
1979 conn.set_reader_watermark("test", 5).await.unwrap();
1980
1981 let indexer_args = IndexerArgs {
1984 first_checkpoint: Some(9),
1985 last_checkpoint: Some(25),
1986 pipeline: vec![],
1987 task: TaskArgs::tasked("task".to_string(), 10),
1988 };
1989 let temp_dir = tempfile::tempdir().unwrap();
1990 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1991 ingestion_dir: temp_dir.path().to_owned(),
1992 starting_checkpoint: 9,
1993 num_checkpoints: 17,
1994 checkpoint_size: 2,
1995 })
1996 .await;
1997
1998 let client_args = ClientArgs {
1999 ingestion: IngestionClientArgs {
2000 local_ingestion_path: Some(temp_dir.path().to_owned()),
2001 ..Default::default()
2002 },
2003 ..Default::default()
2004 };
2005
2006 let ingestion_config = IngestionConfig::default();
2007
2008 let mut tasked_indexer = Indexer::new(
2009 store.clone(),
2010 indexer_args,
2011 client_args,
2012 ingestion_config,
2013 None,
2014 &registry,
2015 )
2016 .await
2017 .unwrap();
2018
2019 let _ = tasked_indexer
2020 .concurrent_pipeline(
2021 MockCheckpointSequenceNumberHandler,
2022 ConcurrentConfig::default(),
2023 )
2024 .await;
2025
2026 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2027 let metrics = tasked_indexer.indexer_metrics().clone();
2028
2029 tasked_indexer.run().await.unwrap().join().await.unwrap();
2030
2031 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2032 assert_eq!(
2033 metrics
2034 .total_watermarks_out_of_order
2035 .get_metric_with_label_values(&["test"])
2036 .unwrap()
2037 .get(),
2038 0
2039 );
2040 assert_eq!(
2041 metrics
2042 .total_collector_skipped_checkpoints
2043 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2044 .unwrap()
2045 .get(),
2046 0
2047 );
2048
2049 let data = store.data.get("test").unwrap();
2050 assert_eq!(data.len(), 17);
2051 for i in 0..9 {
2052 assert!(data.get(&i).is_none());
2053 }
2054 for i in 9..26 {
2055 assert!(data.get(&i).is_some());
2056 }
2057 let main_pipeline_watermark = store.watermark("test").unwrap();
2058 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2060 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2061 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2062 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2063 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2064 }
2065
2066 #[tokio::test]
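    // If the main pipeline's reader watermark advances while a tasked pipeline is still catching
    // up, the tasked pipeline skips the checkpoints that now fall below it and resumes committing
    // from the new reader watermark.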
2069 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2070 let registry = Registry::new();
2071 let store = MockStore::default();
2072 let mut conn = store.connect().await.unwrap();
2073 conn.set_committer_watermark(
2075 ControllableHandler::NAME,
2076 CommitterWatermark {
2077 checkpoint_hi_inclusive: 11,
2078 ..Default::default()
2079 },
2080 )
2081 .await
2082 .unwrap();
2083
2084 let temp_dir = tempfile::tempdir().unwrap();
2086 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2087 ingestion_dir: temp_dir.path().to_owned(),
2088 starting_checkpoint: 0,
2089 num_checkpoints: 501,
2090 checkpoint_size: 2,
2091 })
2092 .await;
2093 let indexer_args = IndexerArgs {
2094 first_checkpoint: Some(0),
2095 last_checkpoint: Some(500),
2096 pipeline: vec![],
2097 task: TaskArgs::tasked("task".to_string(), 10),
2098 };
2099 let client_args = ClientArgs {
2100 ingestion: IngestionClientArgs {
2101 local_ingestion_path: Some(temp_dir.path().to_owned()),
2102 ..Default::default()
2103 },
2104 ..Default::default()
2105 };
2106 let ingestion_config = IngestionConfig::default();
2107 let mut tasked_indexer = Indexer::new(
2108 store.clone(),
2109 indexer_args,
2110 client_args,
2111 ingestion_config,
2112 None,
2113 &registry,
2114 )
2115 .await
2116 .unwrap();
2117 let mut allow_process = 10;
2118 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2120 let _ = tasked_indexer
2121 .concurrent_pipeline(
2122 controllable_handler,
2123 ConcurrentConfig {
2124 committer: CommitterConfig {
2125 collect_interval_ms: 10,
2126 watermark_interval_ms: 10,
2127 ..Default::default()
2128 },
2129 ..Default::default()
2130 },
2131 )
2132 .await;
2133 let metrics = tasked_indexer.indexer_metrics().clone();
2134
2135 let mut s_indexer = tasked_indexer.run().await.unwrap();
2136
2137 store
2141 .wait_for_watermark(
2142 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2143 10,
2144 std::time::Duration::from_secs(10),
2145 )
2146 .await;
2147
2148 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2153 .await
2154 .unwrap();
2155
2156 let reader_lo = metrics
2157 .collector_reader_lo
2158 .with_label_values(&[ControllableHandler::NAME]);
2159
2160 let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2164 while reader_lo.get() != 250 {
2165 interval.tick().await;
2166 allow_process += 1;
2168 assert!(
2169 allow_process <= 500,
2170 "Released all checkpoints but collector never observed new reader_lo"
2171 );
2172 process_below.send(allow_process).ok();
2173 }
2174
2175 process_below.send(500).ok();
2182
2183 s_indexer.join().await.unwrap();
2184
2185 let data = store.data.get(ControllableHandler::NAME).unwrap();
2186
2187 for chkpt in (allow_process + 1)..250 {
2189 assert!(
2190 data.get(&chkpt).is_none(),
2191 "Checkpoint {chkpt} should have been skipped"
2192 );
2193 }
2194
2195 for chkpt in 250..=500 {
2197 assert!(
2198 data.get(&chkpt).is_some(),
2199 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2200 );
2201 }
2202
2203 for chkpt in 0..=10 {
2205 assert!(
2206 data.get(&chkpt).is_some(),
2207 "Checkpoint {chkpt} should have been committed (baseline)"
2208 );
2209 }
2210 }
2211}