1use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::InitWatermark;
19use sui_indexer_alt_framework_store_traits::Store;
20use sui_indexer_alt_framework_store_traits::TransactionalStore;
21use sui_indexer_alt_framework_store_traits::pipeline_task;
22use tracing::info;
23
24use crate::metrics::IngestionMetrics;
25use crate::pipeline::Processor;
26use crate::pipeline::concurrent::ConcurrentConfig;
27use crate::pipeline::concurrent::{self};
28use crate::pipeline::sequential::Handler;
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    // Checkpoint to start ingestion from when a pipeline has no watermark
    // yet (used as the default when initializing watermarks); pipelines that
    // already have a watermark resume from it instead — see
    // `Indexer::add_pipeline`. Plain `//` comments are used on fields here
    // because `///` doc comments would be picked up by clap as CLI help text.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    // Last checkpoint to ingest (inclusive); `None` means ingest forever
    // (treated as `u64::MAX` in `Indexer::run`).
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    // Names of pipelines to enable; may be repeated. Empty means "enable
    // every pipeline that gets added".
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    // Optional pipeline-task configuration (task name + reader interval).
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
// NOTE(review): this derives `clap::Parser` even though it is only ever
// `#[clap(flatten)]`-ed into `IndexerArgs`; `clap::Args` would be the
// conventional derive — confirm no caller relies on `TaskArgs::parse()`.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    // Name of the pipeline task this process runs as. clap's `requires`
    // forces `--reader-interval-ms` to be supplied alongside it; both-or-
    // neither is also enforced at conversion time by `into_task`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    // Reader interval in milliseconds, converted to a `Duration` on
    // `Task::reader_interval`. Must be paired with `--task`.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
pub struct Indexer<S: Store> {
    /// Write-side of the indexer; cloned into each pipeline.
    store: S,

    /// Indexer-level metrics, shared with every pipeline.
    metrics: Arc<IndexerMetrics>,

    /// Fetches checkpoints and broadcasts them to subscribed pipelines.
    ingestion_service: IngestionService,

    /// Checkpoint a pipeline starts from when it has no watermark yet
    /// (from `--first-checkpoint`, defaulting to 0).
    default_next_checkpoint: u64,

    /// Inclusive upper bound on ingestion (`--last-checkpoint`);
    /// `None` means unbounded.
    last_checkpoint: Option<u64>,

    /// Pipeline-task configuration, if this process runs as a named task.
    task: Option<Task>,

    /// Pipelines selected via `--pipeline`; `None` enables all. Names are
    /// removed as pipelines are added, so leftovers at `run` are unknown.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of all pipelines added so far (used to reject duplicates).
    added_pipelines: BTreeSet<&'static str>,

    /// Lowest checkpoint any added pipeline needs next; starts at
    /// `u64::MAX` as a sentinel and is min-folded in `add_pipeline`.
    first_ingestion_checkpoint: u64,

    /// Lowest next-checkpoint across sequential pipelines, if any were
    /// added; passed to the ingestion service so it doesn't run ahead.
    next_sequential_checkpoint: Option<u64>,

    /// Services for the pipelines registered so far, merged into the
    /// ingestion service in `run`.
    pipelines: Vec<Service>,
}
168
/// Validated pipeline-task configuration (both fields of `TaskArgs` present).
#[derive(Clone)]
pub(crate) struct Task {
    /// Task name, passed through to `pipeline_task` during watermark init.
    task: String,
    /// Reader interval derived from `--reader-interval-ms`.
    reader_interval: Duration,
}
179
180impl TaskArgs {
181 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
182 Self {
183 task: Some(task),
184 reader_interval_ms: Some(reader_interval_ms),
185 }
186 }
187
188 fn into_task(self) -> Option<Task> {
189 Some(Task {
190 task: self.task?,
191 reader_interval: Duration::from_millis(self.reader_interval_ms?),
192 })
193 }
194}
195
impl<S: Store> Indexer<S> {
    /// Create a new indexer over `store`, wiring up metrics (registered on
    /// `registry`, optionally namespaced by `metrics_prefix`) and the
    /// ingestion service. Pipelines are registered afterwards via
    /// `concurrent_pipeline`/`sequential_pipeline`, and everything is
    /// started by `run`.
    ///
    /// Fails if the ingestion service cannot be constructed from
    /// `client_args`/`ingestion_config`.
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            // No --first-checkpoint means new pipelines start from genesis (0).
            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
            last_checkpoint,
            task: task.into_task(),
            // An empty --pipeline list means "all pipelines enabled".
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            // Sentinel; min-folded down as pipelines are added.
            first_ingestion_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            pipelines: vec![],
        })
    }

    /// The store this indexer writes to.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client used to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Indexer-level metrics (shared with pipelines).
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Ingestion-side metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// Names of pipelines added so far that are also enabled (all added
    /// pipelines when no --pipeline filter was given).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// Lowest next-checkpoint across sequential pipelines added so far,
    /// or `None` if no sequential pipeline was added.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Register a concurrent pipeline driven by `handler`. A no-op (Ok) if
    /// the pipeline is filtered out by --pipeline; errors if the same
    /// pipeline was already added or watermark initialization fails.
    pub async fn concurrent_pipeline<H>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()>
    where
        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
    {
        // `None` means the pipeline is disabled -- nothing to set up.
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            // Concurrent pipelines only need the checkpoint stream, not the
            // commit-high feedback channel.
            self.ingestion_service.subscribe().0,
            self.metrics.clone(),
        ));

        Ok(())
    }

    /// Start ingestion and all registered pipelines, returning the merged
    /// service handle. Errors if --pipeline named pipelines that were never
    /// added, or if the ingestion service fails to start.
    pub async fn run(self) -> Result<Service> {
        // Every enabled pipeline name should have been consumed by
        // `add_pipeline`; leftovers are names this indexer doesn't know.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        // No --last-checkpoint means ingest indefinitely.
        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);

        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                self.first_ingestion_checkpoint..=last_checkpoint,
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Shared pipeline-registration bookkeeping: rejects duplicates, applies
    /// the --pipeline filter (returning `Ok(None)` for disabled pipelines),
    /// initializes the pipeline's watermark in the store (seeded from
    /// --first-checkpoint when absent), and min-folds the pipeline's resume
    /// point into `first_ingestion_checkpoint`. Returns the checkpoint the
    /// pipeline should process next.
    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Consume the name from the filter set; anything left over at `run`
        // is reported as unknown.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let mut conn = self
            .store
            .connect()
            .await
            .context("Failed to establish connection to store")?;

        let pipeline_task =
            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;

        // Seed defaults derived from --first-checkpoint; `checked_sub`
        // yields `None` at 0 (no checkpoint committed yet).
        let InitWatermark {
            checkpoint_hi_inclusive,
            reader_lo,
        } = conn
            .init_watermark(
                &pipeline_task,
                InitWatermark {
                    checkpoint_hi_inclusive: self.default_next_checkpoint.checked_sub(1),
                    reader_lo: self.default_next_checkpoint,
                },
            )
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        // Resume just past the last committed checkpoint, or from reader_lo
        // when nothing has been committed.
        let next_checkpoint = checkpoint_hi_inclusive.map_or(reader_lo, |c| c + 1);

        // Ingestion must start at the earliest point any pipeline needs.
        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
400
401impl<T: TransactionalStore> Indexer<T> {
402 pub async fn sequential_pipeline<H>(
413 &mut self,
414 handler: H,
415 config: SequentialConfig,
416 ) -> Result<()>
417 where
418 H: Handler<Store = T> + Send + Sync + 'static,
419 {
420 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
421 return Ok(());
422 };
423
424 if self.task.is_some() {
425 bail!(
426 "Sequential pipelines do not support pipeline tasks. \
427 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
428 Running the same pipeline under a different task would violate these guarantees."
429 );
430 }
431
432 self.next_sequential_checkpoint = Some(
434 self.next_sequential_checkpoint
435 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
436 );
437
438 let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
439
440 self.pipelines.push(sequential::pipeline::<H>(
441 handler,
442 next_checkpoint,
443 config,
444 self.store.clone(),
445 checkpoint_rx,
446 commit_hi_tx,
447 self.metrics.clone(),
448 ));
449
450 Ok(())
451 }
452}
453
454#[cfg(test)]
455mod tests {
456 use std::sync::Arc;
457
458 use async_trait::async_trait;
459 use clap::Parser;
460 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
461 use sui_synthetic_ingestion::synthetic_ingestion;
462 use tokio::sync::watch;
463
464 use crate::FieldCount;
465 use crate::config::ConcurrencyConfig;
466 use crate::ingestion::ingestion_client::IngestionClientArgs;
467 use crate::mocks::store::MockStore;
468 use crate::pipeline::CommitterConfig;
469 use crate::pipeline::Processor;
470 use crate::pipeline::concurrent::ConcurrentConfig;
471 use crate::store::CommitterWatermark;
472
473 use super::*;
474
    /// Minimal pipeline value used by test handlers: wraps the checkpoint
    /// sequence number it was produced from.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
478
    /// Test handler whose `process` blocks until an externally controlled
    /// limit allows the checkpoint through (see `Processor` impl below).
    struct ControllableHandler {
        // Highest checkpoint number currently allowed to be processed;
        // raised by the paired `watch::Sender` returned from `with_limit`.
        process_below: watch::Receiver<u64>,
    }
484
485 impl ControllableHandler {
486 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
487 let (tx, rx) = watch::channel(limit);
488 (Self { process_below: rx }, tx)
489 }
490 }
491
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Waits until the externally controlled limit reaches this
        /// checkpoint's sequence number, then emits a single value wrapping
        /// that number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Block until the watch value rises to at least `cp_num`.
            // `.ok()` deliberately ignores a dropped sender so processing
            // proceeds rather than erroring when the test ends.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
513
    #[async_trait]
    impl crate::pipeline::concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        // Accumulate all values and report the batch ready immediately, so
        // every checkpoint is committed as soon as it is processed.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> crate::pipeline::concurrent::BatchStatus {
            batch.extend(values);
            crate::pipeline::concurrent::BatchStatus::Ready
        }

        // Record each value into the mock store under this handler's name,
        // keyed by the checkpoint number it wraps.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
541
    /// Declares a unit struct `$handler` named `$name` that implements
    /// `Processor` plus both the concurrent and sequential `Handler` traits
    /// over `MockStore`, so one type can be registered as either pipeline
    /// kind in tests. Processing just echoes the checkpoint sequence number.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                // Keep accumulating (Pending) so batching is driven by the
                // framework's own flush conditions.
                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                // Record each value into the mock store, keyed by the
                // checkpoint number it wraps.
                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // Sequential commits are a no-op for these tests; only the
                // watermark bookkeeping matters.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
605
    // Shared handler types used by tests elsewhere in this module (declared
    // at module scope; most tests declare their own locally instead).
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
609
610 async fn test_init_watermark(
611 first_checkpoint: Option<u64>,
612 is_concurrent: bool,
613 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
614 let registry = Registry::new();
615 let store = MockStore::default();
616
617 test_pipeline!(A, "pipeline_name");
618
619 let mut conn = store.connect().await.unwrap();
620
621 let indexer_args = IndexerArgs {
622 first_checkpoint,
623 ..IndexerArgs::default()
624 };
625 let temp_dir = tempfile::tempdir().unwrap();
626 let client_args = ClientArgs {
627 ingestion: IngestionClientArgs {
628 local_ingestion_path: Some(temp_dir.path().to_owned()),
629 ..Default::default()
630 },
631 ..Default::default()
632 };
633 let ingestion_config = IngestionConfig::default();
634
635 let mut indexer = Indexer::new(
636 store.clone(),
637 indexer_args,
638 client_args,
639 ingestion_config,
640 None,
641 ®istry,
642 )
643 .await
644 .unwrap();
645
646 if is_concurrent {
647 indexer
648 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
649 .await
650 .unwrap();
651 } else {
652 indexer
653 .sequential_pipeline::<A>(A, SequentialConfig::default())
654 .await
655 .unwrap();
656 }
657
658 (
659 conn.committer_watermark(A::NAME).await.unwrap(),
660 conn.pruner_watermark(A::NAME, Duration::ZERO)
661 .await
662 .unwrap(),
663 )
664 }
665
666 #[test]
667 fn test_arg_parsing() {
668 #[derive(Parser)]
669 struct Args {
670 #[clap(flatten)]
671 indexer: IndexerArgs,
672 }
673
674 let args = Args::try_parse_from([
675 "cmd",
676 "--first-checkpoint",
677 "10",
678 "--last-checkpoint",
679 "100",
680 "--pipeline",
681 "a",
682 "--pipeline",
683 "b",
684 "--task",
685 "t",
686 "--reader-interval-ms",
687 "5000",
688 ])
689 .unwrap();
690
691 assert_eq!(args.indexer.first_checkpoint, Some(10));
692 assert_eq!(args.indexer.last_checkpoint, Some(100));
693 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
694 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
695 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
696 }
697
698 #[tokio::test]
700 async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
701 let registry = Registry::new();
702 let store = MockStore::default();
703
704 test_pipeline!(A, "concurrent_a");
705 test_pipeline!(B, "concurrent_b");
706 test_pipeline!(C, "sequential_c");
707 test_pipeline!(D, "sequential_d");
708
709 let mut conn = store.connect().await.unwrap();
710
711 conn.init_watermark(A::NAME, InitWatermark::default())
712 .await
713 .unwrap();
714 conn.set_committer_watermark(
715 A::NAME,
716 CommitterWatermark {
717 checkpoint_hi_inclusive: 100,
718 ..Default::default()
719 },
720 )
721 .await
722 .unwrap();
723
724 conn.init_watermark(B::NAME, InitWatermark::default())
725 .await
726 .unwrap();
727 conn.set_committer_watermark(
728 B::NAME,
729 CommitterWatermark {
730 checkpoint_hi_inclusive: 10,
731 ..Default::default()
732 },
733 )
734 .await
735 .unwrap();
736
737 conn.init_watermark(C::NAME, InitWatermark::default())
738 .await
739 .unwrap();
740 conn.set_committer_watermark(
741 C::NAME,
742 CommitterWatermark {
743 checkpoint_hi_inclusive: 1,
744 ..Default::default()
745 },
746 )
747 .await
748 .unwrap();
749
750 conn.init_watermark(D::NAME, InitWatermark::default())
751 .await
752 .unwrap();
753 conn.set_committer_watermark(
754 D::NAME,
755 CommitterWatermark {
756 checkpoint_hi_inclusive: 50,
757 ..Default::default()
758 },
759 )
760 .await
761 .unwrap();
762
763 let indexer_args = IndexerArgs::default();
764 let temp_dir = tempfile::tempdir().unwrap();
765 let client_args = ClientArgs {
766 ingestion: IngestionClientArgs {
767 local_ingestion_path: Some(temp_dir.path().to_owned()),
768 ..Default::default()
769 },
770 ..Default::default()
771 };
772 let ingestion_config = IngestionConfig::default();
773
774 let mut indexer = Indexer::new(
775 store,
776 indexer_args,
777 client_args,
778 ingestion_config,
779 None,
780 ®istry,
781 )
782 .await
783 .unwrap();
784
785 indexer
786 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
787 .await
788 .unwrap();
789 indexer
790 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
791 .await
792 .unwrap();
793 indexer
794 .sequential_pipeline::<C>(C, SequentialConfig::default())
795 .await
796 .unwrap();
797 indexer
798 .sequential_pipeline::<D>(D, SequentialConfig::default())
799 .await
800 .unwrap();
801
802 assert_eq!(indexer.first_ingestion_checkpoint, 2);
803 }
804
805 #[tokio::test]
807 async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
808 let registry = Registry::new();
809 let store = MockStore::default();
810
811 test_pipeline!(A, "concurrent_a");
812 test_pipeline!(B, "concurrent_b");
813 test_pipeline!(C, "sequential_c");
814 test_pipeline!(D, "sequential_d");
815
816 let mut conn = store.connect().await.unwrap();
817 conn.set_committer_watermark(
818 B::NAME,
819 CommitterWatermark {
820 checkpoint_hi_inclusive: 10,
821 ..Default::default()
822 },
823 )
824 .await
825 .unwrap();
826 conn.set_committer_watermark(
827 C::NAME,
828 CommitterWatermark {
829 checkpoint_hi_inclusive: 1,
830 ..Default::default()
831 },
832 )
833 .await
834 .unwrap();
835
836 let indexer_args = IndexerArgs::default();
837 let temp_dir = tempfile::tempdir().unwrap();
838 let client_args = ClientArgs {
839 ingestion: IngestionClientArgs {
840 local_ingestion_path: Some(temp_dir.path().to_owned()),
841 ..Default::default()
842 },
843 ..Default::default()
844 };
845 let ingestion_config = IngestionConfig::default();
846
847 let mut indexer = Indexer::new(
848 store,
849 indexer_args,
850 client_args,
851 ingestion_config,
852 None,
853 ®istry,
854 )
855 .await
856 .unwrap();
857
858 indexer
859 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
860 .await
861 .unwrap();
862 indexer
863 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
864 .await
865 .unwrap();
866 indexer
867 .sequential_pipeline::<C>(C, SequentialConfig::default())
868 .await
869 .unwrap();
870 indexer
871 .sequential_pipeline::<D>(D, SequentialConfig::default())
872 .await
873 .unwrap();
874
875 assert_eq!(indexer.first_ingestion_checkpoint, 0);
876 }
877
878 #[tokio::test]
880 async fn test_first_ingestion_checkpoint_smallest_is_0() {
881 let registry = Registry::new();
882 let store = MockStore::default();
883
884 test_pipeline!(A, "concurrent_a");
885 test_pipeline!(B, "concurrent_b");
886 test_pipeline!(C, "sequential_c");
887 test_pipeline!(D, "sequential_d");
888
889 let mut conn = store.connect().await.unwrap();
890 conn.set_committer_watermark(
891 A::NAME,
892 CommitterWatermark {
893 checkpoint_hi_inclusive: 100,
894 ..Default::default()
895 },
896 )
897 .await
898 .unwrap();
899 conn.set_committer_watermark(
900 B::NAME,
901 CommitterWatermark {
902 checkpoint_hi_inclusive: 10,
903 ..Default::default()
904 },
905 )
906 .await
907 .unwrap();
908 conn.set_committer_watermark(
909 C::NAME,
910 CommitterWatermark {
911 checkpoint_hi_inclusive: 1,
912 ..Default::default()
913 },
914 )
915 .await
916 .unwrap();
917 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
918 .await
919 .unwrap();
920
921 let indexer_args = IndexerArgs::default();
922 let temp_dir = tempfile::tempdir().unwrap();
923 let client_args = ClientArgs {
924 ingestion: IngestionClientArgs {
925 local_ingestion_path: Some(temp_dir.path().to_owned()),
926 ..Default::default()
927 },
928 ..Default::default()
929 };
930 let ingestion_config = IngestionConfig::default();
931
932 let mut indexer = Indexer::new(
933 store,
934 indexer_args,
935 client_args,
936 ingestion_config,
937 None,
938 ®istry,
939 )
940 .await
941 .unwrap();
942
943 indexer
944 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
945 .await
946 .unwrap();
947 indexer
948 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
949 .await
950 .unwrap();
951 indexer
952 .sequential_pipeline::<C>(C, SequentialConfig::default())
953 .await
954 .unwrap();
955 indexer
956 .sequential_pipeline::<D>(D, SequentialConfig::default())
957 .await
958 .unwrap();
959
960 assert_eq!(indexer.first_ingestion_checkpoint, 1);
961 }
962
963 #[tokio::test]
966 async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
967 let registry = Registry::new();
968 let store = MockStore::default();
969
970 test_pipeline!(A, "concurrent_a");
971 test_pipeline!(B, "concurrent_b");
972 test_pipeline!(C, "sequential_c");
973 test_pipeline!(D, "sequential_d");
974
975 let mut conn = store.connect().await.unwrap();
976 conn.set_committer_watermark(
977 B::NAME,
978 CommitterWatermark {
979 checkpoint_hi_inclusive: 50,
980 ..Default::default()
981 },
982 )
983 .await
984 .unwrap();
985 conn.set_committer_watermark(
986 C::NAME,
987 CommitterWatermark {
988 checkpoint_hi_inclusive: 10,
989 ..Default::default()
990 },
991 )
992 .await
993 .unwrap();
994
995 let indexer_args = IndexerArgs {
996 first_checkpoint: Some(5),
997 ..Default::default()
998 };
999 let temp_dir = tempfile::tempdir().unwrap();
1000 let client_args = ClientArgs {
1001 ingestion: IngestionClientArgs {
1002 local_ingestion_path: Some(temp_dir.path().to_owned()),
1003 ..Default::default()
1004 },
1005 ..Default::default()
1006 };
1007 let ingestion_config = IngestionConfig::default();
1008
1009 let mut indexer = Indexer::new(
1010 store,
1011 indexer_args,
1012 client_args,
1013 ingestion_config,
1014 None,
1015 ®istry,
1016 )
1017 .await
1018 .unwrap();
1019
1020 indexer
1021 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1022 .await
1023 .unwrap();
1024 indexer
1025 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1026 .await
1027 .unwrap();
1028 indexer
1029 .sequential_pipeline::<C>(C, SequentialConfig::default())
1030 .await
1031 .unwrap();
1032 indexer
1033 .sequential_pipeline::<D>(D, SequentialConfig::default())
1034 .await
1035 .unwrap();
1036
1037 assert_eq!(indexer.first_ingestion_checkpoint, 5);
1038 }
1039
1040 #[tokio::test]
1043 async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1044 let registry = Registry::new();
1045 let store = MockStore::default();
1046
1047 test_pipeline!(B, "concurrent_b");
1048 test_pipeline!(C, "sequential_c");
1049
1050 let mut conn = store.connect().await.unwrap();
1051 conn.set_committer_watermark(
1052 B::NAME,
1053 CommitterWatermark {
1054 checkpoint_hi_inclusive: 50,
1055 ..Default::default()
1056 },
1057 )
1058 .await
1059 .unwrap();
1060 conn.set_committer_watermark(
1061 C::NAME,
1062 CommitterWatermark {
1063 checkpoint_hi_inclusive: 10,
1064 ..Default::default()
1065 },
1066 )
1067 .await
1068 .unwrap();
1069
1070 let indexer_args = IndexerArgs {
1071 first_checkpoint: Some(5),
1072 ..Default::default()
1073 };
1074 let temp_dir = tempfile::tempdir().unwrap();
1075 let client_args = ClientArgs {
1076 ingestion: IngestionClientArgs {
1077 local_ingestion_path: Some(temp_dir.path().to_owned()),
1078 ..Default::default()
1079 },
1080 ..Default::default()
1081 };
1082 let ingestion_config = IngestionConfig::default();
1083
1084 let mut indexer = Indexer::new(
1085 store,
1086 indexer_args,
1087 client_args,
1088 ingestion_config,
1089 None,
1090 ®istry,
1091 )
1092 .await
1093 .unwrap();
1094
1095 indexer
1096 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1097 .await
1098 .unwrap();
1099 indexer
1100 .sequential_pipeline::<C>(C, SequentialConfig::default())
1101 .await
1102 .unwrap();
1103
1104 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1105 }
1106
1107 #[tokio::test]
1111 async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1112 let registry = Registry::new();
1113 let store = MockStore::default();
1114
1115 test_pipeline!(A, "concurrent_a");
1116 test_pipeline!(B, "concurrent_b");
1117 test_pipeline!(C, "sequential_c");
1118
1119 let mut conn = store.connect().await.unwrap();
1120 conn.set_committer_watermark(
1121 B::NAME,
1122 CommitterWatermark {
1123 checkpoint_hi_inclusive: 50,
1124 ..Default::default()
1125 },
1126 )
1127 .await
1128 .unwrap();
1129 conn.set_committer_watermark(
1130 C::NAME,
1131 CommitterWatermark {
1132 checkpoint_hi_inclusive: 10,
1133 ..Default::default()
1134 },
1135 )
1136 .await
1137 .unwrap();
1138
1139 let indexer_args = IndexerArgs {
1140 first_checkpoint: Some(24),
1141 ..Default::default()
1142 };
1143 let temp_dir = tempfile::tempdir().unwrap();
1144 let client_args = ClientArgs {
1145 ingestion: IngestionClientArgs {
1146 local_ingestion_path: Some(temp_dir.path().to_owned()),
1147 ..Default::default()
1148 },
1149 ..Default::default()
1150 };
1151 let ingestion_config = IngestionConfig::default();
1152
1153 let mut indexer = Indexer::new(
1154 store,
1155 indexer_args,
1156 client_args,
1157 ingestion_config,
1158 None,
1159 ®istry,
1160 )
1161 .await
1162 .unwrap();
1163
1164 indexer
1165 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1166 .await
1167 .unwrap();
1168
1169 indexer
1170 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1171 .await
1172 .unwrap();
1173
1174 indexer
1175 .sequential_pipeline::<C>(C, SequentialConfig::default())
1176 .await
1177 .unwrap();
1178
1179 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1180 }
1181
1182 #[tokio::test]
1184 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1185 let registry = Registry::new();
1186 let store = MockStore::default();
1187
1188 test_pipeline!(A, "concurrent_a");
1189 test_pipeline!(B, "concurrent_b");
1190 test_pipeline!(C, "sequential_c");
1191 test_pipeline!(D, "sequential_d");
1192
1193 let mut conn = store.connect().await.unwrap();
1194 conn.set_committer_watermark(
1195 A::NAME,
1196 CommitterWatermark {
1197 checkpoint_hi_inclusive: 5,
1198 ..Default::default()
1199 },
1200 )
1201 .await
1202 .unwrap();
1203 conn.set_committer_watermark(
1204 B::NAME,
1205 CommitterWatermark {
1206 checkpoint_hi_inclusive: 10,
1207 ..Default::default()
1208 },
1209 )
1210 .await
1211 .unwrap();
1212 conn.set_committer_watermark(
1213 C::NAME,
1214 CommitterWatermark {
1215 checkpoint_hi_inclusive: 15,
1216 ..Default::default()
1217 },
1218 )
1219 .await
1220 .unwrap();
1221 conn.set_committer_watermark(
1222 D::NAME,
1223 CommitterWatermark {
1224 checkpoint_hi_inclusive: 20,
1225 ..Default::default()
1226 },
1227 )
1228 .await
1229 .unwrap();
1230
1231 let temp_dir = tempfile::tempdir().unwrap();
1233 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1234 ingestion_dir: temp_dir.path().to_owned(),
1235 starting_checkpoint: 5,
1236 num_checkpoints: 25,
1237 checkpoint_size: 1,
1238 })
1239 .await;
1240
1241 let indexer_args = IndexerArgs {
1242 last_checkpoint: Some(29),
1243 ..Default::default()
1244 };
1245
1246 let client_args = ClientArgs {
1247 ingestion: IngestionClientArgs {
1248 local_ingestion_path: Some(temp_dir.path().to_owned()),
1249 ..Default::default()
1250 },
1251 ..Default::default()
1252 };
1253
1254 let ingestion_config = IngestionConfig::default();
1255
1256 let mut indexer = Indexer::new(
1257 store.clone(),
1258 indexer_args,
1259 client_args,
1260 ingestion_config,
1261 None,
1262 ®istry,
1263 )
1264 .await
1265 .unwrap();
1266
1267 indexer
1268 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1269 .await
1270 .unwrap();
1271 indexer
1272 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1273 .await
1274 .unwrap();
1275 indexer
1276 .sequential_pipeline::<C>(C, SequentialConfig::default())
1277 .await
1278 .unwrap();
1279 indexer
1280 .sequential_pipeline::<D>(D, SequentialConfig::default())
1281 .await
1282 .unwrap();
1283
1284 let ingestion_metrics = indexer.ingestion_metrics().clone();
1285 let indexer_metrics = indexer.indexer_metrics().clone();
1286
1287 indexer.run().await.unwrap().join().await.unwrap();
1288
1289 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1290 assert_eq!(
1291 indexer_metrics
1292 .total_watermarks_out_of_order
1293 .get_metric_with_label_values(&[A::NAME])
1294 .unwrap()
1295 .get(),
1296 0
1297 );
1298 assert_eq!(
1299 indexer_metrics
1300 .total_watermarks_out_of_order
1301 .get_metric_with_label_values(&[B::NAME])
1302 .unwrap()
1303 .get(),
1304 5
1305 );
1306 assert_eq!(
1307 indexer_metrics
1308 .total_watermarks_out_of_order
1309 .get_metric_with_label_values(&[C::NAME])
1310 .unwrap()
1311 .get(),
1312 10
1313 );
1314 assert_eq!(
1315 indexer_metrics
1316 .total_watermarks_out_of_order
1317 .get_metric_with_label_values(&[D::NAME])
1318 .unwrap()
1319 .get(),
1320 15
1321 );
1322 }
1323
1324 #[tokio::test]
1326 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1327 let registry = Registry::new();
1328 let store = MockStore::default();
1329
1330 test_pipeline!(A, "concurrent_a");
1331 test_pipeline!(B, "concurrent_b");
1332 test_pipeline!(C, "sequential_c");
1333 test_pipeline!(D, "sequential_d");
1334
1335 let mut conn = store.connect().await.unwrap();
1336 conn.set_committer_watermark(
1337 A::NAME,
1338 CommitterWatermark {
1339 checkpoint_hi_inclusive: 5,
1340 ..Default::default()
1341 },
1342 )
1343 .await
1344 .unwrap();
1345 conn.set_committer_watermark(
1346 B::NAME,
1347 CommitterWatermark {
1348 checkpoint_hi_inclusive: 10,
1349 ..Default::default()
1350 },
1351 )
1352 .await
1353 .unwrap();
1354 conn.set_committer_watermark(
1355 C::NAME,
1356 CommitterWatermark {
1357 checkpoint_hi_inclusive: 15,
1358 ..Default::default()
1359 },
1360 )
1361 .await
1362 .unwrap();
1363 conn.set_committer_watermark(
1364 D::NAME,
1365 CommitterWatermark {
1366 checkpoint_hi_inclusive: 20,
1367 ..Default::default()
1368 },
1369 )
1370 .await
1371 .unwrap();
1372
1373 let temp_dir = tempfile::tempdir().unwrap();
1375 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1376 ingestion_dir: temp_dir.path().to_owned(),
1377 starting_checkpoint: 5,
1378 num_checkpoints: 25,
1379 checkpoint_size: 1,
1380 })
1381 .await;
1382
1383 let indexer_args = IndexerArgs {
1384 first_checkpoint: Some(3),
1385 last_checkpoint: Some(29),
1386 ..Default::default()
1387 };
1388
1389 let client_args = ClientArgs {
1390 ingestion: IngestionClientArgs {
1391 local_ingestion_path: Some(temp_dir.path().to_owned()),
1392 ..Default::default()
1393 },
1394 ..Default::default()
1395 };
1396
1397 let ingestion_config = IngestionConfig::default();
1398
1399 let mut indexer = Indexer::new(
1400 store.clone(),
1401 indexer_args,
1402 client_args,
1403 ingestion_config,
1404 None,
1405 ®istry,
1406 )
1407 .await
1408 .unwrap();
1409
1410 indexer
1411 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1412 .await
1413 .unwrap();
1414 indexer
1415 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1416 .await
1417 .unwrap();
1418 indexer
1419 .sequential_pipeline::<C>(C, SequentialConfig::default())
1420 .await
1421 .unwrap();
1422 indexer
1423 .sequential_pipeline::<D>(D, SequentialConfig::default())
1424 .await
1425 .unwrap();
1426
1427 let ingestion_metrics = indexer.ingestion_metrics().clone();
1428 let metrics = indexer.indexer_metrics().clone();
1429 indexer.run().await.unwrap().join().await.unwrap();
1430
1431 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1432 assert_eq!(
1433 metrics
1434 .total_watermarks_out_of_order
1435 .get_metric_with_label_values(&[A::NAME])
1436 .unwrap()
1437 .get(),
1438 0
1439 );
1440 assert_eq!(
1441 metrics
1442 .total_watermarks_out_of_order
1443 .get_metric_with_label_values(&[B::NAME])
1444 .unwrap()
1445 .get(),
1446 5
1447 );
1448 assert_eq!(
1449 metrics
1450 .total_watermarks_out_of_order
1451 .get_metric_with_label_values(&[C::NAME])
1452 .unwrap()
1453 .get(),
1454 10
1455 );
1456 assert_eq!(
1457 metrics
1458 .total_watermarks_out_of_order
1459 .get_metric_with_label_values(&[D::NAME])
1460 .unwrap()
1461 .get(),
1462 15
1463 );
1464 }
1465
1466 #[tokio::test]
1468 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1469 let registry = Registry::new();
1470 let store = MockStore::default();
1471
1472 test_pipeline!(A, "concurrent_a");
1473 test_pipeline!(B, "concurrent_b");
1474 test_pipeline!(C, "sequential_c");
1475 test_pipeline!(D, "sequential_d");
1476
1477 let mut conn = store.connect().await.unwrap();
1478 conn.set_committer_watermark(
1479 B::NAME,
1480 CommitterWatermark {
1481 checkpoint_hi_inclusive: 10,
1482 ..Default::default()
1483 },
1484 )
1485 .await
1486 .unwrap();
1487 conn.set_committer_watermark(
1488 C::NAME,
1489 CommitterWatermark {
1490 checkpoint_hi_inclusive: 15,
1491 ..Default::default()
1492 },
1493 )
1494 .await
1495 .unwrap();
1496 conn.set_committer_watermark(
1497 D::NAME,
1498 CommitterWatermark {
1499 checkpoint_hi_inclusive: 20,
1500 ..Default::default()
1501 },
1502 )
1503 .await
1504 .unwrap();
1505
1506 let temp_dir = tempfile::tempdir().unwrap();
1508 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1509 ingestion_dir: temp_dir.path().to_owned(),
1510 starting_checkpoint: 0,
1511 num_checkpoints: 30,
1512 checkpoint_size: 1,
1513 })
1514 .await;
1515
1516 let indexer_args = IndexerArgs {
1517 last_checkpoint: Some(29),
1518 ..Default::default()
1519 };
1520
1521 let client_args = ClientArgs {
1522 ingestion: IngestionClientArgs {
1523 local_ingestion_path: Some(temp_dir.path().to_owned()),
1524 ..Default::default()
1525 },
1526 ..Default::default()
1527 };
1528
1529 let ingestion_config = IngestionConfig::default();
1530
1531 let mut indexer = Indexer::new(
1532 store.clone(),
1533 indexer_args,
1534 client_args,
1535 ingestion_config,
1536 None,
1537 ®istry,
1538 )
1539 .await
1540 .unwrap();
1541
1542 indexer
1543 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1544 .await
1545 .unwrap();
1546 indexer
1547 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1548 .await
1549 .unwrap();
1550 indexer
1551 .sequential_pipeline::<C>(C, SequentialConfig::default())
1552 .await
1553 .unwrap();
1554 indexer
1555 .sequential_pipeline::<D>(D, SequentialConfig::default())
1556 .await
1557 .unwrap();
1558
1559 let ingestion_metrics = indexer.ingestion_metrics().clone();
1560 let metrics = indexer.indexer_metrics().clone();
1561 indexer.run().await.unwrap().join().await.unwrap();
1562
1563 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1564 assert_eq!(
1565 metrics
1566 .total_watermarks_out_of_order
1567 .get_metric_with_label_values(&[A::NAME])
1568 .unwrap()
1569 .get(),
1570 0
1571 );
1572 assert_eq!(
1573 metrics
1574 .total_watermarks_out_of_order
1575 .get_metric_with_label_values(&[B::NAME])
1576 .unwrap()
1577 .get(),
1578 11
1579 );
1580 assert_eq!(
1581 metrics
1582 .total_watermarks_out_of_order
1583 .get_metric_with_label_values(&[C::NAME])
1584 .unwrap()
1585 .get(),
1586 16
1587 );
1588 assert_eq!(
1589 metrics
1590 .total_watermarks_out_of_order
1591 .get_metric_with_label_values(&[D::NAME])
1592 .unwrap()
1593 .get(),
1594 21
1595 );
1596 }
1597
1598 #[tokio::test]
1600 async fn test_indexer_ingestion_use_first_checkpoint() {
1601 let registry = Registry::new();
1602 let store = MockStore::default();
1603
1604 test_pipeline!(A, "concurrent_a");
1605 test_pipeline!(B, "concurrent_b");
1606 test_pipeline!(C, "sequential_c");
1607 test_pipeline!(D, "sequential_d");
1608
1609 let mut conn = store.connect().await.unwrap();
1610 conn.set_committer_watermark(
1611 B::NAME,
1612 CommitterWatermark {
1613 checkpoint_hi_inclusive: 10,
1614 ..Default::default()
1615 },
1616 )
1617 .await
1618 .unwrap();
1619 conn.set_committer_watermark(
1620 C::NAME,
1621 CommitterWatermark {
1622 checkpoint_hi_inclusive: 15,
1623 ..Default::default()
1624 },
1625 )
1626 .await
1627 .unwrap();
1628 conn.set_committer_watermark(
1629 D::NAME,
1630 CommitterWatermark {
1631 checkpoint_hi_inclusive: 20,
1632 ..Default::default()
1633 },
1634 )
1635 .await
1636 .unwrap();
1637
1638 let temp_dir = tempfile::tempdir().unwrap();
1640 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1641 ingestion_dir: temp_dir.path().to_owned(),
1642 starting_checkpoint: 5,
1643 num_checkpoints: 25,
1644 checkpoint_size: 1,
1645 })
1646 .await;
1647
1648 let indexer_args = IndexerArgs {
1649 first_checkpoint: Some(10),
1650 last_checkpoint: Some(29),
1651 ..Default::default()
1652 };
1653
1654 let client_args = ClientArgs {
1655 ingestion: IngestionClientArgs {
1656 local_ingestion_path: Some(temp_dir.path().to_owned()),
1657 ..Default::default()
1658 },
1659 ..Default::default()
1660 };
1661
1662 let ingestion_config = IngestionConfig::default();
1663
1664 let mut indexer = Indexer::new(
1665 store.clone(),
1666 indexer_args,
1667 client_args,
1668 ingestion_config,
1669 None,
1670 ®istry,
1671 )
1672 .await
1673 .unwrap();
1674
1675 indexer
1676 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1677 .await
1678 .unwrap();
1679 indexer
1680 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1681 .await
1682 .unwrap();
1683 indexer
1684 .sequential_pipeline::<C>(C, SequentialConfig::default())
1685 .await
1686 .unwrap();
1687 indexer
1688 .sequential_pipeline::<D>(D, SequentialConfig::default())
1689 .await
1690 .unwrap();
1691
1692 let ingestion_metrics = indexer.ingestion_metrics().clone();
1693 let metrics = indexer.indexer_metrics().clone();
1694 indexer.run().await.unwrap().join().await.unwrap();
1695
1696 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1697 assert_eq!(
1698 metrics
1699 .total_watermarks_out_of_order
1700 .get_metric_with_label_values(&[A::NAME])
1701 .unwrap()
1702 .get(),
1703 0
1704 );
1705 assert_eq!(
1706 metrics
1707 .total_watermarks_out_of_order
1708 .get_metric_with_label_values(&[B::NAME])
1709 .unwrap()
1710 .get(),
1711 1
1712 );
1713 assert_eq!(
1714 metrics
1715 .total_watermarks_out_of_order
1716 .get_metric_with_label_values(&[C::NAME])
1717 .unwrap()
1718 .get(),
1719 6
1720 );
1721 assert_eq!(
1722 metrics
1723 .total_watermarks_out_of_order
1724 .get_metric_with_label_values(&[D::NAME])
1725 .unwrap()
1726 .get(),
1727 11
1728 );
1729 }
1730
1731 #[tokio::test]
1732 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1733 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1734 assert_eq!(committer_watermark, None);
1735 assert_eq!(pruner_watermark, None);
1736 }
1737
1738 #[tokio::test]
1739 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1740 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1741 assert_eq!(committer_watermark, None);
1742 assert_eq!(pruner_watermark, None);
1743 }
1744
1745 #[tokio::test]
1746 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1747 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1748
1749 let committer_watermark = committer_watermark.unwrap();
1750 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1751
1752 let pruner_watermark = pruner_watermark.unwrap();
1753 assert_eq!(pruner_watermark.reader_lo, 1);
1754 assert_eq!(pruner_watermark.pruner_hi, 1);
1755 }
1756
1757 #[tokio::test]
1758 async fn test_init_watermark_sequential() {
1759 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1760
1761 let committer_watermark = committer_watermark.unwrap();
1762 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1763
1764 let pruner_watermark = pruner_watermark.unwrap();
1765 assert_eq!(pruner_watermark.reader_lo, 1);
1766 assert_eq!(pruner_watermark.pruner_hi, 1);
1767 }
1768
1769 #[tokio::test]
1770 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1771 let registry = Registry::new();
1772 let store = MockStore::default();
1773
1774 let mut conn = store.connect().await.unwrap();
1776
1777 conn.set_committer_watermark(
1779 MockHandler::NAME,
1780 CommitterWatermark {
1781 epoch_hi_inclusive: 0,
1782 checkpoint_hi_inclusive: 10,
1783 tx_hi: 20,
1784 timestamp_ms_hi_inclusive: 10000,
1785 },
1786 )
1787 .await
1788 .unwrap();
1789
1790 conn.set_committer_watermark(
1792 SequentialHandler::NAME,
1793 CommitterWatermark {
1794 epoch_hi_inclusive: 0,
1795 checkpoint_hi_inclusive: 5,
1796 tx_hi: 10,
1797 timestamp_ms_hi_inclusive: 5000,
1798 },
1799 )
1800 .await
1801 .unwrap();
1802
1803 let temp_dir = tempfile::tempdir().unwrap();
1805 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1806 ingestion_dir: temp_dir.path().to_owned(),
1807 starting_checkpoint: 0,
1808 num_checkpoints: 20,
1809 checkpoint_size: 2,
1810 })
1811 .await;
1812
1813 let indexer_args = IndexerArgs {
1814 first_checkpoint: None,
1815 last_checkpoint: Some(19),
1816 pipeline: vec![],
1817 ..Default::default()
1818 };
1819
1820 let client_args = ClientArgs {
1821 ingestion: IngestionClientArgs {
1822 local_ingestion_path: Some(temp_dir.path().to_owned()),
1823 ..Default::default()
1824 },
1825 ..Default::default()
1826 };
1827
1828 let ingestion_config = IngestionConfig::default();
1829
1830 let mut indexer = Indexer::new(
1831 store.clone(),
1832 indexer_args,
1833 client_args,
1834 ingestion_config,
1835 None,
1836 ®istry,
1837 )
1838 .await
1839 .unwrap();
1840
1841 indexer
1843 .sequential_pipeline(
1844 MockHandler,
1845 pipeline::sequential::SequentialConfig::default(),
1846 )
1847 .await
1848 .unwrap();
1849
1850 assert_eq!(
1852 indexer.next_sequential_checkpoint(),
1853 Some(11),
1854 "next_sequential_checkpoint should be 11"
1855 );
1856
1857 indexer
1859 .sequential_pipeline(
1860 SequentialHandler,
1861 pipeline::sequential::SequentialConfig::default(),
1862 )
1863 .await
1864 .unwrap();
1865
1866 assert_eq!(
1868 indexer.next_sequential_checkpoint(),
1869 Some(6),
1870 "next_sequential_checkpoint should still be 6"
1871 );
1872
1873 indexer.run().await.unwrap().join().await.unwrap();
1875
1876 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1878 let watermark2 = conn
1879 .committer_watermark(SequentialHandler::NAME)
1880 .await
1881 .unwrap();
1882
1883 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1884 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1885 }
1886
1887 #[tokio::test]
1891 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1892 let registry = Registry::new();
1893 let store = MockStore::default();
1894
1895 let mut conn = store.connect().await.unwrap();
1898 conn.set_committer_watermark(
1899 MockCheckpointSequenceNumberHandler::NAME,
1900 CommitterWatermark {
1901 checkpoint_hi_inclusive: 10,
1902 ..Default::default()
1903 },
1904 )
1905 .await
1906 .unwrap();
1907 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1908 .await
1909 .unwrap();
1910
1911 let indexer_args = IndexerArgs {
1914 first_checkpoint: Some(0),
1915 last_checkpoint: Some(15),
1916 task: TaskArgs::tasked("task".to_string(), 10),
1917 ..Default::default()
1918 };
1919 let temp_dir = tempfile::tempdir().unwrap();
1920 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1921 ingestion_dir: temp_dir.path().to_owned(),
1922 starting_checkpoint: 0,
1923 num_checkpoints: 16,
1924 checkpoint_size: 2,
1925 })
1926 .await;
1927
1928 let client_args = ClientArgs {
1929 ingestion: IngestionClientArgs {
1930 local_ingestion_path: Some(temp_dir.path().to_owned()),
1931 ..Default::default()
1932 },
1933 ..Default::default()
1934 };
1935
1936 let ingestion_config = IngestionConfig::default();
1937
1938 let mut tasked_indexer = Indexer::new(
1939 store.clone(),
1940 indexer_args,
1941 client_args,
1942 ingestion_config,
1943 None,
1944 ®istry,
1945 )
1946 .await
1947 .unwrap();
1948
1949 let _ = tasked_indexer
1950 .concurrent_pipeline(
1951 MockCheckpointSequenceNumberHandler,
1952 ConcurrentConfig::default(),
1953 )
1954 .await;
1955
1956 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1957 let metrics = tasked_indexer.indexer_metrics().clone();
1958
1959 tasked_indexer.run().await.unwrap().join().await.unwrap();
1960
1961 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1962 assert_eq!(
1963 metrics
1964 .total_collector_skipped_checkpoints
1965 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1966 .unwrap()
1967 .get(),
1968 7
1969 );
1970 let data = store
1971 .data
1972 .get(MockCheckpointSequenceNumberHandler::NAME)
1973 .unwrap();
1974 assert_eq!(data.len(), 9);
1975 for i in 0..7 {
1976 assert!(data.get(&i).is_none());
1977 }
1978 for i in 7..16 {
1979 assert!(data.get(&i).is_some());
1980 }
1981 }
1982
1983 #[tokio::test]
1985 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1986 let registry = Registry::new();
1987 let store = MockStore::default();
1988
1989 let mut conn = store.connect().await.unwrap();
1990 conn.set_committer_watermark(
1991 "test",
1992 CommitterWatermark {
1993 checkpoint_hi_inclusive: 10,
1994 ..Default::default()
1995 },
1996 )
1997 .await
1998 .unwrap();
1999 conn.set_reader_watermark("test", 5).await.unwrap();
2000
2001 let indexer_args = IndexerArgs {
2004 first_checkpoint: Some(9),
2005 last_checkpoint: Some(25),
2006 task: TaskArgs::tasked("task".to_string(), 10),
2007 ..Default::default()
2008 };
2009 let temp_dir = tempfile::tempdir().unwrap();
2010 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2011 ingestion_dir: temp_dir.path().to_owned(),
2012 starting_checkpoint: 9,
2013 num_checkpoints: 17,
2014 checkpoint_size: 2,
2015 })
2016 .await;
2017
2018 let client_args = ClientArgs {
2019 ingestion: IngestionClientArgs {
2020 local_ingestion_path: Some(temp_dir.path().to_owned()),
2021 ..Default::default()
2022 },
2023 ..Default::default()
2024 };
2025
2026 let ingestion_config = IngestionConfig::default();
2027
2028 let mut tasked_indexer = Indexer::new(
2029 store.clone(),
2030 indexer_args,
2031 client_args,
2032 ingestion_config,
2033 None,
2034 ®istry,
2035 )
2036 .await
2037 .unwrap();
2038
2039 let _ = tasked_indexer
2040 .concurrent_pipeline(
2041 MockCheckpointSequenceNumberHandler,
2042 ConcurrentConfig::default(),
2043 )
2044 .await;
2045
2046 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2047 let metrics = tasked_indexer.indexer_metrics().clone();
2048
2049 tasked_indexer.run().await.unwrap().join().await.unwrap();
2050
2051 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2052 assert_eq!(
2053 metrics
2054 .total_watermarks_out_of_order
2055 .get_metric_with_label_values(&["test"])
2056 .unwrap()
2057 .get(),
2058 0
2059 );
2060 assert_eq!(
2061 metrics
2062 .total_collector_skipped_checkpoints
2063 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2064 .unwrap()
2065 .get(),
2066 0
2067 );
2068
2069 let data = store.data.get("test").unwrap();
2070 assert!(data.len() == 17);
2071 for i in 0..9 {
2072 assert!(data.get(&i).is_none());
2073 }
2074 for i in 9..26 {
2075 assert!(data.get(&i).is_some());
2076 }
2077 let main_pipeline_watermark = store.watermark("test").unwrap();
2078 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
2080 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2081 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2082 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
2083 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2084 }
2085
2086 #[tokio::test]
2089 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2090 let registry = Registry::new();
2091 let store = MockStore::default();
2092 let mut conn = store.connect().await.unwrap();
2093 conn.set_committer_watermark(
2095 ControllableHandler::NAME,
2096 CommitterWatermark {
2097 checkpoint_hi_inclusive: 11,
2098 ..Default::default()
2099 },
2100 )
2101 .await
2102 .unwrap();
2103
2104 let temp_dir = tempfile::tempdir().unwrap();
2106 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2107 ingestion_dir: temp_dir.path().to_owned(),
2108 starting_checkpoint: 0,
2109 num_checkpoints: 501,
2110 checkpoint_size: 2,
2111 })
2112 .await;
2113 let indexer_args = IndexerArgs {
2114 first_checkpoint: Some(0),
2115 last_checkpoint: Some(500),
2116 task: TaskArgs::tasked("task".to_string(), 10 ),
2117 ..Default::default()
2118 };
2119 let client_args = ClientArgs {
2120 ingestion: IngestionClientArgs {
2121 local_ingestion_path: Some(temp_dir.path().to_owned()),
2122 ..Default::default()
2123 },
2124 ..Default::default()
2125 };
2126 let ingestion_config = IngestionConfig::default();
2127 let mut tasked_indexer = Indexer::new(
2128 store.clone(),
2129 indexer_args,
2130 client_args,
2131 ingestion_config,
2132 None,
2133 ®istry,
2134 )
2135 .await
2136 .unwrap();
2137 let mut allow_process = 10;
2138 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2140 let _ = tasked_indexer
2141 .concurrent_pipeline(
2142 controllable_handler,
2143 ConcurrentConfig {
2144 committer: CommitterConfig {
2145 collect_interval_ms: 10,
2146 watermark_interval_ms: 10,
2147 ..Default::default()
2148 },
2149 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
2152 ..Default::default()
2153 },
2154 )
2155 .await;
2156 let metrics = tasked_indexer.indexer_metrics().clone();
2157
2158 let mut s_indexer = tasked_indexer.run().await.unwrap();
2159
2160 store
2164 .wait_for_watermark(
2165 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2166 10,
2167 std::time::Duration::from_secs(10),
2168 )
2169 .await;
2170
2171 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2176 .await
2177 .unwrap();
2178
2179 let reader_lo = metrics
2180 .collector_reader_lo
2181 .with_label_values(&[ControllableHandler::NAME]);
2182
2183 let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2187 while reader_lo.get() != 250 {
2188 interval.tick().await;
2189 allow_process += 1;
2191 assert!(
2192 allow_process <= 500,
2193 "Released all checkpoints but collector never observed new reader_lo"
2194 );
2195 process_below.send(allow_process).ok();
2196 }
2197
2198 process_below.send(500).ok();
2205
2206 s_indexer.join().await.unwrap();
2207
2208 let data = store.data.get(ControllableHandler::NAME).unwrap();
2209
2210 for chkpt in (allow_process + 1)..250 {
2212 assert!(
2213 data.get(&chkpt).is_none(),
2214 "Checkpoint {chkpt} should have been skipped"
2215 );
2216 }
2217
2218 for chkpt in 250..=500 {
2220 assert!(
2221 data.get(&chkpt).is_some(),
2222 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2223 );
2224 }
2225
2226 for chkpt in 0..=10 {
2228 assert!(
2229 data.get(&chkpt).is_some(),
2230 "Checkpoint {chkpt} should have been committed (baseline)"
2231 );
2232 }
2233 }
2234}