use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use anyhow::bail;
use anyhow::ensure;
use ingestion::ClientArgs;
use ingestion::IngestionConfig;
use ingestion::IngestionService;
use ingestion::ingestion_client::IngestionClient;
use metrics::IndexerMetrics;
use prometheus::Registry;
use sui_indexer_alt_framework_store_traits::Connection;
use sui_indexer_alt_framework_store_traits::Store;
use sui_indexer_alt_framework_store_traits::TransactionalStore;
use sui_indexer_alt_framework_store_traits::pipeline_task;
use tracing::info;

use crate::metrics::IngestionMetrics;
use crate::pipeline::Processor;
use crate::pipeline::concurrent::ConcurrentConfig;
use crate::pipeline::concurrent::{self};
use crate::pipeline::sequential::Handler;
use crate::pipeline::sequential::SequentialConfig;
use crate::pipeline::sequential::{self};
use crate::service::Service;

pub use anyhow::Result;
pub use sui_field_count::FieldCount;
pub use sui_futures::service;
pub use sui_indexer_alt_framework_store_traits as store;
pub use sui_types as types;

#[cfg(feature = "cluster")]
pub mod cluster;
pub mod ingestion;
pub mod metrics;
pub mod pipeline;
#[cfg(feature = "postgres")]
pub mod postgres;

#[cfg(test)]
pub mod mocks;

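/// Command-line arguments controlling which checkpoints the indexer ingests
/// and which of its registered pipelines it runs.
///
/// A parsing sketch, mirroring `test_arg_parsing` below (the `Args` wrapper
/// and binary name are illustrative, not part of this crate):
///
/// ```ignore
/// #[derive(clap::Parser)]
/// struct Args {
///     #[clap(flatten)]
///     indexer: IndexerArgs,
/// }
///
/// let args = Args::try_parse_from([
///     "cmd",
///     "--first-checkpoint", "10",
///     "--last-checkpoint", "100",
///     "--pipeline", "a",
///     "--task", "t",
///     "--reader-interval-ms", "5000",
/// ])?;
/// assert_eq!(args.indexer.first_checkpoint, Some(10));
/// ```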
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the checkpoint that pipelines without an existing watermark
    /// start from (defaults to 0). Pipelines that already have a watermark
    /// resume from just after it.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// The inclusive upper bound of checkpoints to ingest. If unset, the
    /// indexer runs indefinitely.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the named pipelines. The flag can be repeated; if it is not
    /// provided at all, all registered pipelines are run.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    #[clap(flatten)]
    pub task: TaskArgs,
}

#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Run this indexer's pipelines under a named task, tracking their
    /// watermarks separately from the main (un-tasked) pipelines. Requires
    /// --reader-interval-ms.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// How often, in milliseconds, a tasked pipeline re-reads the main
    /// pipeline's reader watermark. Requires --task.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}

pub struct Indexer<S: Store> {
    /// The store that pipelines write to.
    store: S,

    /// Metrics for the indexer and its pipelines.
    metrics: Arc<IndexerMetrics>,

    /// Service responsible for fetching checkpoint data to feed the pipelines.
    ingestion_service: IngestionService,

    /// The checkpoint that pipelines with no existing watermark will start
    /// from (zero unless overridden by `--first-checkpoint`).
    default_next_checkpoint: u64,

    /// Optional inclusive upper bound on the checkpoints to ingest.
    last_checkpoint: Option<u64>,

    /// Optional task that this indexer's pipelines run under.
    task: Option<Task>,

    /// Pipelines explicitly enabled via `--pipeline`. `None` means all added
    /// pipelines are enabled. Entries are removed as pipelines are added, so
    /// anything left over when the indexer runs refers to an unknown pipeline.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of all pipelines added so far, used to detect duplicates.
    added_pipelines: BTreeSet<&'static str>,

    /// The lowest checkpoint that any added pipeline still needs; ingestion
    /// starts from here. Stays at `u64::MAX` until a pipeline is added.
    first_ingestion_checkpoint: u64,

    /// Handles for the added pipelines, merged into the ingestion service
    /// when the indexer runs.
    pipelines: Vec<Service>,
}

#[derive(Clone)]
pub(crate) struct Task {
    /// The name of the task.
    task: String,

    /// How often a tasked pipeline polls the main pipeline's reader watermark.
    reader_interval: Duration,
}

impl TaskArgs {
    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
        Self {
            task: Some(task),
            reader_interval_ms: Some(reader_interval_ms),
        }
    }

    fn into_task(self) -> Option<Task> {
        Some(Task {
            task: self.task?,
            reader_interval: Duration::from_millis(self.reader_interval_ms?),
        })
    }
}

impl<S: Store> Indexer<S> {
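    /// Construct a new indexer that writes to `store`, ingesting checkpoints
    /// according to `indexer_args`, `client_args`, and `ingestion_config`,
    /// and registering its metrics (optionally prefixed by `metrics_prefix`)
    /// against `registry`. Pipelines are added separately, before calling
    /// [`Indexer::run`].
    ///
    /// A construction sketch, assuming a store and client arguments are
    /// already in hand (`my_store` and `client_args` are illustrative names):
    ///
    /// ```ignore
    /// let registry = prometheus::Registry::new();
    /// let mut indexer = Indexer::new(
    ///     my_store,
    ///     IndexerArgs::default(),
    ///     client_args,
    ///     IngestionConfig::default(),
    ///     None,
    ///     &registry,
    /// )
    /// .await?;
    /// ```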
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
            last_checkpoint,
            task: task.into_task(),
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            first_ingestion_checkpoint: u64::MAX,
            pipelines: vec![],
        })
    }

    /// The store this indexer writes to.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client used to fetch checkpoint data.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Metrics for the indexer and its pipelines.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Metrics for the ingestion service.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The names of all added pipelines that are also enabled.
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The lowest next checkpoint across all added pipelines, or `None` if no
    /// pipeline has been added yet.
    pub fn initial_commit_hi(&self) -> Option<u64> {
        (self.first_ingestion_checkpoint != u64::MAX).then_some(self.first_ingestion_checkpoint)
    }

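    /// Add a concurrent pipeline to the indexer, driven by `handler`. If the
    /// pipeline is not enabled (see `--pipeline`), this is a no-op.
    ///
    /// A registration sketch, assuming `MyHandler` implements
    /// [`concurrent::Handler`] for this indexer's store (the name is
    /// illustrative):
    ///
    /// ```ignore
    /// indexer
    ///     .concurrent_pipeline(MyHandler, ConcurrentConfig::default())
    ///     .await?;
    /// ```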
    pub async fn concurrent_pipeline<H>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()>
    where
        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
    {
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            checkpoint_rx,
            commit_hi_tx,
            self.metrics.clone(),
        ));

        Ok(())
    }

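    /// Start the ingestion service and all registered pipelines, returning a
    /// handle to the combined service. Fails if `--pipeline` named a pipeline
    /// that was never added to this indexer.
    ///
    /// A lifecycle sketch (`store`, `client_args`, and `MyHandler` are
    /// illustrative placeholders):
    ///
    /// ```ignore
    /// let mut indexer =
    ///     Indexer::new(store, args, client_args, config, None, &registry).await?;
    /// indexer
    ///     .concurrent_pipeline(MyHandler, ConcurrentConfig::default())
    ///     .await?;
    /// indexer.run().await?.join().await?;
    /// ```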
    pub async fn run(self) -> Result<Service> {
        let initial_commit_hi = self.initial_commit_hi();

        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                 {enabled_pipelines:#?}",
            );
        }

        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);

        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                self.first_ingestion_checkpoint..=last_checkpoint,
                initial_commit_hi,
            )
            .await
            .context("Failed to start ingestion service")?;

        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Register `P` as a pipeline: fails if it was already added, returns
    /// `None` if it is not enabled, and otherwise initializes its watermark,
    /// lowers the indexer's first ingestion checkpoint to cover it, and
    /// returns the next checkpoint it needs to process.
    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let mut conn = self
            .store
            .connect()
            .await
            .context("Failed to establish connection to store")?;

        let pipeline_task =
            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;

        let checkpoint_hi_inclusive = conn
            .init_watermark(&pipeline_task, self.default_next_checkpoint)
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint =
            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);

        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);

        Ok(Some(next_checkpoint))
    }
}

impl<T: TransactionalStore> Indexer<T> {
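    /// Add a sequential pipeline to the indexer, driven by `handler`.
    /// Sequential pipelines commit each checkpoint exactly once and in order,
    /// so they are only available on a [`TransactionalStore`] and cannot be
    /// run under a `--task`. If the pipeline is not enabled, this is a no-op.
    ///
    /// A registration sketch, assuming `MyHandler` implements the sequential
    /// [`Handler`] trait for this store (the name is illustrative):
    ///
    /// ```ignore
    /// indexer
    ///     .sequential_pipeline(MyHandler, SequentialConfig::default())
    ///     .await?;
    /// ```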
    pub async fn sequential_pipeline<H>(
        &mut self,
        handler: H,
        config: SequentialConfig,
    ) -> Result<()>
    where
        H: Handler<Store = T> + Send + Sync + 'static,
    {
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        if self.task.is_some() {
            bail!(
                "Sequential pipelines do not support pipeline tasks. \
                 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
                 Running the same pipeline under a different task would violate these guarantees."
            );
        }

        let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();

        self.pipelines.push(sequential::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            checkpoint_rx,
            watermark_tx,
            self.metrics.clone(),
        ));

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use clap::Parser;
    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tokio::sync::watch;

    use crate::FieldCount;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::store::CommitterWatermark;

    use super::*;

    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);

    /// A handler that blocks processing of any checkpoint above a limit that
    /// tests can raise through a watch channel.
    struct ControllableHandler {
        process_below: watch::Receiver<u64>,
    }

    impl ControllableHandler {
        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
            let (tx, rx) = watch::channel(limit);
            (Self { process_below: rx }, tx)
        }
    }

    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        // Large enough for all 501 test checkpoints to be in flight at once,
        // so held-back checkpoints do not block the rest from processing.
        const FANOUT: usize = 501;
        type Value = MockValue;

        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Block until the current limit allows this checkpoint through.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }

    #[async_trait]
    impl crate::pipeline::concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> crate::pipeline::concurrent::BatchStatus {
            batch.extend(values);
            crate::pipeline::concurrent::BatchStatus::Ready
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }

    /// Defines a test handler `$handler` named `$name` whose `Processor` impl
    /// emits each checkpoint's sequence number, with both concurrent and
    /// sequential `Handler` impls against `MockStore` (the concurrent impl
    /// records committed values; the sequential commit is a no-op).
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }

    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");

    /// Set up an indexer over `MockStore` with a single pipeline (concurrent
    /// or sequential), then return the committer and pruner watermarks that
    /// were initialized for it.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        if is_concurrent {
            indexer
                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
                .await
                .unwrap();
        } else {
            indexer
                .sequential_pipeline::<A>(A, SequentialConfig::default())
                .await
                .unwrap();
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }

    #[test]
    fn test_arg_parsing() {
        #[derive(Parser)]
        struct Args {
            #[clap(flatten)]
            indexer: IndexerArgs,
        }

        let args = Args::try_parse_from([
            "cmd",
            "--first-checkpoint",
            "10",
            "--last-checkpoint",
            "100",
            "--pipeline",
            "a",
            "--pipeline",
            "b",
            "--task",
            "t",
            "--reader-interval-ms",
            "5000",
        ])
        .unwrap();

        assert_eq!(args.indexer.first_checkpoint, Some(10));
        assert_eq!(args.indexer.last_checkpoint, Some(100));
        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
    }
684
685 #[tokio::test]
687 async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
688 let registry = Registry::new();
689 let store = MockStore::default();
690
691 test_pipeline!(A, "concurrent_a");
692 test_pipeline!(B, "concurrent_b");
693 test_pipeline!(C, "sequential_c");
694 test_pipeline!(D, "sequential_d");
695
696 let mut conn = store.connect().await.unwrap();
697 conn.set_committer_watermark(
698 A::NAME,
699 CommitterWatermark {
700 checkpoint_hi_inclusive: 100,
701 ..Default::default()
702 },
703 )
704 .await
705 .unwrap();
706 conn.set_committer_watermark(
707 B::NAME,
708 CommitterWatermark {
709 checkpoint_hi_inclusive: 10,
710 ..Default::default()
711 },
712 )
713 .await
714 .unwrap();
715 conn.set_committer_watermark(
716 C::NAME,
717 CommitterWatermark {
718 checkpoint_hi_inclusive: 1,
719 ..Default::default()
720 },
721 )
722 .await
723 .unwrap();
724 conn.set_committer_watermark(
725 D::NAME,
726 CommitterWatermark {
727 checkpoint_hi_inclusive: 50,
728 ..Default::default()
729 },
730 )
731 .await
732 .unwrap();
733
734 let indexer_args = IndexerArgs::default();
735 let temp_dir = tempfile::tempdir().unwrap();
736 let client_args = ClientArgs {
737 ingestion: IngestionClientArgs {
738 local_ingestion_path: Some(temp_dir.path().to_owned()),
739 ..Default::default()
740 },
741 ..Default::default()
742 };
743 let ingestion_config = IngestionConfig::default();
744
745 let mut indexer = Indexer::new(
746 store,
747 indexer_args,
748 client_args,
749 ingestion_config,
750 None,
            &registry,
752 )
753 .await
754 .unwrap();
755
756 indexer
757 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
758 .await
759 .unwrap();
760 indexer
761 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
762 .await
763 .unwrap();
764 indexer
765 .sequential_pipeline::<C>(C, SequentialConfig::default())
766 .await
767 .unwrap();
768 indexer
769 .sequential_pipeline::<D>(D, SequentialConfig::default())
770 .await
771 .unwrap();
772
773 assert_eq!(indexer.first_ingestion_checkpoint, 2);
774 }
775
776 #[tokio::test]
778 async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
779 let registry = Registry::new();
780 let store = MockStore::default();
781
782 test_pipeline!(A, "concurrent_a");
783 test_pipeline!(B, "concurrent_b");
784 test_pipeline!(C, "sequential_c");
785 test_pipeline!(D, "sequential_d");
786
787 let mut conn = store.connect().await.unwrap();
788 conn.set_committer_watermark(
789 B::NAME,
790 CommitterWatermark {
791 checkpoint_hi_inclusive: 10,
792 ..Default::default()
793 },
794 )
795 .await
796 .unwrap();
797 conn.set_committer_watermark(
798 C::NAME,
799 CommitterWatermark {
800 checkpoint_hi_inclusive: 1,
801 ..Default::default()
802 },
803 )
804 .await
805 .unwrap();
806
807 let indexer_args = IndexerArgs::default();
808 let temp_dir = tempfile::tempdir().unwrap();
809 let client_args = ClientArgs {
810 ingestion: IngestionClientArgs {
811 local_ingestion_path: Some(temp_dir.path().to_owned()),
812 ..Default::default()
813 },
814 ..Default::default()
815 };
816 let ingestion_config = IngestionConfig::default();
817
818 let mut indexer = Indexer::new(
819 store,
820 indexer_args,
821 client_args,
822 ingestion_config,
823 None,
            &registry,
825 )
826 .await
827 .unwrap();
828
829 indexer
830 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
831 .await
832 .unwrap();
833 indexer
834 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
835 .await
836 .unwrap();
837 indexer
838 .sequential_pipeline::<C>(C, SequentialConfig::default())
839 .await
840 .unwrap();
841 indexer
842 .sequential_pipeline::<D>(D, SequentialConfig::default())
843 .await
844 .unwrap();
845
846 assert_eq!(indexer.first_ingestion_checkpoint, 0);
847 }
848
849 #[tokio::test]
851 async fn test_first_ingestion_checkpoint_smallest_is_0() {
852 let registry = Registry::new();
853 let store = MockStore::default();
854
855 test_pipeline!(A, "concurrent_a");
856 test_pipeline!(B, "concurrent_b");
857 test_pipeline!(C, "sequential_c");
858 test_pipeline!(D, "sequential_d");
859
860 let mut conn = store.connect().await.unwrap();
861 conn.set_committer_watermark(
862 A::NAME,
863 CommitterWatermark {
864 checkpoint_hi_inclusive: 100,
865 ..Default::default()
866 },
867 )
868 .await
869 .unwrap();
870 conn.set_committer_watermark(
871 B::NAME,
872 CommitterWatermark {
873 checkpoint_hi_inclusive: 10,
874 ..Default::default()
875 },
876 )
877 .await
878 .unwrap();
879 conn.set_committer_watermark(
880 C::NAME,
881 CommitterWatermark {
882 checkpoint_hi_inclusive: 1,
883 ..Default::default()
884 },
885 )
886 .await
887 .unwrap();
888 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
889 .await
890 .unwrap();
891
892 let indexer_args = IndexerArgs::default();
893 let temp_dir = tempfile::tempdir().unwrap();
894 let client_args = ClientArgs {
895 ingestion: IngestionClientArgs {
896 local_ingestion_path: Some(temp_dir.path().to_owned()),
897 ..Default::default()
898 },
899 ..Default::default()
900 };
901 let ingestion_config = IngestionConfig::default();
902
903 let mut indexer = Indexer::new(
904 store,
905 indexer_args,
906 client_args,
907 ingestion_config,
908 None,
            &registry,
910 )
911 .await
912 .unwrap();
913
914 indexer
915 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
916 .await
917 .unwrap();
918 indexer
919 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
920 .await
921 .unwrap();
922 indexer
923 .sequential_pipeline::<C>(C, SequentialConfig::default())
924 .await
925 .unwrap();
926 indexer
927 .sequential_pipeline::<D>(D, SequentialConfig::default())
928 .await
929 .unwrap();
930
931 assert_eq!(indexer.first_ingestion_checkpoint, 1);
932 }
933
934 #[tokio::test]
937 async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
938 let registry = Registry::new();
939 let store = MockStore::default();
940
941 test_pipeline!(A, "concurrent_a");
942 test_pipeline!(B, "concurrent_b");
943 test_pipeline!(C, "sequential_c");
944 test_pipeline!(D, "sequential_d");
945
946 let mut conn = store.connect().await.unwrap();
947 conn.set_committer_watermark(
948 B::NAME,
949 CommitterWatermark {
950 checkpoint_hi_inclusive: 50,
951 ..Default::default()
952 },
953 )
954 .await
955 .unwrap();
956 conn.set_committer_watermark(
957 C::NAME,
958 CommitterWatermark {
959 checkpoint_hi_inclusive: 10,
960 ..Default::default()
961 },
962 )
963 .await
964 .unwrap();
965
966 let indexer_args = IndexerArgs {
967 first_checkpoint: Some(5),
968 ..Default::default()
969 };
970 let temp_dir = tempfile::tempdir().unwrap();
971 let client_args = ClientArgs {
972 ingestion: IngestionClientArgs {
973 local_ingestion_path: Some(temp_dir.path().to_owned()),
974 ..Default::default()
975 },
976 ..Default::default()
977 };
978 let ingestion_config = IngestionConfig::default();
979
980 let mut indexer = Indexer::new(
981 store,
982 indexer_args,
983 client_args,
984 ingestion_config,
985 None,
986 ®istry,
987 )
988 .await
989 .unwrap();
990
991 indexer
992 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
993 .await
994 .unwrap();
995 indexer
996 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
997 .await
998 .unwrap();
999 indexer
1000 .sequential_pipeline::<C>(C, SequentialConfig::default())
1001 .await
1002 .unwrap();
1003 indexer
1004 .sequential_pipeline::<D>(D, SequentialConfig::default())
1005 .await
1006 .unwrap();
1007
1008 assert_eq!(indexer.first_ingestion_checkpoint, 5);
1009 }
1010
1011 #[tokio::test]
1014 async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1015 let registry = Registry::new();
1016 let store = MockStore::default();
1017
1018 test_pipeline!(B, "concurrent_b");
1019 test_pipeline!(C, "sequential_c");
1020
1021 let mut conn = store.connect().await.unwrap();
1022 conn.set_committer_watermark(
1023 B::NAME,
1024 CommitterWatermark {
1025 checkpoint_hi_inclusive: 50,
1026 ..Default::default()
1027 },
1028 )
1029 .await
1030 .unwrap();
1031 conn.set_committer_watermark(
1032 C::NAME,
1033 CommitterWatermark {
1034 checkpoint_hi_inclusive: 10,
1035 ..Default::default()
1036 },
1037 )
1038 .await
1039 .unwrap();
1040
1041 let indexer_args = IndexerArgs {
1042 first_checkpoint: Some(5),
1043 ..Default::default()
1044 };
1045 let temp_dir = tempfile::tempdir().unwrap();
1046 let client_args = ClientArgs {
1047 ingestion: IngestionClientArgs {
1048 local_ingestion_path: Some(temp_dir.path().to_owned()),
1049 ..Default::default()
1050 },
1051 ..Default::default()
1052 };
1053 let ingestion_config = IngestionConfig::default();
1054
1055 let mut indexer = Indexer::new(
1056 store,
1057 indexer_args,
1058 client_args,
1059 ingestion_config,
1060 None,
            &registry,
1062 )
1063 .await
1064 .unwrap();
1065
1066 indexer
1067 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1068 .await
1069 .unwrap();
1070 indexer
1071 .sequential_pipeline::<C>(C, SequentialConfig::default())
1072 .await
1073 .unwrap();
1074
1075 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1076 }
1077
1078 #[tokio::test]
1082 async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1083 let registry = Registry::new();
1084 let store = MockStore::default();
1085
1086 test_pipeline!(A, "concurrent_a");
1087 test_pipeline!(B, "concurrent_b");
1088 test_pipeline!(C, "sequential_c");
1089
1090 let mut conn = store.connect().await.unwrap();
1091 conn.set_committer_watermark(
1092 B::NAME,
1093 CommitterWatermark {
1094 checkpoint_hi_inclusive: 50,
1095 ..Default::default()
1096 },
1097 )
1098 .await
1099 .unwrap();
1100 conn.set_committer_watermark(
1101 C::NAME,
1102 CommitterWatermark {
1103 checkpoint_hi_inclusive: 10,
1104 ..Default::default()
1105 },
1106 )
1107 .await
1108 .unwrap();
1109
1110 let indexer_args = IndexerArgs {
1111 first_checkpoint: Some(24),
1112 ..Default::default()
1113 };
1114 let temp_dir = tempfile::tempdir().unwrap();
1115 let client_args = ClientArgs {
1116 ingestion: IngestionClientArgs {
1117 local_ingestion_path: Some(temp_dir.path().to_owned()),
1118 ..Default::default()
1119 },
1120 ..Default::default()
1121 };
1122 let ingestion_config = IngestionConfig::default();
1123
1124 let mut indexer = Indexer::new(
1125 store,
1126 indexer_args,
1127 client_args,
1128 ingestion_config,
1129 None,
            &registry,
1131 )
1132 .await
1133 .unwrap();
1134
1135 indexer
1136 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1137 .await
1138 .unwrap();
1139
1140 indexer
1141 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1142 .await
1143 .unwrap();
1144
1145 indexer
1146 .sequential_pipeline::<C>(C, SequentialConfig::default())
1147 .await
1148 .unwrap();
1149
1150 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1151 }
1152
1153 #[tokio::test]
1155 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1156 let registry = Registry::new();
1157 let store = MockStore::default();
1158
1159 test_pipeline!(A, "concurrent_a");
1160 test_pipeline!(B, "concurrent_b");
1161 test_pipeline!(C, "sequential_c");
1162 test_pipeline!(D, "sequential_d");
1163
1164 let mut conn = store.connect().await.unwrap();
1165 conn.set_committer_watermark(
1166 A::NAME,
1167 CommitterWatermark {
1168 checkpoint_hi_inclusive: 5,
1169 ..Default::default()
1170 },
1171 )
1172 .await
1173 .unwrap();
1174 conn.set_committer_watermark(
1175 B::NAME,
1176 CommitterWatermark {
1177 checkpoint_hi_inclusive: 10,
1178 ..Default::default()
1179 },
1180 )
1181 .await
1182 .unwrap();
1183 conn.set_committer_watermark(
1184 C::NAME,
1185 CommitterWatermark {
1186 checkpoint_hi_inclusive: 15,
1187 ..Default::default()
1188 },
1189 )
1190 .await
1191 .unwrap();
1192 conn.set_committer_watermark(
1193 D::NAME,
1194 CommitterWatermark {
1195 checkpoint_hi_inclusive: 20,
1196 ..Default::default()
1197 },
1198 )
1199 .await
1200 .unwrap();
1201
1202 let temp_dir = tempfile::tempdir().unwrap();
1204 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1205 ingestion_dir: temp_dir.path().to_owned(),
1206 starting_checkpoint: 5,
1207 num_checkpoints: 25,
1208 checkpoint_size: 1,
1209 })
1210 .await;
1211
1212 let indexer_args = IndexerArgs {
1213 last_checkpoint: Some(29),
1214 ..Default::default()
1215 };
1216
1217 let client_args = ClientArgs {
1218 ingestion: IngestionClientArgs {
1219 local_ingestion_path: Some(temp_dir.path().to_owned()),
1220 ..Default::default()
1221 },
1222 ..Default::default()
1223 };
1224
1225 let ingestion_config = IngestionConfig::default();
1226
1227 let mut indexer = Indexer::new(
1228 store.clone(),
1229 indexer_args,
1230 client_args,
1231 ingestion_config,
1232 None,
            &registry,
1234 )
1235 .await
1236 .unwrap();
1237
1238 indexer
1239 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1240 .await
1241 .unwrap();
1242 indexer
1243 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1244 .await
1245 .unwrap();
1246 indexer
1247 .sequential_pipeline::<C>(C, SequentialConfig::default())
1248 .await
1249 .unwrap();
1250 indexer
1251 .sequential_pipeline::<D>(D, SequentialConfig::default())
1252 .await
1253 .unwrap();
1254
1255 let ingestion_metrics = indexer.ingestion_metrics().clone();
1256 let indexer_metrics = indexer.indexer_metrics().clone();
1257
1258 indexer.run().await.unwrap().join().await.unwrap();
1259
1260 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1261 assert_eq!(
1262 indexer_metrics
1263 .total_watermarks_out_of_order
1264 .get_metric_with_label_values(&[A::NAME])
1265 .unwrap()
1266 .get(),
1267 0
1268 );
1269 assert_eq!(
1270 indexer_metrics
1271 .total_watermarks_out_of_order
1272 .get_metric_with_label_values(&[B::NAME])
1273 .unwrap()
1274 .get(),
1275 5
1276 );
1277 assert_eq!(
1278 indexer_metrics
1279 .total_watermarks_out_of_order
1280 .get_metric_with_label_values(&[C::NAME])
1281 .unwrap()
1282 .get(),
1283 10
1284 );
1285 assert_eq!(
1286 indexer_metrics
1287 .total_watermarks_out_of_order
1288 .get_metric_with_label_values(&[D::NAME])
1289 .unwrap()
1290 .get(),
1291 15
1292 );
1293 }
1294
1295 #[tokio::test]
1297 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1298 let registry = Registry::new();
1299 let store = MockStore::default();
1300
1301 test_pipeline!(A, "concurrent_a");
1302 test_pipeline!(B, "concurrent_b");
1303 test_pipeline!(C, "sequential_c");
1304 test_pipeline!(D, "sequential_d");
1305
1306 let mut conn = store.connect().await.unwrap();
1307 conn.set_committer_watermark(
1308 A::NAME,
1309 CommitterWatermark {
1310 checkpoint_hi_inclusive: 5,
1311 ..Default::default()
1312 },
1313 )
1314 .await
1315 .unwrap();
1316 conn.set_committer_watermark(
1317 B::NAME,
1318 CommitterWatermark {
1319 checkpoint_hi_inclusive: 10,
1320 ..Default::default()
1321 },
1322 )
1323 .await
1324 .unwrap();
1325 conn.set_committer_watermark(
1326 C::NAME,
1327 CommitterWatermark {
1328 checkpoint_hi_inclusive: 15,
1329 ..Default::default()
1330 },
1331 )
1332 .await
1333 .unwrap();
1334 conn.set_committer_watermark(
1335 D::NAME,
1336 CommitterWatermark {
1337 checkpoint_hi_inclusive: 20,
1338 ..Default::default()
1339 },
1340 )
1341 .await
1342 .unwrap();
1343
1344 let temp_dir = tempfile::tempdir().unwrap();
1346 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1347 ingestion_dir: temp_dir.path().to_owned(),
1348 starting_checkpoint: 5,
1349 num_checkpoints: 25,
1350 checkpoint_size: 1,
1351 })
1352 .await;
1353
1354 let indexer_args = IndexerArgs {
1355 first_checkpoint: Some(3),
1356 last_checkpoint: Some(29),
1357 ..Default::default()
1358 };
1359
1360 let client_args = ClientArgs {
1361 ingestion: IngestionClientArgs {
1362 local_ingestion_path: Some(temp_dir.path().to_owned()),
1363 ..Default::default()
1364 },
1365 ..Default::default()
1366 };
1367
1368 let ingestion_config = IngestionConfig::default();
1369
1370 let mut indexer = Indexer::new(
1371 store.clone(),
1372 indexer_args,
1373 client_args,
1374 ingestion_config,
1375 None,
            &registry,
1377 )
1378 .await
1379 .unwrap();
1380
1381 indexer
1382 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1383 .await
1384 .unwrap();
1385 indexer
1386 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1387 .await
1388 .unwrap();
1389 indexer
1390 .sequential_pipeline::<C>(C, SequentialConfig::default())
1391 .await
1392 .unwrap();
1393 indexer
1394 .sequential_pipeline::<D>(D, SequentialConfig::default())
1395 .await
1396 .unwrap();
1397
1398 let ingestion_metrics = indexer.ingestion_metrics().clone();
1399 let metrics = indexer.indexer_metrics().clone();
1400 indexer.run().await.unwrap().join().await.unwrap();
1401
1402 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1403 assert_eq!(
1404 metrics
1405 .total_watermarks_out_of_order
1406 .get_metric_with_label_values(&[A::NAME])
1407 .unwrap()
1408 .get(),
1409 0
1410 );
1411 assert_eq!(
1412 metrics
1413 .total_watermarks_out_of_order
1414 .get_metric_with_label_values(&[B::NAME])
1415 .unwrap()
1416 .get(),
1417 5
1418 );
1419 assert_eq!(
1420 metrics
1421 .total_watermarks_out_of_order
1422 .get_metric_with_label_values(&[C::NAME])
1423 .unwrap()
1424 .get(),
1425 10
1426 );
1427 assert_eq!(
1428 metrics
1429 .total_watermarks_out_of_order
1430 .get_metric_with_label_values(&[D::NAME])
1431 .unwrap()
1432 .get(),
1433 15
1434 );
1435 }
1436
1437 #[tokio::test]
1439 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1440 let registry = Registry::new();
1441 let store = MockStore::default();
1442
1443 test_pipeline!(A, "concurrent_a");
1444 test_pipeline!(B, "concurrent_b");
1445 test_pipeline!(C, "sequential_c");
1446 test_pipeline!(D, "sequential_d");
1447
1448 let mut conn = store.connect().await.unwrap();
1449 conn.set_committer_watermark(
1450 B::NAME,
1451 CommitterWatermark {
1452 checkpoint_hi_inclusive: 10,
1453 ..Default::default()
1454 },
1455 )
1456 .await
1457 .unwrap();
1458 conn.set_committer_watermark(
1459 C::NAME,
1460 CommitterWatermark {
1461 checkpoint_hi_inclusive: 15,
1462 ..Default::default()
1463 },
1464 )
1465 .await
1466 .unwrap();
1467 conn.set_committer_watermark(
1468 D::NAME,
1469 CommitterWatermark {
1470 checkpoint_hi_inclusive: 20,
1471 ..Default::default()
1472 },
1473 )
1474 .await
1475 .unwrap();
1476
1477 let temp_dir = tempfile::tempdir().unwrap();
1479 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1480 ingestion_dir: temp_dir.path().to_owned(),
1481 starting_checkpoint: 0,
1482 num_checkpoints: 30,
1483 checkpoint_size: 1,
1484 })
1485 .await;
1486
1487 let indexer_args = IndexerArgs {
1488 last_checkpoint: Some(29),
1489 ..Default::default()
1490 };
1491
1492 let client_args = ClientArgs {
1493 ingestion: IngestionClientArgs {
1494 local_ingestion_path: Some(temp_dir.path().to_owned()),
1495 ..Default::default()
1496 },
1497 ..Default::default()
1498 };
1499
1500 let ingestion_config = IngestionConfig::default();
1501
1502 let mut indexer = Indexer::new(
1503 store.clone(),
1504 indexer_args,
1505 client_args,
1506 ingestion_config,
1507 None,
            &registry,
1509 )
1510 .await
1511 .unwrap();
1512
1513 indexer
1514 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1515 .await
1516 .unwrap();
1517 indexer
1518 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1519 .await
1520 .unwrap();
1521 indexer
1522 .sequential_pipeline::<C>(C, SequentialConfig::default())
1523 .await
1524 .unwrap();
1525 indexer
1526 .sequential_pipeline::<D>(D, SequentialConfig::default())
1527 .await
1528 .unwrap();
1529
1530 let ingestion_metrics = indexer.ingestion_metrics().clone();
1531 let metrics = indexer.indexer_metrics().clone();
1532 indexer.run().await.unwrap().join().await.unwrap();
1533
1534 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1535 assert_eq!(
1536 metrics
1537 .total_watermarks_out_of_order
1538 .get_metric_with_label_values(&[A::NAME])
1539 .unwrap()
1540 .get(),
1541 0
1542 );
1543 assert_eq!(
1544 metrics
1545 .total_watermarks_out_of_order
1546 .get_metric_with_label_values(&[B::NAME])
1547 .unwrap()
1548 .get(),
1549 11
1550 );
1551 assert_eq!(
1552 metrics
1553 .total_watermarks_out_of_order
1554 .get_metric_with_label_values(&[C::NAME])
1555 .unwrap()
1556 .get(),
1557 16
1558 );
1559 assert_eq!(
1560 metrics
1561 .total_watermarks_out_of_order
1562 .get_metric_with_label_values(&[D::NAME])
1563 .unwrap()
1564 .get(),
1565 21
1566 );
1567 }
1568
1569 #[tokio::test]
1571 async fn test_indexer_ingestion_use_first_checkpoint() {
1572 let registry = Registry::new();
1573 let store = MockStore::default();
1574
1575 test_pipeline!(A, "concurrent_a");
1576 test_pipeline!(B, "concurrent_b");
1577 test_pipeline!(C, "sequential_c");
1578 test_pipeline!(D, "sequential_d");
1579
1580 let mut conn = store.connect().await.unwrap();
1581 conn.set_committer_watermark(
1582 B::NAME,
1583 CommitterWatermark {
1584 checkpoint_hi_inclusive: 10,
1585 ..Default::default()
1586 },
1587 )
1588 .await
1589 .unwrap();
1590 conn.set_committer_watermark(
1591 C::NAME,
1592 CommitterWatermark {
1593 checkpoint_hi_inclusive: 15,
1594 ..Default::default()
1595 },
1596 )
1597 .await
1598 .unwrap();
1599 conn.set_committer_watermark(
1600 D::NAME,
1601 CommitterWatermark {
1602 checkpoint_hi_inclusive: 20,
1603 ..Default::default()
1604 },
1605 )
1606 .await
1607 .unwrap();
1608
1609 let temp_dir = tempfile::tempdir().unwrap();
1611 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1612 ingestion_dir: temp_dir.path().to_owned(),
1613 starting_checkpoint: 5,
1614 num_checkpoints: 25,
1615 checkpoint_size: 1,
1616 })
1617 .await;
1618
1619 let indexer_args = IndexerArgs {
1620 first_checkpoint: Some(10),
1621 last_checkpoint: Some(29),
1622 ..Default::default()
1623 };
1624
1625 let client_args = ClientArgs {
1626 ingestion: IngestionClientArgs {
1627 local_ingestion_path: Some(temp_dir.path().to_owned()),
1628 ..Default::default()
1629 },
1630 ..Default::default()
1631 };
1632
1633 let ingestion_config = IngestionConfig::default();
1634
1635 let mut indexer = Indexer::new(
1636 store.clone(),
1637 indexer_args,
1638 client_args,
1639 ingestion_config,
1640 None,
            &registry,
1642 )
1643 .await
1644 .unwrap();
1645
1646 indexer
1647 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1648 .await
1649 .unwrap();
1650 indexer
1651 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1652 .await
1653 .unwrap();
1654 indexer
1655 .sequential_pipeline::<C>(C, SequentialConfig::default())
1656 .await
1657 .unwrap();
1658 indexer
1659 .sequential_pipeline::<D>(D, SequentialConfig::default())
1660 .await
1661 .unwrap();
1662
1663 let ingestion_metrics = indexer.ingestion_metrics().clone();
1664 let metrics = indexer.indexer_metrics().clone();
1665 indexer.run().await.unwrap().join().await.unwrap();
1666
1667 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1668 assert_eq!(
1669 metrics
1670 .total_watermarks_out_of_order
1671 .get_metric_with_label_values(&[A::NAME])
1672 .unwrap()
1673 .get(),
1674 0
1675 );
1676 assert_eq!(
1677 metrics
1678 .total_watermarks_out_of_order
1679 .get_metric_with_label_values(&[B::NAME])
1680 .unwrap()
1681 .get(),
1682 1
1683 );
1684 assert_eq!(
1685 metrics
1686 .total_watermarks_out_of_order
1687 .get_metric_with_label_values(&[C::NAME])
1688 .unwrap()
1689 .get(),
1690 6
1691 );
1692 assert_eq!(
1693 metrics
1694 .total_watermarks_out_of_order
1695 .get_metric_with_label_values(&[D::NAME])
1696 .unwrap()
1697 .get(),
1698 11
1699 );
1700 }
1701
1702 #[tokio::test]
1703 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1704 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1705 assert_eq!(committer_watermark, None);
1707 assert_eq!(pruner_watermark, None);
1708 }
1709
1710 #[tokio::test]
1711 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1712 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1713 assert_eq!(committer_watermark, None);
1715 assert_eq!(pruner_watermark, None);
1716 }
1717
1718 #[tokio::test]
1719 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1720 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1721
1722 let committer_watermark = committer_watermark.unwrap();
1723 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1724
1725 let pruner_watermark = pruner_watermark.unwrap();
1726 assert_eq!(pruner_watermark.reader_lo, 1);
1727 assert_eq!(pruner_watermark.pruner_hi, 1);
1728 }
1729
1730 #[tokio::test]
1731 async fn test_init_watermark_sequential() {
1732 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1733
1734 let committer_watermark = committer_watermark.unwrap();
1735 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1736
1737 let pruner_watermark = pruner_watermark.unwrap();
1738 assert_eq!(pruner_watermark.reader_lo, 1);
1739 assert_eq!(pruner_watermark.pruner_hi, 1);
1740 }
1741
1742 #[tokio::test]
1743 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1744 let registry = Registry::new();
1745 let store = MockStore::default();
1746
1747 let mut conn = store.connect().await.unwrap();
1749
1750 conn.set_committer_watermark(
1752 MockHandler::NAME,
1753 CommitterWatermark {
1754 epoch_hi_inclusive: 0,
1755 checkpoint_hi_inclusive: 10,
1756 tx_hi: 20,
1757 timestamp_ms_hi_inclusive: 10000,
1758 },
1759 )
1760 .await
1761 .unwrap();
1762
1763 conn.set_committer_watermark(
1765 SequentialHandler::NAME,
1766 CommitterWatermark {
1767 epoch_hi_inclusive: 0,
1768 checkpoint_hi_inclusive: 5,
1769 tx_hi: 10,
1770 timestamp_ms_hi_inclusive: 5000,
1771 },
1772 )
1773 .await
1774 .unwrap();
1775
1776 let temp_dir = tempfile::tempdir().unwrap();
1778 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1779 ingestion_dir: temp_dir.path().to_owned(),
1780 starting_checkpoint: 0,
1781 num_checkpoints: 20,
1782 checkpoint_size: 2,
1783 })
1784 .await;
1785
1786 let indexer_args = IndexerArgs {
1787 first_checkpoint: None,
1788 last_checkpoint: Some(19),
1789 pipeline: vec![],
1790 ..Default::default()
1791 };
1792
1793 let client_args = ClientArgs {
1794 ingestion: IngestionClientArgs {
1795 local_ingestion_path: Some(temp_dir.path().to_owned()),
1796 ..Default::default()
1797 },
1798 ..Default::default()
1799 };
1800
1801 let ingestion_config = IngestionConfig::default();
1802
1803 let mut indexer = Indexer::new(
1804 store.clone(),
1805 indexer_args,
1806 client_args,
1807 ingestion_config,
1808 None,
            &registry,
1810 )
1811 .await
1812 .unwrap();
1813
1814 indexer
1816 .sequential_pipeline(
1817 MockHandler,
1818 pipeline::sequential::SequentialConfig::default(),
1819 )
1820 .await
1821 .unwrap();
1822
1823 assert_eq!(
1825 indexer.initial_commit_hi(),
1826 Some(11),
1827 "initial_commit_hi should be 11"
1828 );
1829
1830 indexer
1832 .sequential_pipeline(
1833 SequentialHandler,
1834 pipeline::sequential::SequentialConfig::default(),
1835 )
1836 .await
1837 .unwrap();
1838
1839 assert_eq!(
1841 indexer.initial_commit_hi(),
1842 Some(6),
1843 "initial_commit_hi should still be 6"
1844 );
1845
1846 indexer.run().await.unwrap().join().await.unwrap();
1848
1849 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1851 let watermark2 = conn
1852 .committer_watermark(SequentialHandler::NAME)
1853 .await
1854 .unwrap();
1855
1856 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1857 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1858 }
1859
1860 #[tokio::test]
1864 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1865 let registry = Registry::new();
1866 let store = MockStore::default();
1867
1868 let mut conn = store.connect().await.unwrap();
1871 conn.set_committer_watermark(
1872 MockCheckpointSequenceNumberHandler::NAME,
1873 CommitterWatermark {
1874 checkpoint_hi_inclusive: 10,
1875 ..Default::default()
1876 },
1877 )
1878 .await
1879 .unwrap();
1880 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1881 .await
1882 .unwrap();
1883
1884 let indexer_args = IndexerArgs {
1887 first_checkpoint: Some(0),
1888 last_checkpoint: Some(15),
1889 pipeline: vec![],
1890 task: TaskArgs::tasked("task".to_string(), 10),
1891 };
1892 let temp_dir = tempfile::tempdir().unwrap();
1893 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1894 ingestion_dir: temp_dir.path().to_owned(),
1895 starting_checkpoint: 0,
1896 num_checkpoints: 16,
1897 checkpoint_size: 2,
1898 })
1899 .await;
1900
1901 let client_args = ClientArgs {
1902 ingestion: IngestionClientArgs {
1903 local_ingestion_path: Some(temp_dir.path().to_owned()),
1904 ..Default::default()
1905 },
1906 ..Default::default()
1907 };
1908
1909 let ingestion_config = IngestionConfig::default();
1910
1911 let mut tasked_indexer = Indexer::new(
1912 store.clone(),
1913 indexer_args,
1914 client_args,
1915 ingestion_config,
1916 None,
            &registry,
1918 )
1919 .await
1920 .unwrap();
1921
1922 let _ = tasked_indexer
1923 .concurrent_pipeline(
1924 MockCheckpointSequenceNumberHandler,
1925 ConcurrentConfig::default(),
1926 )
1927 .await;
1928
1929 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1930 let metrics = tasked_indexer.indexer_metrics().clone();
1931
1932 tasked_indexer.run().await.unwrap().join().await.unwrap();
1933
1934 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1935 assert_eq!(
1936 metrics
1937 .total_collector_skipped_checkpoints
1938 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1939 .unwrap()
1940 .get(),
1941 7
1942 );
1943 let data = store
1944 .data
1945 .get(MockCheckpointSequenceNumberHandler::NAME)
1946 .unwrap();
1947 assert_eq!(data.len(), 9);
1948 for i in 0..7 {
1949 assert!(data.get(&i).is_none());
1950 }
1951 for i in 7..16 {
1952 assert!(data.get(&i).is_some());
1953 }
1954 }
1955
1956 #[tokio::test]
1958 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1959 let registry = Registry::new();
1960 let store = MockStore::default();
1961
1962 let mut conn = store.connect().await.unwrap();
1963 conn.set_committer_watermark(
1964 "test",
1965 CommitterWatermark {
1966 checkpoint_hi_inclusive: 10,
1967 ..Default::default()
1968 },
1969 )
1970 .await
1971 .unwrap();
1972 conn.set_reader_watermark("test", 5).await.unwrap();
1973
1974 let indexer_args = IndexerArgs {
1977 first_checkpoint: Some(9),
1978 last_checkpoint: Some(25),
1979 pipeline: vec![],
1980 task: TaskArgs::tasked("task".to_string(), 10),
1981 };
1982 let temp_dir = tempfile::tempdir().unwrap();
1983 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1984 ingestion_dir: temp_dir.path().to_owned(),
1985 starting_checkpoint: 9,
1986 num_checkpoints: 17,
1987 checkpoint_size: 2,
1988 })
1989 .await;
1990
1991 let client_args = ClientArgs {
1992 ingestion: IngestionClientArgs {
1993 local_ingestion_path: Some(temp_dir.path().to_owned()),
1994 ..Default::default()
1995 },
1996 ..Default::default()
1997 };
1998
1999 let ingestion_config = IngestionConfig::default();
2000
2001 let mut tasked_indexer = Indexer::new(
2002 store.clone(),
2003 indexer_args,
2004 client_args,
2005 ingestion_config,
2006 None,
            &registry,
2008 )
2009 .await
2010 .unwrap();
2011
2012 let _ = tasked_indexer
2013 .concurrent_pipeline(
2014 MockCheckpointSequenceNumberHandler,
2015 ConcurrentConfig::default(),
2016 )
2017 .await;
2018
2019 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2020 let metrics = tasked_indexer.indexer_metrics().clone();
2021
2022 tasked_indexer.run().await.unwrap().join().await.unwrap();
2023
2024 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2025 assert_eq!(
2026 metrics
2027 .total_watermarks_out_of_order
2028 .get_metric_with_label_values(&["test"])
2029 .unwrap()
2030 .get(),
2031 0
2032 );
2033 assert_eq!(
2034 metrics
2035 .total_collector_skipped_checkpoints
2036 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2037 .unwrap()
2038 .get(),
2039 0
2040 );
2041
2042 let data = store.data.get("test").unwrap();
        assert_eq!(data.len(), 17);
2044 for i in 0..9 {
2045 assert!(data.get(&i).is_none());
2046 }
2047 for i in 9..26 {
2048 assert!(data.get(&i).is_some());
2049 }
2050 let main_pipeline_watermark = store.watermark("test").unwrap();
2051 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2053 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2054 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2055 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2056 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2057 }
2058
2059 #[tokio::test]
2062 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2063 let registry = Registry::new();
2064 let store = MockStore::default();
2065 let mut conn = store.connect().await.unwrap();
2066 conn.set_committer_watermark(
2068 ControllableHandler::NAME,
2069 CommitterWatermark {
2070 checkpoint_hi_inclusive: 11,
2071 ..Default::default()
2072 },
2073 )
2074 .await
2075 .unwrap();
2076
2077 let temp_dir = tempfile::tempdir().unwrap();
2079 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2080 ingestion_dir: temp_dir.path().to_owned(),
2081 starting_checkpoint: 0,
2082 num_checkpoints: 501,
2083 checkpoint_size: 2,
2084 })
2085 .await;
2086 let indexer_args = IndexerArgs {
2087 first_checkpoint: Some(0),
2088 last_checkpoint: Some(500),
2089 pipeline: vec![],
            task: TaskArgs::tasked("task".to_string(), 10),
2091 };
2092 let client_args = ClientArgs {
2093 ingestion: IngestionClientArgs {
2094 local_ingestion_path: Some(temp_dir.path().to_owned()),
2095 ..Default::default()
2096 },
2097 ..Default::default()
2098 };
2099 let ingestion_config = IngestionConfig::default();
2100 let mut tasked_indexer = Indexer::new(
2101 store.clone(),
2102 indexer_args,
2103 client_args,
2104 ingestion_config,
2105 None,
            &registry,
2107 )
2108 .await
2109 .unwrap();
2110 let mut allow_process = 10;
2111 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2113 let _ = tasked_indexer
2114 .concurrent_pipeline(
2115 controllable_handler,
2116 ConcurrentConfig {
2117 committer: CommitterConfig {
2118 collect_interval_ms: 10,
2119 watermark_interval_ms: 10,
2120 ..Default::default()
2121 },
2122 ..Default::default()
2123 },
2124 )
2125 .await;
2126 let metrics = tasked_indexer.indexer_metrics().clone();
2127
2128 let mut s_indexer = tasked_indexer.run().await.unwrap();
2129
2130 store
2134 .wait_for_watermark(
2135 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2136 10,
2137 std::time::Duration::from_secs(10),
2138 )
2139 .await;
2140
2141 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2146 .await
2147 .unwrap();
2148
2149 let reader_lo = metrics
2150 .collector_reader_lo
2151 .with_label_values(&[ControllableHandler::NAME]);
2152
2153 let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2157 while reader_lo.get() != 250 {
2158 interval.tick().await;
2159 allow_process += 1;
2161 assert!(
2162 allow_process <= 500,
2163 "Released all checkpoints but collector never observed new reader_lo"
2164 );
2165 process_below.send(allow_process).ok();
2166 }
2167
2168 process_below.send(500).ok();
2175
2176 s_indexer.join().await.unwrap();
2177
2178 let data = store.data.get(ControllableHandler::NAME).unwrap();
2179
2180 for chkpt in (allow_process + 1)..250 {
2182 assert!(
2183 data.get(&chkpt).is_none(),
2184 "Checkpoint {chkpt} should have been skipped"
2185 );
2186 }
2187
2188 for chkpt in 250..=500 {
2190 assert!(
2191 data.get(&chkpt).is_some(),
2192 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2193 );
2194 }
2195
2196 for chkpt in 0..=10 {
2198 assert!(
2199 data.get(&chkpt).is_some(),
2200 "Checkpoint {chkpt} should have been committed (baseline)"
2201 );
2202 }
2203 }
}