1use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
/// Command-line arguments that control which checkpoints an indexer processes and which of its
/// registered pipelines are enabled.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Checkpoint to start ingestion from when a pipeline has no committed watermark yet.
    /// Defaults to 0 when unset; pipelines that already have a watermark resume from it instead.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Inclusive upper bound on checkpoints to ingest. When unset, ingestion is unbounded
    /// (treated as `u64::MAX`).
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Names of pipelines to enable. May be repeated; when empty, all registered pipelines run.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional pipeline-task configuration (task name + reader interval), flattened into this
    /// argument set.
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
/// Optional task identity for pipelines that support running as one of several cooperating tasks.
/// Both fields must be provided together (enforced by the `requires` constraints).
///
/// NOTE(review): this derives `clap::Parser` but is only ever used via `#[clap(flatten)]` inside
/// `IndexerArgs`; `clap::Args` would be the conventional derive here — confirm nothing calls
/// `TaskArgs::parse()` before changing it.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the task this process runs as; requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Polling interval (milliseconds) for the task's reader; requires `--task`.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
/// The indexer: owns the store, the ingestion service, and the set of registered pipelines, and
/// wires them together when `run` is called.
pub struct Indexer<S: Store> {
    // Store that pipelines read and write watermarks (and committed data) through.
    store: S,

    // Metrics shared by all pipelines of this indexer.
    metrics: Arc<IndexerMetrics>,

    // Service that fetches checkpoints; each pipeline subscribes to it for its checkpoint feed.
    ingestion_service: IngestionService,

    // Checkpoint a pipeline starts from when it has no committed watermark. Comes from
    // `IndexerArgs::first_checkpoint`, defaulting to 0.
    default_next_checkpoint: u64,

    // Optional inclusive upper bound for ingestion; `None` means unbounded (`u64::MAX`).
    last_checkpoint: Option<u64>,

    // Optional pipeline-task identity derived from `TaskArgs`; concurrent pipelines receive a
    // clone of it, and sequential pipelines reject it.
    task: Option<Task>,

    // Pipelines the operator asked to enable. `None` means "all". Names are removed as their
    // pipelines are registered; anything left over at `run` time is an unknown pipeline and is
    // reported as an error.
    enabled_pipelines: Option<BTreeSet<String>>,

    // Names of all pipelines registered so far, used to reject duplicate registration.
    added_pipelines: BTreeSet<&'static str>,

    // Smallest "next checkpoint" across all enabled pipelines — the start of the ingestion
    // range. Initialized to `u64::MAX` and lowered as pipelines are added.
    first_ingestion_checkpoint: u64,

    // Smallest "next checkpoint" among sequential pipelines only (if any); handed to the
    // ingestion service when it is started.
    next_sequential_checkpoint: Option<u64>,

    // Services for each registered pipeline, merged into the ingestion service in `run`.
    pipelines: Vec<Service>,
}
167
/// Resolved pipeline-task configuration: both fields of `TaskArgs` present and converted.
#[derive(Clone)]
pub(crate) struct Task {
    // Task name, combined with the pipeline name to form a task-specific watermark key.
    task: String,
    // How often the task's reader polls, converted from `reader_interval_ms`.
    reader_interval: Duration,
}
178
179impl TaskArgs {
180 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
181 Self {
182 task: Some(task),
183 reader_interval_ms: Some(reader_interval_ms),
184 }
185 }
186
187 fn into_task(self) -> Option<Task> {
188 Some(Task {
189 task: self.task?,
190 reader_interval: Duration::from_millis(self.reader_interval_ms?),
191 })
192 }
193}
194
impl<S: Store> Indexer<S> {
    /// Create a new indexer over `store`, constructing its metrics (registered on `registry`,
    /// optionally prefixed with `metrics_prefix`) and its ingestion service. No pipelines are
    /// registered yet and nothing runs until [`Self::run`] is called.
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            // No explicit --first-checkpoint means pipelines without a watermark start at 0.
            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
            last_checkpoint,
            task: task.into_task(),
            // An empty --pipeline list means "enable everything".
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            // Lowered towards the minimum "next checkpoint" as pipelines are registered.
            first_ingestion_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            pipelines: vec![],
        })
    }

    /// The store this indexer commits through.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client the ingestion service uses to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Metrics shared by this indexer's pipelines.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Metrics tracked by the ingestion service.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// Names of pipelines that have been registered AND are enabled by the `--pipeline` filter
    /// (all registered pipelines when no filter was given).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// Smallest next checkpoint among registered sequential pipelines, if any were registered.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Register a concurrent pipeline driven by `handler`. Does nothing (successfully) if the
    /// pipeline is filtered out by `--pipeline`; fails if the same pipeline was already added.
    pub async fn concurrent_pipeline<H>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()>
    where
        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
    {
        // `None` means the pipeline is disabled -- skip registration entirely.
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            // Concurrent pipelines only need the checkpoint receiver half of the subscription.
            self.ingestion_service.subscribe().0,
            self.metrics.clone(),
        ));

        Ok(())
    }

    /// Start the indexer: validate the pipeline filter, start ingestion over the computed
    /// checkpoint range, and merge every registered pipeline into a single [`Service`] handle.
    pub async fn run(self) -> Result<Service> {
        // Every name in the filter should have been consumed by `add_pipeline`; leftovers are
        // pipelines this binary does not know about.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                 {enabled_pipelines:#?}",
            );
        }

        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);

        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                self.first_ingestion_checkpoint..=last_checkpoint,
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Common pipeline registration: dedupe by name, apply the `--pipeline` filter, initialize
    /// the pipeline's watermark in the store, and return the checkpoint it should process next
    /// (`None` when the pipeline is filtered out).
    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Consuming the name from the filter lets `run` detect unknown leftovers.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let mut conn = self
            .store
            .connect()
            .await
            .context("Failed to establish connection to store")?;

        // Watermark key combines the pipeline name with the configured task name, if any.
        let pipeline_task =
            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;

        // Returns the committed high watermark if one already exists; presumably seeds the
        // watermark with `default_next_checkpoint` otherwise -- see the store trait for details.
        let checkpoint_hi_inclusive = conn
            .init_watermark(&pipeline_task, self.default_next_checkpoint)
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        // Resume just past the committed watermark, or start at the configured default.
        let next_checkpoint =
            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);

        // Ingestion must start at the earliest checkpoint any pipeline still needs.
        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
391
392impl<T: TransactionalStore> Indexer<T> {
393 pub async fn sequential_pipeline<H>(
404 &mut self,
405 handler: H,
406 config: SequentialConfig,
407 ) -> Result<()>
408 where
409 H: Handler<Store = T> + Send + Sync + 'static,
410 {
411 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
412 return Ok(());
413 };
414
415 if self.task.is_some() {
416 bail!(
417 "Sequential pipelines do not support pipeline tasks. \
418 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
419 Running the same pipeline under a different task would violate these guarantees."
420 );
421 }
422
423 self.next_sequential_checkpoint = Some(
425 self.next_sequential_checkpoint
426 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
427 );
428
429 let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
430
431 self.pipelines.push(sequential::pipeline::<H>(
432 handler,
433 next_checkpoint,
434 config,
435 self.store.clone(),
436 checkpoint_rx,
437 commit_hi_tx,
438 self.metrics.clone(),
439 ));
440
441 Ok(())
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use std::sync::Arc;
448
449 use async_trait::async_trait;
450 use clap::Parser;
451 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
452 use sui_synthetic_ingestion::synthetic_ingestion;
453 use tokio::sync::watch;
454
455 use crate::FieldCount;
456 use crate::config::ConcurrencyConfig;
457 use crate::ingestion::ingestion_client::IngestionClientArgs;
458 use crate::mocks::store::MockStore;
459 use crate::pipeline::CommitterConfig;
460 use crate::pipeline::Processor;
461 use crate::pipeline::concurrent::ConcurrentConfig;
462 use crate::store::CommitterWatermark;
463
464 use super::*;
465
    // Payload emitted by the test processors; wraps the checkpoint sequence number it came from.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
469
    /// Handler whose `process` blocks until a watched limit reaches the checkpoint's sequence
    /// number, letting tests control how far processing is allowed to advance.
    struct ControllableHandler {
        // Checkpoints are processed only once their sequence number is <= this watched limit.
        process_below: watch::Receiver<u64>,
    }
475
476 impl ControllableHandler {
477 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
478 let (tx, rx) = watch::channel(limit);
479 (Self { process_below: rx }, tx)
480 }
481 }
482
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Wait until the watched limit admits this checkpoint, then emit its sequence number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Park until the test raises the limit to at least this checkpoint. A closed
            // channel is ignored (`.ok()`) so processing falls through instead of failing.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
504
    #[async_trait]
    impl crate::pipeline::concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> crate::pipeline::concurrent::BatchStatus {
            batch.extend(values);
            // Ready: flush every batch immediately so tests see commits without batching delay.
            crate::pipeline::concurrent::BatchStatus::Ready
        }

        /// Record each value in the mock store under this pipeline's name, keyed by checkpoint.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
532
    /// Generates a minimal pipeline handler type `$handler` registered under pipeline name
    /// `$name`.
    ///
    /// The generated type implements `Processor` (emitting the checkpoint sequence number),
    /// a concurrent `Handler` (committing each value to the mock store), and a sequential
    /// `Handler` (a no-op commit), so the same type can be registered as either pipeline kind.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    // Pending: leave it to the committer to decide when the batch is flushed.
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    // Sequential commits are not inspected by these tests; report one row.
                    Ok(1)
                }
            }
        };
    }
596
    // Concrete handlers shared by tests below; each registers under a distinct pipeline name.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
600
601 async fn test_init_watermark(
602 first_checkpoint: Option<u64>,
603 is_concurrent: bool,
604 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
605 let registry = Registry::new();
606 let store = MockStore::default();
607
608 test_pipeline!(A, "pipeline_name");
609
610 let mut conn = store.connect().await.unwrap();
611
612 let indexer_args = IndexerArgs {
613 first_checkpoint,
614 ..IndexerArgs::default()
615 };
616 let temp_dir = tempfile::tempdir().unwrap();
617 let client_args = ClientArgs {
618 ingestion: IngestionClientArgs {
619 local_ingestion_path: Some(temp_dir.path().to_owned()),
620 ..Default::default()
621 },
622 ..Default::default()
623 };
624 let ingestion_config = IngestionConfig::default();
625
626 let mut indexer = Indexer::new(
627 store.clone(),
628 indexer_args,
629 client_args,
630 ingestion_config,
631 None,
632 ®istry,
633 )
634 .await
635 .unwrap();
636
637 if is_concurrent {
638 indexer
639 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
640 .await
641 .unwrap();
642 } else {
643 indexer
644 .sequential_pipeline::<A>(A, SequentialConfig::default())
645 .await
646 .unwrap();
647 }
648
649 (
650 conn.committer_watermark(A::NAME).await.unwrap(),
651 conn.pruner_watermark(A::NAME, Duration::ZERO)
652 .await
653 .unwrap(),
654 )
655 }
656
657 #[test]
658 fn test_arg_parsing() {
659 #[derive(Parser)]
660 struct Args {
661 #[clap(flatten)]
662 indexer: IndexerArgs,
663 }
664
665 let args = Args::try_parse_from([
666 "cmd",
667 "--first-checkpoint",
668 "10",
669 "--last-checkpoint",
670 "100",
671 "--pipeline",
672 "a",
673 "--pipeline",
674 "b",
675 "--task",
676 "t",
677 "--reader-interval-ms",
678 "5000",
679 ])
680 .unwrap();
681
682 assert_eq!(args.indexer.first_checkpoint, Some(10));
683 assert_eq!(args.indexer.last_checkpoint, Some(100));
684 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
685 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
686 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
687 }
688
689 #[tokio::test]
691 async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
692 let registry = Registry::new();
693 let store = MockStore::default();
694
695 test_pipeline!(A, "concurrent_a");
696 test_pipeline!(B, "concurrent_b");
697 test_pipeline!(C, "sequential_c");
698 test_pipeline!(D, "sequential_d");
699
700 let mut conn = store.connect().await.unwrap();
701 conn.set_committer_watermark(
702 A::NAME,
703 CommitterWatermark {
704 checkpoint_hi_inclusive: 100,
705 ..Default::default()
706 },
707 )
708 .await
709 .unwrap();
710 conn.set_committer_watermark(
711 B::NAME,
712 CommitterWatermark {
713 checkpoint_hi_inclusive: 10,
714 ..Default::default()
715 },
716 )
717 .await
718 .unwrap();
719 conn.set_committer_watermark(
720 C::NAME,
721 CommitterWatermark {
722 checkpoint_hi_inclusive: 1,
723 ..Default::default()
724 },
725 )
726 .await
727 .unwrap();
728 conn.set_committer_watermark(
729 D::NAME,
730 CommitterWatermark {
731 checkpoint_hi_inclusive: 50,
732 ..Default::default()
733 },
734 )
735 .await
736 .unwrap();
737
738 let indexer_args = IndexerArgs::default();
739 let temp_dir = tempfile::tempdir().unwrap();
740 let client_args = ClientArgs {
741 ingestion: IngestionClientArgs {
742 local_ingestion_path: Some(temp_dir.path().to_owned()),
743 ..Default::default()
744 },
745 ..Default::default()
746 };
747 let ingestion_config = IngestionConfig::default();
748
749 let mut indexer = Indexer::new(
750 store,
751 indexer_args,
752 client_args,
753 ingestion_config,
754 None,
755 ®istry,
756 )
757 .await
758 .unwrap();
759
760 indexer
761 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
762 .await
763 .unwrap();
764 indexer
765 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
766 .await
767 .unwrap();
768 indexer
769 .sequential_pipeline::<C>(C, SequentialConfig::default())
770 .await
771 .unwrap();
772 indexer
773 .sequential_pipeline::<D>(D, SequentialConfig::default())
774 .await
775 .unwrap();
776
777 assert_eq!(indexer.first_ingestion_checkpoint, 2);
778 }
779
780 #[tokio::test]
782 async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
783 let registry = Registry::new();
784 let store = MockStore::default();
785
786 test_pipeline!(A, "concurrent_a");
787 test_pipeline!(B, "concurrent_b");
788 test_pipeline!(C, "sequential_c");
789 test_pipeline!(D, "sequential_d");
790
791 let mut conn = store.connect().await.unwrap();
792 conn.set_committer_watermark(
793 B::NAME,
794 CommitterWatermark {
795 checkpoint_hi_inclusive: 10,
796 ..Default::default()
797 },
798 )
799 .await
800 .unwrap();
801 conn.set_committer_watermark(
802 C::NAME,
803 CommitterWatermark {
804 checkpoint_hi_inclusive: 1,
805 ..Default::default()
806 },
807 )
808 .await
809 .unwrap();
810
811 let indexer_args = IndexerArgs::default();
812 let temp_dir = tempfile::tempdir().unwrap();
813 let client_args = ClientArgs {
814 ingestion: IngestionClientArgs {
815 local_ingestion_path: Some(temp_dir.path().to_owned()),
816 ..Default::default()
817 },
818 ..Default::default()
819 };
820 let ingestion_config = IngestionConfig::default();
821
822 let mut indexer = Indexer::new(
823 store,
824 indexer_args,
825 client_args,
826 ingestion_config,
827 None,
828 ®istry,
829 )
830 .await
831 .unwrap();
832
833 indexer
834 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
835 .await
836 .unwrap();
837 indexer
838 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
839 .await
840 .unwrap();
841 indexer
842 .sequential_pipeline::<C>(C, SequentialConfig::default())
843 .await
844 .unwrap();
845 indexer
846 .sequential_pipeline::<D>(D, SequentialConfig::default())
847 .await
848 .unwrap();
849
850 assert_eq!(indexer.first_ingestion_checkpoint, 0);
851 }
852
853 #[tokio::test]
855 async fn test_first_ingestion_checkpoint_smallest_is_0() {
856 let registry = Registry::new();
857 let store = MockStore::default();
858
859 test_pipeline!(A, "concurrent_a");
860 test_pipeline!(B, "concurrent_b");
861 test_pipeline!(C, "sequential_c");
862 test_pipeline!(D, "sequential_d");
863
864 let mut conn = store.connect().await.unwrap();
865 conn.set_committer_watermark(
866 A::NAME,
867 CommitterWatermark {
868 checkpoint_hi_inclusive: 100,
869 ..Default::default()
870 },
871 )
872 .await
873 .unwrap();
874 conn.set_committer_watermark(
875 B::NAME,
876 CommitterWatermark {
877 checkpoint_hi_inclusive: 10,
878 ..Default::default()
879 },
880 )
881 .await
882 .unwrap();
883 conn.set_committer_watermark(
884 C::NAME,
885 CommitterWatermark {
886 checkpoint_hi_inclusive: 1,
887 ..Default::default()
888 },
889 )
890 .await
891 .unwrap();
892 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
893 .await
894 .unwrap();
895
896 let indexer_args = IndexerArgs::default();
897 let temp_dir = tempfile::tempdir().unwrap();
898 let client_args = ClientArgs {
899 ingestion: IngestionClientArgs {
900 local_ingestion_path: Some(temp_dir.path().to_owned()),
901 ..Default::default()
902 },
903 ..Default::default()
904 };
905 let ingestion_config = IngestionConfig::default();
906
907 let mut indexer = Indexer::new(
908 store,
909 indexer_args,
910 client_args,
911 ingestion_config,
912 None,
913 ®istry,
914 )
915 .await
916 .unwrap();
917
918 indexer
919 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
920 .await
921 .unwrap();
922 indexer
923 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
924 .await
925 .unwrap();
926 indexer
927 .sequential_pipeline::<C>(C, SequentialConfig::default())
928 .await
929 .unwrap();
930 indexer
931 .sequential_pipeline::<D>(D, SequentialConfig::default())
932 .await
933 .unwrap();
934
935 assert_eq!(indexer.first_ingestion_checkpoint, 1);
936 }
937
938 #[tokio::test]
941 async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
942 let registry = Registry::new();
943 let store = MockStore::default();
944
945 test_pipeline!(A, "concurrent_a");
946 test_pipeline!(B, "concurrent_b");
947 test_pipeline!(C, "sequential_c");
948 test_pipeline!(D, "sequential_d");
949
950 let mut conn = store.connect().await.unwrap();
951 conn.set_committer_watermark(
952 B::NAME,
953 CommitterWatermark {
954 checkpoint_hi_inclusive: 50,
955 ..Default::default()
956 },
957 )
958 .await
959 .unwrap();
960 conn.set_committer_watermark(
961 C::NAME,
962 CommitterWatermark {
963 checkpoint_hi_inclusive: 10,
964 ..Default::default()
965 },
966 )
967 .await
968 .unwrap();
969
970 let indexer_args = IndexerArgs {
971 first_checkpoint: Some(5),
972 ..Default::default()
973 };
974 let temp_dir = tempfile::tempdir().unwrap();
975 let client_args = ClientArgs {
976 ingestion: IngestionClientArgs {
977 local_ingestion_path: Some(temp_dir.path().to_owned()),
978 ..Default::default()
979 },
980 ..Default::default()
981 };
982 let ingestion_config = IngestionConfig::default();
983
984 let mut indexer = Indexer::new(
985 store,
986 indexer_args,
987 client_args,
988 ingestion_config,
989 None,
990 ®istry,
991 )
992 .await
993 .unwrap();
994
995 indexer
996 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
997 .await
998 .unwrap();
999 indexer
1000 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1001 .await
1002 .unwrap();
1003 indexer
1004 .sequential_pipeline::<C>(C, SequentialConfig::default())
1005 .await
1006 .unwrap();
1007 indexer
1008 .sequential_pipeline::<D>(D, SequentialConfig::default())
1009 .await
1010 .unwrap();
1011
1012 assert_eq!(indexer.first_ingestion_checkpoint, 5);
1013 }
1014
1015 #[tokio::test]
1018 async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1019 let registry = Registry::new();
1020 let store = MockStore::default();
1021
1022 test_pipeline!(B, "concurrent_b");
1023 test_pipeline!(C, "sequential_c");
1024
1025 let mut conn = store.connect().await.unwrap();
1026 conn.set_committer_watermark(
1027 B::NAME,
1028 CommitterWatermark {
1029 checkpoint_hi_inclusive: 50,
1030 ..Default::default()
1031 },
1032 )
1033 .await
1034 .unwrap();
1035 conn.set_committer_watermark(
1036 C::NAME,
1037 CommitterWatermark {
1038 checkpoint_hi_inclusive: 10,
1039 ..Default::default()
1040 },
1041 )
1042 .await
1043 .unwrap();
1044
1045 let indexer_args = IndexerArgs {
1046 first_checkpoint: Some(5),
1047 ..Default::default()
1048 };
1049 let temp_dir = tempfile::tempdir().unwrap();
1050 let client_args = ClientArgs {
1051 ingestion: IngestionClientArgs {
1052 local_ingestion_path: Some(temp_dir.path().to_owned()),
1053 ..Default::default()
1054 },
1055 ..Default::default()
1056 };
1057 let ingestion_config = IngestionConfig::default();
1058
1059 let mut indexer = Indexer::new(
1060 store,
1061 indexer_args,
1062 client_args,
1063 ingestion_config,
1064 None,
1065 ®istry,
1066 )
1067 .await
1068 .unwrap();
1069
1070 indexer
1071 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1072 .await
1073 .unwrap();
1074 indexer
1075 .sequential_pipeline::<C>(C, SequentialConfig::default())
1076 .await
1077 .unwrap();
1078
1079 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1080 }
1081
1082 #[tokio::test]
1086 async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1087 let registry = Registry::new();
1088 let store = MockStore::default();
1089
1090 test_pipeline!(A, "concurrent_a");
1091 test_pipeline!(B, "concurrent_b");
1092 test_pipeline!(C, "sequential_c");
1093
1094 let mut conn = store.connect().await.unwrap();
1095 conn.set_committer_watermark(
1096 B::NAME,
1097 CommitterWatermark {
1098 checkpoint_hi_inclusive: 50,
1099 ..Default::default()
1100 },
1101 )
1102 .await
1103 .unwrap();
1104 conn.set_committer_watermark(
1105 C::NAME,
1106 CommitterWatermark {
1107 checkpoint_hi_inclusive: 10,
1108 ..Default::default()
1109 },
1110 )
1111 .await
1112 .unwrap();
1113
1114 let indexer_args = IndexerArgs {
1115 first_checkpoint: Some(24),
1116 ..Default::default()
1117 };
1118 let temp_dir = tempfile::tempdir().unwrap();
1119 let client_args = ClientArgs {
1120 ingestion: IngestionClientArgs {
1121 local_ingestion_path: Some(temp_dir.path().to_owned()),
1122 ..Default::default()
1123 },
1124 ..Default::default()
1125 };
1126 let ingestion_config = IngestionConfig::default();
1127
1128 let mut indexer = Indexer::new(
1129 store,
1130 indexer_args,
1131 client_args,
1132 ingestion_config,
1133 None,
1134 ®istry,
1135 )
1136 .await
1137 .unwrap();
1138
1139 indexer
1140 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1141 .await
1142 .unwrap();
1143
1144 indexer
1145 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1146 .await
1147 .unwrap();
1148
1149 indexer
1150 .sequential_pipeline::<C>(C, SequentialConfig::default())
1151 .await
1152 .unwrap();
1153
1154 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1155 }
1156
1157 #[tokio::test]
1159 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1160 let registry = Registry::new();
1161 let store = MockStore::default();
1162
1163 test_pipeline!(A, "concurrent_a");
1164 test_pipeline!(B, "concurrent_b");
1165 test_pipeline!(C, "sequential_c");
1166 test_pipeline!(D, "sequential_d");
1167
1168 let mut conn = store.connect().await.unwrap();
1169 conn.set_committer_watermark(
1170 A::NAME,
1171 CommitterWatermark {
1172 checkpoint_hi_inclusive: 5,
1173 ..Default::default()
1174 },
1175 )
1176 .await
1177 .unwrap();
1178 conn.set_committer_watermark(
1179 B::NAME,
1180 CommitterWatermark {
1181 checkpoint_hi_inclusive: 10,
1182 ..Default::default()
1183 },
1184 )
1185 .await
1186 .unwrap();
1187 conn.set_committer_watermark(
1188 C::NAME,
1189 CommitterWatermark {
1190 checkpoint_hi_inclusive: 15,
1191 ..Default::default()
1192 },
1193 )
1194 .await
1195 .unwrap();
1196 conn.set_committer_watermark(
1197 D::NAME,
1198 CommitterWatermark {
1199 checkpoint_hi_inclusive: 20,
1200 ..Default::default()
1201 },
1202 )
1203 .await
1204 .unwrap();
1205
1206 let temp_dir = tempfile::tempdir().unwrap();
1208 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1209 ingestion_dir: temp_dir.path().to_owned(),
1210 starting_checkpoint: 5,
1211 num_checkpoints: 25,
1212 checkpoint_size: 1,
1213 })
1214 .await;
1215
1216 let indexer_args = IndexerArgs {
1217 last_checkpoint: Some(29),
1218 ..Default::default()
1219 };
1220
1221 let client_args = ClientArgs {
1222 ingestion: IngestionClientArgs {
1223 local_ingestion_path: Some(temp_dir.path().to_owned()),
1224 ..Default::default()
1225 },
1226 ..Default::default()
1227 };
1228
1229 let ingestion_config = IngestionConfig::default();
1230
1231 let mut indexer = Indexer::new(
1232 store.clone(),
1233 indexer_args,
1234 client_args,
1235 ingestion_config,
1236 None,
1237 ®istry,
1238 )
1239 .await
1240 .unwrap();
1241
1242 indexer
1243 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1244 .await
1245 .unwrap();
1246 indexer
1247 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1248 .await
1249 .unwrap();
1250 indexer
1251 .sequential_pipeline::<C>(C, SequentialConfig::default())
1252 .await
1253 .unwrap();
1254 indexer
1255 .sequential_pipeline::<D>(D, SequentialConfig::default())
1256 .await
1257 .unwrap();
1258
1259 let ingestion_metrics = indexer.ingestion_metrics().clone();
1260 let indexer_metrics = indexer.indexer_metrics().clone();
1261
1262 indexer.run().await.unwrap().join().await.unwrap();
1263
1264 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1265 assert_eq!(
1266 indexer_metrics
1267 .total_watermarks_out_of_order
1268 .get_metric_with_label_values(&[A::NAME])
1269 .unwrap()
1270 .get(),
1271 0
1272 );
1273 assert_eq!(
1274 indexer_metrics
1275 .total_watermarks_out_of_order
1276 .get_metric_with_label_values(&[B::NAME])
1277 .unwrap()
1278 .get(),
1279 5
1280 );
1281 assert_eq!(
1282 indexer_metrics
1283 .total_watermarks_out_of_order
1284 .get_metric_with_label_values(&[C::NAME])
1285 .unwrap()
1286 .get(),
1287 10
1288 );
1289 assert_eq!(
1290 indexer_metrics
1291 .total_watermarks_out_of_order
1292 .get_metric_with_label_values(&[D::NAME])
1293 .unwrap()
1294 .get(),
1295 15
1296 );
1297 }
1298
1299 #[tokio::test]
1301 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1302 let registry = Registry::new();
1303 let store = MockStore::default();
1304
1305 test_pipeline!(A, "concurrent_a");
1306 test_pipeline!(B, "concurrent_b");
1307 test_pipeline!(C, "sequential_c");
1308 test_pipeline!(D, "sequential_d");
1309
1310 let mut conn = store.connect().await.unwrap();
1311 conn.set_committer_watermark(
1312 A::NAME,
1313 CommitterWatermark {
1314 checkpoint_hi_inclusive: 5,
1315 ..Default::default()
1316 },
1317 )
1318 .await
1319 .unwrap();
1320 conn.set_committer_watermark(
1321 B::NAME,
1322 CommitterWatermark {
1323 checkpoint_hi_inclusive: 10,
1324 ..Default::default()
1325 },
1326 )
1327 .await
1328 .unwrap();
1329 conn.set_committer_watermark(
1330 C::NAME,
1331 CommitterWatermark {
1332 checkpoint_hi_inclusive: 15,
1333 ..Default::default()
1334 },
1335 )
1336 .await
1337 .unwrap();
1338 conn.set_committer_watermark(
1339 D::NAME,
1340 CommitterWatermark {
1341 checkpoint_hi_inclusive: 20,
1342 ..Default::default()
1343 },
1344 )
1345 .await
1346 .unwrap();
1347
1348 let temp_dir = tempfile::tempdir().unwrap();
1350 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1351 ingestion_dir: temp_dir.path().to_owned(),
1352 starting_checkpoint: 5,
1353 num_checkpoints: 25,
1354 checkpoint_size: 1,
1355 })
1356 .await;
1357
1358 let indexer_args = IndexerArgs {
1359 first_checkpoint: Some(3),
1360 last_checkpoint: Some(29),
1361 ..Default::default()
1362 };
1363
1364 let client_args = ClientArgs {
1365 ingestion: IngestionClientArgs {
1366 local_ingestion_path: Some(temp_dir.path().to_owned()),
1367 ..Default::default()
1368 },
1369 ..Default::default()
1370 };
1371
1372 let ingestion_config = IngestionConfig::default();
1373
1374 let mut indexer = Indexer::new(
1375 store.clone(),
1376 indexer_args,
1377 client_args,
1378 ingestion_config,
1379 None,
1380 ®istry,
1381 )
1382 .await
1383 .unwrap();
1384
1385 indexer
1386 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1387 .await
1388 .unwrap();
1389 indexer
1390 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1391 .await
1392 .unwrap();
1393 indexer
1394 .sequential_pipeline::<C>(C, SequentialConfig::default())
1395 .await
1396 .unwrap();
1397 indexer
1398 .sequential_pipeline::<D>(D, SequentialConfig::default())
1399 .await
1400 .unwrap();
1401
1402 let ingestion_metrics = indexer.ingestion_metrics().clone();
1403 let metrics = indexer.indexer_metrics().clone();
1404 indexer.run().await.unwrap().join().await.unwrap();
1405
1406 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1407 assert_eq!(
1408 metrics
1409 .total_watermarks_out_of_order
1410 .get_metric_with_label_values(&[A::NAME])
1411 .unwrap()
1412 .get(),
1413 0
1414 );
1415 assert_eq!(
1416 metrics
1417 .total_watermarks_out_of_order
1418 .get_metric_with_label_values(&[B::NAME])
1419 .unwrap()
1420 .get(),
1421 5
1422 );
1423 assert_eq!(
1424 metrics
1425 .total_watermarks_out_of_order
1426 .get_metric_with_label_values(&[C::NAME])
1427 .unwrap()
1428 .get(),
1429 10
1430 );
1431 assert_eq!(
1432 metrics
1433 .total_watermarks_out_of_order
1434 .get_metric_with_label_values(&[D::NAME])
1435 .unwrap()
1436 .get(),
1437 15
1438 );
1439 }
1440
1441 #[tokio::test]
1443 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1444 let registry = Registry::new();
1445 let store = MockStore::default();
1446
1447 test_pipeline!(A, "concurrent_a");
1448 test_pipeline!(B, "concurrent_b");
1449 test_pipeline!(C, "sequential_c");
1450 test_pipeline!(D, "sequential_d");
1451
1452 let mut conn = store.connect().await.unwrap();
1453 conn.set_committer_watermark(
1454 B::NAME,
1455 CommitterWatermark {
1456 checkpoint_hi_inclusive: 10,
1457 ..Default::default()
1458 },
1459 )
1460 .await
1461 .unwrap();
1462 conn.set_committer_watermark(
1463 C::NAME,
1464 CommitterWatermark {
1465 checkpoint_hi_inclusive: 15,
1466 ..Default::default()
1467 },
1468 )
1469 .await
1470 .unwrap();
1471 conn.set_committer_watermark(
1472 D::NAME,
1473 CommitterWatermark {
1474 checkpoint_hi_inclusive: 20,
1475 ..Default::default()
1476 },
1477 )
1478 .await
1479 .unwrap();
1480
1481 let temp_dir = tempfile::tempdir().unwrap();
1483 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1484 ingestion_dir: temp_dir.path().to_owned(),
1485 starting_checkpoint: 0,
1486 num_checkpoints: 30,
1487 checkpoint_size: 1,
1488 })
1489 .await;
1490
1491 let indexer_args = IndexerArgs {
1492 last_checkpoint: Some(29),
1493 ..Default::default()
1494 };
1495
1496 let client_args = ClientArgs {
1497 ingestion: IngestionClientArgs {
1498 local_ingestion_path: Some(temp_dir.path().to_owned()),
1499 ..Default::default()
1500 },
1501 ..Default::default()
1502 };
1503
1504 let ingestion_config = IngestionConfig::default();
1505
1506 let mut indexer = Indexer::new(
1507 store.clone(),
1508 indexer_args,
1509 client_args,
1510 ingestion_config,
1511 None,
1512 ®istry,
1513 )
1514 .await
1515 .unwrap();
1516
1517 indexer
1518 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1519 .await
1520 .unwrap();
1521 indexer
1522 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1523 .await
1524 .unwrap();
1525 indexer
1526 .sequential_pipeline::<C>(C, SequentialConfig::default())
1527 .await
1528 .unwrap();
1529 indexer
1530 .sequential_pipeline::<D>(D, SequentialConfig::default())
1531 .await
1532 .unwrap();
1533
1534 let ingestion_metrics = indexer.ingestion_metrics().clone();
1535 let metrics = indexer.indexer_metrics().clone();
1536 indexer.run().await.unwrap().join().await.unwrap();
1537
1538 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1539 assert_eq!(
1540 metrics
1541 .total_watermarks_out_of_order
1542 .get_metric_with_label_values(&[A::NAME])
1543 .unwrap()
1544 .get(),
1545 0
1546 );
1547 assert_eq!(
1548 metrics
1549 .total_watermarks_out_of_order
1550 .get_metric_with_label_values(&[B::NAME])
1551 .unwrap()
1552 .get(),
1553 11
1554 );
1555 assert_eq!(
1556 metrics
1557 .total_watermarks_out_of_order
1558 .get_metric_with_label_values(&[C::NAME])
1559 .unwrap()
1560 .get(),
1561 16
1562 );
1563 assert_eq!(
1564 metrics
1565 .total_watermarks_out_of_order
1566 .get_metric_with_label_values(&[D::NAME])
1567 .unwrap()
1568 .get(),
1569 21
1570 );
1571 }
1572
1573 #[tokio::test]
1575 async fn test_indexer_ingestion_use_first_checkpoint() {
1576 let registry = Registry::new();
1577 let store = MockStore::default();
1578
1579 test_pipeline!(A, "concurrent_a");
1580 test_pipeline!(B, "concurrent_b");
1581 test_pipeline!(C, "sequential_c");
1582 test_pipeline!(D, "sequential_d");
1583
1584 let mut conn = store.connect().await.unwrap();
1585 conn.set_committer_watermark(
1586 B::NAME,
1587 CommitterWatermark {
1588 checkpoint_hi_inclusive: 10,
1589 ..Default::default()
1590 },
1591 )
1592 .await
1593 .unwrap();
1594 conn.set_committer_watermark(
1595 C::NAME,
1596 CommitterWatermark {
1597 checkpoint_hi_inclusive: 15,
1598 ..Default::default()
1599 },
1600 )
1601 .await
1602 .unwrap();
1603 conn.set_committer_watermark(
1604 D::NAME,
1605 CommitterWatermark {
1606 checkpoint_hi_inclusive: 20,
1607 ..Default::default()
1608 },
1609 )
1610 .await
1611 .unwrap();
1612
1613 let temp_dir = tempfile::tempdir().unwrap();
1615 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1616 ingestion_dir: temp_dir.path().to_owned(),
1617 starting_checkpoint: 5,
1618 num_checkpoints: 25,
1619 checkpoint_size: 1,
1620 })
1621 .await;
1622
1623 let indexer_args = IndexerArgs {
1624 first_checkpoint: Some(10),
1625 last_checkpoint: Some(29),
1626 ..Default::default()
1627 };
1628
1629 let client_args = ClientArgs {
1630 ingestion: IngestionClientArgs {
1631 local_ingestion_path: Some(temp_dir.path().to_owned()),
1632 ..Default::default()
1633 },
1634 ..Default::default()
1635 };
1636
1637 let ingestion_config = IngestionConfig::default();
1638
1639 let mut indexer = Indexer::new(
1640 store.clone(),
1641 indexer_args,
1642 client_args,
1643 ingestion_config,
1644 None,
1645 ®istry,
1646 )
1647 .await
1648 .unwrap();
1649
1650 indexer
1651 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1652 .await
1653 .unwrap();
1654 indexer
1655 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1656 .await
1657 .unwrap();
1658 indexer
1659 .sequential_pipeline::<C>(C, SequentialConfig::default())
1660 .await
1661 .unwrap();
1662 indexer
1663 .sequential_pipeline::<D>(D, SequentialConfig::default())
1664 .await
1665 .unwrap();
1666
1667 let ingestion_metrics = indexer.ingestion_metrics().clone();
1668 let metrics = indexer.indexer_metrics().clone();
1669 indexer.run().await.unwrap().join().await.unwrap();
1670
1671 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1672 assert_eq!(
1673 metrics
1674 .total_watermarks_out_of_order
1675 .get_metric_with_label_values(&[A::NAME])
1676 .unwrap()
1677 .get(),
1678 0
1679 );
1680 assert_eq!(
1681 metrics
1682 .total_watermarks_out_of_order
1683 .get_metric_with_label_values(&[B::NAME])
1684 .unwrap()
1685 .get(),
1686 1
1687 );
1688 assert_eq!(
1689 metrics
1690 .total_watermarks_out_of_order
1691 .get_metric_with_label_values(&[C::NAME])
1692 .unwrap()
1693 .get(),
1694 6
1695 );
1696 assert_eq!(
1697 metrics
1698 .total_watermarks_out_of_order
1699 .get_metric_with_label_values(&[D::NAME])
1700 .unwrap()
1701 .get(),
1702 11
1703 );
1704 }
1705
1706 #[tokio::test]
1707 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1708 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1709 assert_eq!(committer_watermark, None);
1711 assert_eq!(pruner_watermark, None);
1712 }
1713
1714 #[tokio::test]
1715 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1716 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1717 assert_eq!(committer_watermark, None);
1719 assert_eq!(pruner_watermark, None);
1720 }
1721
1722 #[tokio::test]
1723 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1724 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1725
1726 let committer_watermark = committer_watermark.unwrap();
1727 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1728
1729 let pruner_watermark = pruner_watermark.unwrap();
1730 assert_eq!(pruner_watermark.reader_lo, 1);
1731 assert_eq!(pruner_watermark.pruner_hi, 1);
1732 }
1733
1734 #[tokio::test]
1735 async fn test_init_watermark_sequential() {
1736 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1737
1738 let committer_watermark = committer_watermark.unwrap();
1739 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1740
1741 let pruner_watermark = pruner_watermark.unwrap();
1742 assert_eq!(pruner_watermark.reader_lo, 1);
1743 assert_eq!(pruner_watermark.pruner_hi, 1);
1744 }
1745
1746 #[tokio::test]
1747 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1748 let registry = Registry::new();
1749 let store = MockStore::default();
1750
1751 let mut conn = store.connect().await.unwrap();
1753
1754 conn.set_committer_watermark(
1756 MockHandler::NAME,
1757 CommitterWatermark {
1758 epoch_hi_inclusive: 0,
1759 checkpoint_hi_inclusive: 10,
1760 tx_hi: 20,
1761 timestamp_ms_hi_inclusive: 10000,
1762 },
1763 )
1764 .await
1765 .unwrap();
1766
1767 conn.set_committer_watermark(
1769 SequentialHandler::NAME,
1770 CommitterWatermark {
1771 epoch_hi_inclusive: 0,
1772 checkpoint_hi_inclusive: 5,
1773 tx_hi: 10,
1774 timestamp_ms_hi_inclusive: 5000,
1775 },
1776 )
1777 .await
1778 .unwrap();
1779
1780 let temp_dir = tempfile::tempdir().unwrap();
1782 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1783 ingestion_dir: temp_dir.path().to_owned(),
1784 starting_checkpoint: 0,
1785 num_checkpoints: 20,
1786 checkpoint_size: 2,
1787 })
1788 .await;
1789
1790 let indexer_args = IndexerArgs {
1791 first_checkpoint: None,
1792 last_checkpoint: Some(19),
1793 pipeline: vec![],
1794 ..Default::default()
1795 };
1796
1797 let client_args = ClientArgs {
1798 ingestion: IngestionClientArgs {
1799 local_ingestion_path: Some(temp_dir.path().to_owned()),
1800 ..Default::default()
1801 },
1802 ..Default::default()
1803 };
1804
1805 let ingestion_config = IngestionConfig::default();
1806
1807 let mut indexer = Indexer::new(
1808 store.clone(),
1809 indexer_args,
1810 client_args,
1811 ingestion_config,
1812 None,
1813 ®istry,
1814 )
1815 .await
1816 .unwrap();
1817
1818 indexer
1820 .sequential_pipeline(
1821 MockHandler,
1822 pipeline::sequential::SequentialConfig::default(),
1823 )
1824 .await
1825 .unwrap();
1826
1827 assert_eq!(
1829 indexer.next_sequential_checkpoint(),
1830 Some(11),
1831 "next_sequential_checkpoint should be 11"
1832 );
1833
1834 indexer
1836 .sequential_pipeline(
1837 SequentialHandler,
1838 pipeline::sequential::SequentialConfig::default(),
1839 )
1840 .await
1841 .unwrap();
1842
1843 assert_eq!(
1845 indexer.next_sequential_checkpoint(),
1846 Some(6),
1847 "next_sequential_checkpoint should still be 6"
1848 );
1849
1850 indexer.run().await.unwrap().join().await.unwrap();
1852
1853 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1855 let watermark2 = conn
1856 .committer_watermark(SequentialHandler::NAME)
1857 .await
1858 .unwrap();
1859
1860 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1861 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1862 }
1863
1864 #[tokio::test]
1868 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1869 let registry = Registry::new();
1870 let store = MockStore::default();
1871
1872 let mut conn = store.connect().await.unwrap();
1875 conn.set_committer_watermark(
1876 MockCheckpointSequenceNumberHandler::NAME,
1877 CommitterWatermark {
1878 checkpoint_hi_inclusive: 10,
1879 ..Default::default()
1880 },
1881 )
1882 .await
1883 .unwrap();
1884 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1885 .await
1886 .unwrap();
1887
1888 let indexer_args = IndexerArgs {
1891 first_checkpoint: Some(0),
1892 last_checkpoint: Some(15),
1893 task: TaskArgs::tasked("task".to_string(), 10),
1894 ..Default::default()
1895 };
1896 let temp_dir = tempfile::tempdir().unwrap();
1897 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1898 ingestion_dir: temp_dir.path().to_owned(),
1899 starting_checkpoint: 0,
1900 num_checkpoints: 16,
1901 checkpoint_size: 2,
1902 })
1903 .await;
1904
1905 let client_args = ClientArgs {
1906 ingestion: IngestionClientArgs {
1907 local_ingestion_path: Some(temp_dir.path().to_owned()),
1908 ..Default::default()
1909 },
1910 ..Default::default()
1911 };
1912
1913 let ingestion_config = IngestionConfig::default();
1914
1915 let mut tasked_indexer = Indexer::new(
1916 store.clone(),
1917 indexer_args,
1918 client_args,
1919 ingestion_config,
1920 None,
1921 ®istry,
1922 )
1923 .await
1924 .unwrap();
1925
1926 let _ = tasked_indexer
1927 .concurrent_pipeline(
1928 MockCheckpointSequenceNumberHandler,
1929 ConcurrentConfig::default(),
1930 )
1931 .await;
1932
1933 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1934 let metrics = tasked_indexer.indexer_metrics().clone();
1935
1936 tasked_indexer.run().await.unwrap().join().await.unwrap();
1937
1938 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1939 assert_eq!(
1940 metrics
1941 .total_collector_skipped_checkpoints
1942 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1943 .unwrap()
1944 .get(),
1945 7
1946 );
1947 let data = store
1948 .data
1949 .get(MockCheckpointSequenceNumberHandler::NAME)
1950 .unwrap();
1951 assert_eq!(data.len(), 9);
1952 for i in 0..7 {
1953 assert!(data.get(&i).is_none());
1954 }
1955 for i in 7..16 {
1956 assert!(data.get(&i).is_some());
1957 }
1958 }
1959
1960 #[tokio::test]
1962 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1963 let registry = Registry::new();
1964 let store = MockStore::default();
1965
1966 let mut conn = store.connect().await.unwrap();
1967 conn.set_committer_watermark(
1968 "test",
1969 CommitterWatermark {
1970 checkpoint_hi_inclusive: 10,
1971 ..Default::default()
1972 },
1973 )
1974 .await
1975 .unwrap();
1976 conn.set_reader_watermark("test", 5).await.unwrap();
1977
1978 let indexer_args = IndexerArgs {
1981 first_checkpoint: Some(9),
1982 last_checkpoint: Some(25),
1983 task: TaskArgs::tasked("task".to_string(), 10),
1984 ..Default::default()
1985 };
1986 let temp_dir = tempfile::tempdir().unwrap();
1987 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1988 ingestion_dir: temp_dir.path().to_owned(),
1989 starting_checkpoint: 9,
1990 num_checkpoints: 17,
1991 checkpoint_size: 2,
1992 })
1993 .await;
1994
1995 let client_args = ClientArgs {
1996 ingestion: IngestionClientArgs {
1997 local_ingestion_path: Some(temp_dir.path().to_owned()),
1998 ..Default::default()
1999 },
2000 ..Default::default()
2001 };
2002
2003 let ingestion_config = IngestionConfig::default();
2004
2005 let mut tasked_indexer = Indexer::new(
2006 store.clone(),
2007 indexer_args,
2008 client_args,
2009 ingestion_config,
2010 None,
2011 ®istry,
2012 )
2013 .await
2014 .unwrap();
2015
2016 let _ = tasked_indexer
2017 .concurrent_pipeline(
2018 MockCheckpointSequenceNumberHandler,
2019 ConcurrentConfig::default(),
2020 )
2021 .await;
2022
2023 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2024 let metrics = tasked_indexer.indexer_metrics().clone();
2025
2026 tasked_indexer.run().await.unwrap().join().await.unwrap();
2027
2028 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2029 assert_eq!(
2030 metrics
2031 .total_watermarks_out_of_order
2032 .get_metric_with_label_values(&["test"])
2033 .unwrap()
2034 .get(),
2035 0
2036 );
2037 assert_eq!(
2038 metrics
2039 .total_collector_skipped_checkpoints
2040 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2041 .unwrap()
2042 .get(),
2043 0
2044 );
2045
2046 let data = store.data.get("test").unwrap();
2047 assert!(data.len() == 17);
2048 for i in 0..9 {
2049 assert!(data.get(&i).is_none());
2050 }
2051 for i in 9..26 {
2052 assert!(data.get(&i).is_some());
2053 }
2054 let main_pipeline_watermark = store.watermark("test").unwrap();
2055 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2057 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2058 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2059 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2060 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2061 }
2062
2063 #[tokio::test]
2066 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2067 let registry = Registry::new();
2068 let store = MockStore::default();
2069 let mut conn = store.connect().await.unwrap();
2070 conn.set_committer_watermark(
2072 ControllableHandler::NAME,
2073 CommitterWatermark {
2074 checkpoint_hi_inclusive: 11,
2075 ..Default::default()
2076 },
2077 )
2078 .await
2079 .unwrap();
2080
2081 let temp_dir = tempfile::tempdir().unwrap();
2083 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2084 ingestion_dir: temp_dir.path().to_owned(),
2085 starting_checkpoint: 0,
2086 num_checkpoints: 501,
2087 checkpoint_size: 2,
2088 })
2089 .await;
2090 let indexer_args = IndexerArgs {
2091 first_checkpoint: Some(0),
2092 last_checkpoint: Some(500),
2093 task: TaskArgs::tasked("task".to_string(), 10 ),
2094 ..Default::default()
2095 };
2096 let client_args = ClientArgs {
2097 ingestion: IngestionClientArgs {
2098 local_ingestion_path: Some(temp_dir.path().to_owned()),
2099 ..Default::default()
2100 },
2101 ..Default::default()
2102 };
2103 let ingestion_config = IngestionConfig::default();
2104 let mut tasked_indexer = Indexer::new(
2105 store.clone(),
2106 indexer_args,
2107 client_args,
2108 ingestion_config,
2109 None,
2110 ®istry,
2111 )
2112 .await
2113 .unwrap();
2114 let mut allow_process = 10;
2115 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2117 let _ = tasked_indexer
2118 .concurrent_pipeline(
2119 controllable_handler,
2120 ConcurrentConfig {
2121 committer: CommitterConfig {
2122 collect_interval_ms: 10,
2123 watermark_interval_ms: 10,
2124 ..Default::default()
2125 },
2126 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
2129 ..Default::default()
2130 },
2131 )
2132 .await;
2133 let metrics = tasked_indexer.indexer_metrics().clone();
2134
2135 let mut s_indexer = tasked_indexer.run().await.unwrap();
2136
2137 store
2141 .wait_for_watermark(
2142 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2143 10,
2144 std::time::Duration::from_secs(10),
2145 )
2146 .await;
2147
2148 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2153 .await
2154 .unwrap();
2155
2156 let reader_lo = metrics
2157 .collector_reader_lo
2158 .with_label_values(&[ControllableHandler::NAME]);
2159
2160 let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2164 while reader_lo.get() != 250 {
2165 interval.tick().await;
2166 allow_process += 1;
2168 assert!(
2169 allow_process <= 500,
2170 "Released all checkpoints but collector never observed new reader_lo"
2171 );
2172 process_below.send(allow_process).ok();
2173 }
2174
2175 process_below.send(500).ok();
2182
2183 s_indexer.join().await.unwrap();
2184
2185 let data = store.data.get(ControllableHandler::NAME).unwrap();
2186
2187 for chkpt in (allow_process + 1)..250 {
2189 assert!(
2190 data.get(&chkpt).is_none(),
2191 "Checkpoint {chkpt} should have been skipped"
2192 );
2193 }
2194
2195 for chkpt in 250..=500 {
2197 assert!(
2198 data.get(&chkpt).is_some(),
2199 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2200 );
2201 }
2202
2203 for chkpt in 0..=10 {
2205 assert!(
2206 data.get(&chkpt).is_some(),
2207 "Checkpoint {chkpt} should have been committed (baseline)"
2208 );
2209 }
2210 }
2211}