1use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod ingestion;
42pub mod metrics;
43pub mod pipeline;
44#[cfg(feature = "postgres")]
45pub mod postgres;
46
47#[cfg(test)]
48pub mod mocks;
49
50#[derive(clap::Args, Default, Debug, Clone)]
52pub struct IndexerArgs {
53 #[arg(long)]
60 pub first_checkpoint: Option<u64>,
61
62 #[arg(long)]
65 pub last_checkpoint: Option<u64>,
66
67 #[arg(long, action = clap::ArgAction::Append)]
70 pub pipeline: Vec<String>,
71
72 #[clap(flatten)]
74 pub task: TaskArgs,
75}
76
77#[derive(clap::Parser, Default, Debug, Clone)]
79pub struct TaskArgs {
80 #[arg(long, requires = "reader_interval_ms")]
92 task: Option<String>,
93
94 #[arg(long, requires = "task")]
97 reader_interval_ms: Option<u64>,
98}
99
/// An indexer whose pipelines all write to `store` and are fed by a single ingestion service.
///
/// Pipelines are registered with `concurrent_pipeline` / `sequential_pipeline`. Each pipeline
/// subscribes to the ingestion service, and everything starts when [`Indexer::run`] is called.
pub struct Indexer<S: Store> {
    /// Store that pipelines write to. It is cloned into each pipeline and connected to when a
    /// pipeline's watermark is initialized.
    store: S,

    /// Metrics shared by the indexer and every pipeline it registers.
    metrics: Arc<IndexerMetrics>,

    /// Fetches checkpoints. Each registered pipeline subscribes to it.
    ingestion_service: IngestionService,

    /// Next checkpoint for pipelines with no watermark yet (`--first-checkpoint`, else 0).
    default_next_checkpoint: u64,

    /// Last checkpoint to ingest, inclusive (`--last-checkpoint`). `None` means no upper bound.
    last_checkpoint: Option<u64>,

    /// Task the pipelines run under, present only when both `--task` and
    /// `--reader-interval-ms` were supplied. Sequential pipelines refuse to run as a task.
    task: Option<Task>,

    /// Allow-list of pipeline names from `--pipeline`. `None` means every added pipeline runs.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of every pipeline handed to this indexer, including ones excluded by the
    /// allow-list. Used to reject registering the same pipeline twice.
    added_pipelines: BTreeSet<&'static str>,

    /// Checkpoint ingestion starts from: the smallest next checkpoint among the pipelines that
    /// will run. Starts at `u64::MAX` and only ever decreases as pipelines are added.
    first_ingestion_checkpoint: u64,

    /// Smallest next checkpoint among the sequential pipelines that will run, or `None` if there
    /// are none. Handed to the ingestion service when the indexer runs.
    next_sequential_checkpoint: Option<u64>,

    /// Services for the registered pipelines. They are merged with the ingestion service's
    /// service when the indexer runs.
    pipelines: Vec<Service>,
}
158
/// Settings for running pipelines as a named task.
///
/// Built from [`TaskArgs`] only when both the task name and the reader interval are present.
#[derive(Clone)]
pub(crate) struct Task {
    /// Task name. Combined with each pipeline's name (via `pipeline_task`) to name the watermark
    /// the pipeline reads and writes.
    task: String,

    /// Reader interval taken from `--reader-interval-ms`.
    /// NOTE(review): consumed by the concurrent pipeline code, which is not shown here; confirm
    /// its exact semantics there.
    reader_interval: Duration,
}
169
170impl TaskArgs {
171 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
172 Self {
173 task: Some(task),
174 reader_interval_ms: Some(reader_interval_ms),
175 }
176 }
177
178 fn into_task(self) -> Option<Task> {
179 Some(Task {
180 task: self.task?,
181 reader_interval: Duration::from_millis(self.reader_interval_ms?),
182 })
183 }
184}
185
186impl<S: Store> Indexer<S> {
187 pub async fn new(
199 store: S,
200 indexer_args: IndexerArgs,
201 client_args: ClientArgs,
202 ingestion_config: IngestionConfig,
203 metrics_prefix: Option<&str>,
204 registry: &Registry,
205 ) -> Result<Self> {
206 let IndexerArgs {
207 first_checkpoint,
208 last_checkpoint,
209 pipeline,
210 task,
211 } = indexer_args;
212
213 let metrics = IndexerMetrics::new(metrics_prefix, registry);
214
215 let ingestion_service =
216 IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
217
218 Ok(Self {
219 store,
220 metrics,
221 ingestion_service,
222 default_next_checkpoint: first_checkpoint.unwrap_or_default(),
223 last_checkpoint,
224 task: task.into_task(),
225 enabled_pipelines: if pipeline.is_empty() {
226 None
227 } else {
228 Some(pipeline.into_iter().collect())
229 },
230 added_pipelines: BTreeSet::new(),
231 first_ingestion_checkpoint: u64::MAX,
232 next_sequential_checkpoint: None,
233 pipelines: vec![],
234 })
235 }
236
237 pub fn store(&self) -> &S {
239 &self.store
240 }
241
242 pub fn ingestion_client(&self) -> &IngestionClient {
244 self.ingestion_service.ingestion_client()
245 }
246
247 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
249 &self.metrics
250 }
251
252 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
254 self.ingestion_service.metrics()
255 }
256
257 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
259 self.added_pipelines.iter().copied().filter(|p| {
260 self.enabled_pipelines
261 .as_ref()
262 .is_none_or(|e| e.contains(*p))
263 })
264 }
265
266 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
270 self.next_sequential_checkpoint
271 }
272
273 pub async fn concurrent_pipeline<H>(
280 &mut self,
281 handler: H,
282 config: ConcurrentConfig,
283 ) -> Result<()>
284 where
285 H: concurrent::Handler<Store = S> + Send + Sync + 'static,
286 {
287 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
288 return Ok(());
289 };
290
291 self.pipelines.push(concurrent::pipeline::<H>(
292 handler,
293 next_checkpoint,
294 config,
295 self.store.clone(),
296 self.task.clone(),
297 self.ingestion_service.subscribe().0,
298 self.metrics.clone(),
299 ));
300
301 Ok(())
302 }
303
304 pub async fn run(self) -> Result<Service> {
310 if let Some(enabled_pipelines) = self.enabled_pipelines {
311 ensure!(
312 enabled_pipelines.is_empty(),
313 "Tried to enable pipelines that this indexer does not know about: \
314 {enabled_pipelines:#?}",
315 );
316 }
317
318 let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
319
320 info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
321
322 let mut service = self
323 .ingestion_service
324 .run(
325 self.first_ingestion_checkpoint..=last_checkpoint,
326 self.next_sequential_checkpoint,
327 )
328 .await
329 .context("Failed to start ingestion service")?;
330
331 for pipeline in self.pipelines {
332 service = service.merge(pipeline);
333 }
334
335 Ok(service)
336 }
337
338 async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
347 ensure!(
348 self.added_pipelines.insert(P::NAME),
349 "Pipeline {:?} already added",
350 P::NAME,
351 );
352
353 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
354 && !enabled_pipelines.remove(P::NAME)
355 {
356 info!(pipeline = P::NAME, "Skipping");
357 return Ok(None);
358 }
359
360 let mut conn = self
361 .store
362 .connect()
363 .await
364 .context("Failed to establish connection to store")?;
365
366 let pipeline_task =
367 pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
368
369 let checkpoint_hi_inclusive = conn
370 .init_watermark(&pipeline_task, self.default_next_checkpoint)
371 .await
372 .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
373
374 let next_checkpoint =
375 checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
376
377 self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
378
379 Ok(Some(next_checkpoint))
380 }
381}
382
383impl<T: TransactionalStore> Indexer<T> {
384 pub async fn sequential_pipeline<H>(
395 &mut self,
396 handler: H,
397 config: SequentialConfig,
398 ) -> Result<()>
399 where
400 H: Handler<Store = T> + Send + Sync + 'static,
401 {
402 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
403 return Ok(());
404 };
405
406 if self.task.is_some() {
407 bail!(
408 "Sequential pipelines do not support pipeline tasks. \
409 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
410 Running the same pipeline under a different task would violate these guarantees."
411 );
412 }
413
414 self.next_sequential_checkpoint = Some(
416 self.next_sequential_checkpoint
417 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
418 );
419
420 let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
421
422 self.pipelines.push(sequential::pipeline::<H>(
423 handler,
424 next_checkpoint,
425 config,
426 self.store.clone(),
427 checkpoint_rx,
428 watermark_tx,
429 self.metrics.clone(),
430 ));
431
432 Ok(())
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use std::sync::Arc;
439
440 use async_trait::async_trait;
441 use clap::Parser;
442 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
443 use sui_synthetic_ingestion::synthetic_ingestion;
444 use tokio::sync::watch;
445
446 use crate::FieldCount;
447 use crate::ingestion::ingestion_client::IngestionClientArgs;
448 use crate::mocks::store::MockStore;
449 use crate::pipeline::CommitterConfig;
450 use crate::pipeline::Processor;
451 use crate::pipeline::concurrent::ConcurrentConfig;
452 use crate::store::CommitterWatermark;
453
454 use super::*;
455
456 #[allow(dead_code)]
457 #[derive(Clone, FieldCount)]
458 struct MockValue(u64);
459
460 struct ControllableHandler {
462 process_below: watch::Receiver<u64>,
464 }
465
466 impl ControllableHandler {
467 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
468 let (tx, rx) = watch::channel(limit);
469 (Self { process_below: rx }, tx)
470 }
471 }
472
473 #[async_trait]
474 impl Processor for ControllableHandler {
475 const NAME: &'static str = "controllable";
476 const FANOUT: usize = 501;
481 type Value = MockValue;
482
483 async fn process(
484 &self,
485 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
486 ) -> anyhow::Result<Vec<Self::Value>> {
487 let cp_num = checkpoint.summary.sequence_number;
488
489 self.process_below
491 .clone()
492 .wait_for(|&limit| cp_num <= limit)
493 .await
494 .ok();
495
496 Ok(vec![MockValue(cp_num)])
497 }
498 }
499
500 #[async_trait]
501 impl crate::pipeline::concurrent::Handler for ControllableHandler {
502 type Store = MockStore;
503 type Batch = Vec<MockValue>;
504
505 fn batch(
506 &self,
507 batch: &mut Self::Batch,
508 values: &mut std::vec::IntoIter<Self::Value>,
509 ) -> crate::pipeline::concurrent::BatchStatus {
510 batch.extend(values);
511 crate::pipeline::concurrent::BatchStatus::Ready
512 }
513
514 async fn commit<'a>(
515 &self,
516 batch: &Self::Batch,
517 conn: &mut <Self::Store as Store>::Connection<'a>,
518 ) -> anyhow::Result<usize> {
519 for value in batch {
520 conn.0
521 .commit_data(Self::NAME, value.0, vec![value.0])
522 .await?;
523 }
524 Ok(batch.len())
525 }
526 }
527
528 macro_rules! test_pipeline {
529 ($handler:ident, $name:literal) => {
530 struct $handler;
531
532 #[async_trait]
533 impl Processor for $handler {
534 const NAME: &'static str = $name;
535 type Value = MockValue;
536 async fn process(
537 &self,
538 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
539 ) -> anyhow::Result<Vec<Self::Value>> {
540 Ok(vec![MockValue(checkpoint.summary.sequence_number)])
541 }
542 }
543
544 #[async_trait]
545 impl crate::pipeline::concurrent::Handler for $handler {
546 type Store = MockStore;
547 type Batch = Vec<Self::Value>;
548
549 fn batch(
550 &self,
551 batch: &mut Self::Batch,
552 values: &mut std::vec::IntoIter<Self::Value>,
553 ) -> crate::pipeline::concurrent::BatchStatus {
554 batch.extend(values);
555 crate::pipeline::concurrent::BatchStatus::Pending
556 }
557
558 async fn commit<'a>(
559 &self,
560 batch: &Self::Batch,
561 conn: &mut <Self::Store as Store>::Connection<'a>,
562 ) -> anyhow::Result<usize> {
563 for value in batch {
564 conn.0
565 .commit_data(Self::NAME, value.0, vec![value.0])
566 .await?;
567 }
568 Ok(batch.len())
569 }
570 }
571
572 #[async_trait]
573 impl crate::pipeline::sequential::Handler for $handler {
574 type Store = MockStore;
575 type Batch = Vec<Self::Value>;
576
577 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
578 batch.extend(values);
579 }
580
581 async fn commit<'a>(
582 &self,
583 _batch: &Self::Batch,
584 _conn: &mut <Self::Store as Store>::Connection<'a>,
585 ) -> anyhow::Result<usize> {
586 Ok(1)
587 }
588 }
589 };
590 }
591
592 test_pipeline!(MockHandler, "test_processor");
593 test_pipeline!(SequentialHandler, "sequential_handler");
594 test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
595
596 async fn test_init_watermark(
597 first_checkpoint: Option<u64>,
598 is_concurrent: bool,
599 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
600 let registry = Registry::new();
601 let store = MockStore::default();
602
603 test_pipeline!(A, "pipeline_name");
604
605 let mut conn = store.connect().await.unwrap();
606
607 let indexer_args = IndexerArgs {
608 first_checkpoint,
609 ..IndexerArgs::default()
610 };
611 let temp_dir = tempfile::tempdir().unwrap();
612 let client_args = ClientArgs {
613 ingestion: IngestionClientArgs {
614 local_ingestion_path: Some(temp_dir.path().to_owned()),
615 ..Default::default()
616 },
617 ..Default::default()
618 };
619 let ingestion_config = IngestionConfig::default();
620
621 let mut indexer = Indexer::new(
622 store.clone(),
623 indexer_args,
624 client_args,
625 ingestion_config,
626 None,
627 ®istry,
628 )
629 .await
630 .unwrap();
631
632 if is_concurrent {
633 indexer
634 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
635 .await
636 .unwrap();
637 } else {
638 indexer
639 .sequential_pipeline::<A>(A, SequentialConfig::default())
640 .await
641 .unwrap();
642 }
643
644 (
645 conn.committer_watermark(A::NAME).await.unwrap(),
646 conn.pruner_watermark(A::NAME, Duration::ZERO)
647 .await
648 .unwrap(),
649 )
650 }
651
652 #[test]
653 fn test_arg_parsing() {
654 #[derive(Parser)]
655 struct Args {
656 #[clap(flatten)]
657 indexer: IndexerArgs,
658 }
659
660 let args = Args::try_parse_from([
661 "cmd",
662 "--first-checkpoint",
663 "10",
664 "--last-checkpoint",
665 "100",
666 "--pipeline",
667 "a",
668 "--pipeline",
669 "b",
670 "--task",
671 "t",
672 "--reader-interval-ms",
673 "5000",
674 ])
675 .unwrap();
676
677 assert_eq!(args.indexer.first_checkpoint, Some(10));
678 assert_eq!(args.indexer.last_checkpoint, Some(100));
679 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
680 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
681 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
682 }
683
684 #[tokio::test]
686 async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
687 let registry = Registry::new();
688 let store = MockStore::default();
689
690 test_pipeline!(A, "concurrent_a");
691 test_pipeline!(B, "concurrent_b");
692 test_pipeline!(C, "sequential_c");
693 test_pipeline!(D, "sequential_d");
694
695 let mut conn = store.connect().await.unwrap();
696 conn.set_committer_watermark(
697 A::NAME,
698 CommitterWatermark {
699 checkpoint_hi_inclusive: 100,
700 ..Default::default()
701 },
702 )
703 .await
704 .unwrap();
705 conn.set_committer_watermark(
706 B::NAME,
707 CommitterWatermark {
708 checkpoint_hi_inclusive: 10,
709 ..Default::default()
710 },
711 )
712 .await
713 .unwrap();
714 conn.set_committer_watermark(
715 C::NAME,
716 CommitterWatermark {
717 checkpoint_hi_inclusive: 1,
718 ..Default::default()
719 },
720 )
721 .await
722 .unwrap();
723 conn.set_committer_watermark(
724 D::NAME,
725 CommitterWatermark {
726 checkpoint_hi_inclusive: 50,
727 ..Default::default()
728 },
729 )
730 .await
731 .unwrap();
732
733 let indexer_args = IndexerArgs::default();
734 let temp_dir = tempfile::tempdir().unwrap();
735 let client_args = ClientArgs {
736 ingestion: IngestionClientArgs {
737 local_ingestion_path: Some(temp_dir.path().to_owned()),
738 ..Default::default()
739 },
740 ..Default::default()
741 };
742 let ingestion_config = IngestionConfig::default();
743
744 let mut indexer = Indexer::new(
745 store,
746 indexer_args,
747 client_args,
748 ingestion_config,
749 None,
750 ®istry,
751 )
752 .await
753 .unwrap();
754
755 indexer
756 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
757 .await
758 .unwrap();
759 indexer
760 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
761 .await
762 .unwrap();
763 indexer
764 .sequential_pipeline::<C>(C, SequentialConfig::default())
765 .await
766 .unwrap();
767 indexer
768 .sequential_pipeline::<D>(D, SequentialConfig::default())
769 .await
770 .unwrap();
771
772 assert_eq!(indexer.first_ingestion_checkpoint, 2);
773 }
774
775 #[tokio::test]
777 async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
778 let registry = Registry::new();
779 let store = MockStore::default();
780
781 test_pipeline!(A, "concurrent_a");
782 test_pipeline!(B, "concurrent_b");
783 test_pipeline!(C, "sequential_c");
784 test_pipeline!(D, "sequential_d");
785
786 let mut conn = store.connect().await.unwrap();
787 conn.set_committer_watermark(
788 B::NAME,
789 CommitterWatermark {
790 checkpoint_hi_inclusive: 10,
791 ..Default::default()
792 },
793 )
794 .await
795 .unwrap();
796 conn.set_committer_watermark(
797 C::NAME,
798 CommitterWatermark {
799 checkpoint_hi_inclusive: 1,
800 ..Default::default()
801 },
802 )
803 .await
804 .unwrap();
805
806 let indexer_args = IndexerArgs::default();
807 let temp_dir = tempfile::tempdir().unwrap();
808 let client_args = ClientArgs {
809 ingestion: IngestionClientArgs {
810 local_ingestion_path: Some(temp_dir.path().to_owned()),
811 ..Default::default()
812 },
813 ..Default::default()
814 };
815 let ingestion_config = IngestionConfig::default();
816
817 let mut indexer = Indexer::new(
818 store,
819 indexer_args,
820 client_args,
821 ingestion_config,
822 None,
823 ®istry,
824 )
825 .await
826 .unwrap();
827
828 indexer
829 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
830 .await
831 .unwrap();
832 indexer
833 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
834 .await
835 .unwrap();
836 indexer
837 .sequential_pipeline::<C>(C, SequentialConfig::default())
838 .await
839 .unwrap();
840 indexer
841 .sequential_pipeline::<D>(D, SequentialConfig::default())
842 .await
843 .unwrap();
844
845 assert_eq!(indexer.first_ingestion_checkpoint, 0);
846 }
847
848 #[tokio::test]
850 async fn test_first_ingestion_checkpoint_smallest_is_0() {
851 let registry = Registry::new();
852 let store = MockStore::default();
853
854 test_pipeline!(A, "concurrent_a");
855 test_pipeline!(B, "concurrent_b");
856 test_pipeline!(C, "sequential_c");
857 test_pipeline!(D, "sequential_d");
858
859 let mut conn = store.connect().await.unwrap();
860 conn.set_committer_watermark(
861 A::NAME,
862 CommitterWatermark {
863 checkpoint_hi_inclusive: 100,
864 ..Default::default()
865 },
866 )
867 .await
868 .unwrap();
869 conn.set_committer_watermark(
870 B::NAME,
871 CommitterWatermark {
872 checkpoint_hi_inclusive: 10,
873 ..Default::default()
874 },
875 )
876 .await
877 .unwrap();
878 conn.set_committer_watermark(
879 C::NAME,
880 CommitterWatermark {
881 checkpoint_hi_inclusive: 1,
882 ..Default::default()
883 },
884 )
885 .await
886 .unwrap();
887 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
888 .await
889 .unwrap();
890
891 let indexer_args = IndexerArgs::default();
892 let temp_dir = tempfile::tempdir().unwrap();
893 let client_args = ClientArgs {
894 ingestion: IngestionClientArgs {
895 local_ingestion_path: Some(temp_dir.path().to_owned()),
896 ..Default::default()
897 },
898 ..Default::default()
899 };
900 let ingestion_config = IngestionConfig::default();
901
902 let mut indexer = Indexer::new(
903 store,
904 indexer_args,
905 client_args,
906 ingestion_config,
907 None,
908 ®istry,
909 )
910 .await
911 .unwrap();
912
913 indexer
914 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
915 .await
916 .unwrap();
917 indexer
918 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
919 .await
920 .unwrap();
921 indexer
922 .sequential_pipeline::<C>(C, SequentialConfig::default())
923 .await
924 .unwrap();
925 indexer
926 .sequential_pipeline::<D>(D, SequentialConfig::default())
927 .await
928 .unwrap();
929
930 assert_eq!(indexer.first_ingestion_checkpoint, 1);
931 }
932
933 #[tokio::test]
936 async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
937 let registry = Registry::new();
938 let store = MockStore::default();
939
940 test_pipeline!(A, "concurrent_a");
941 test_pipeline!(B, "concurrent_b");
942 test_pipeline!(C, "sequential_c");
943 test_pipeline!(D, "sequential_d");
944
945 let mut conn = store.connect().await.unwrap();
946 conn.set_committer_watermark(
947 B::NAME,
948 CommitterWatermark {
949 checkpoint_hi_inclusive: 50,
950 ..Default::default()
951 },
952 )
953 .await
954 .unwrap();
955 conn.set_committer_watermark(
956 C::NAME,
957 CommitterWatermark {
958 checkpoint_hi_inclusive: 10,
959 ..Default::default()
960 },
961 )
962 .await
963 .unwrap();
964
965 let indexer_args = IndexerArgs {
966 first_checkpoint: Some(5),
967 ..Default::default()
968 };
969 let temp_dir = tempfile::tempdir().unwrap();
970 let client_args = ClientArgs {
971 ingestion: IngestionClientArgs {
972 local_ingestion_path: Some(temp_dir.path().to_owned()),
973 ..Default::default()
974 },
975 ..Default::default()
976 };
977 let ingestion_config = IngestionConfig::default();
978
979 let mut indexer = Indexer::new(
980 store,
981 indexer_args,
982 client_args,
983 ingestion_config,
984 None,
985 ®istry,
986 )
987 .await
988 .unwrap();
989
990 indexer
991 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
992 .await
993 .unwrap();
994 indexer
995 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
996 .await
997 .unwrap();
998 indexer
999 .sequential_pipeline::<C>(C, SequentialConfig::default())
1000 .await
1001 .unwrap();
1002 indexer
1003 .sequential_pipeline::<D>(D, SequentialConfig::default())
1004 .await
1005 .unwrap();
1006
1007 assert_eq!(indexer.first_ingestion_checkpoint, 5);
1008 }
1009
1010 #[tokio::test]
1013 async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1014 let registry = Registry::new();
1015 let store = MockStore::default();
1016
1017 test_pipeline!(B, "concurrent_b");
1018 test_pipeline!(C, "sequential_c");
1019
1020 let mut conn = store.connect().await.unwrap();
1021 conn.set_committer_watermark(
1022 B::NAME,
1023 CommitterWatermark {
1024 checkpoint_hi_inclusive: 50,
1025 ..Default::default()
1026 },
1027 )
1028 .await
1029 .unwrap();
1030 conn.set_committer_watermark(
1031 C::NAME,
1032 CommitterWatermark {
1033 checkpoint_hi_inclusive: 10,
1034 ..Default::default()
1035 },
1036 )
1037 .await
1038 .unwrap();
1039
1040 let indexer_args = IndexerArgs {
1041 first_checkpoint: Some(5),
1042 ..Default::default()
1043 };
1044 let temp_dir = tempfile::tempdir().unwrap();
1045 let client_args = ClientArgs {
1046 ingestion: IngestionClientArgs {
1047 local_ingestion_path: Some(temp_dir.path().to_owned()),
1048 ..Default::default()
1049 },
1050 ..Default::default()
1051 };
1052 let ingestion_config = IngestionConfig::default();
1053
1054 let mut indexer = Indexer::new(
1055 store,
1056 indexer_args,
1057 client_args,
1058 ingestion_config,
1059 None,
1060 ®istry,
1061 )
1062 .await
1063 .unwrap();
1064
1065 indexer
1066 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1067 .await
1068 .unwrap();
1069 indexer
1070 .sequential_pipeline::<C>(C, SequentialConfig::default())
1071 .await
1072 .unwrap();
1073
1074 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1075 }
1076
1077 #[tokio::test]
1081 async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1082 let registry = Registry::new();
1083 let store = MockStore::default();
1084
1085 test_pipeline!(A, "concurrent_a");
1086 test_pipeline!(B, "concurrent_b");
1087 test_pipeline!(C, "sequential_c");
1088
1089 let mut conn = store.connect().await.unwrap();
1090 conn.set_committer_watermark(
1091 B::NAME,
1092 CommitterWatermark {
1093 checkpoint_hi_inclusive: 50,
1094 ..Default::default()
1095 },
1096 )
1097 .await
1098 .unwrap();
1099 conn.set_committer_watermark(
1100 C::NAME,
1101 CommitterWatermark {
1102 checkpoint_hi_inclusive: 10,
1103 ..Default::default()
1104 },
1105 )
1106 .await
1107 .unwrap();
1108
1109 let indexer_args = IndexerArgs {
1110 first_checkpoint: Some(24),
1111 ..Default::default()
1112 };
1113 let temp_dir = tempfile::tempdir().unwrap();
1114 let client_args = ClientArgs {
1115 ingestion: IngestionClientArgs {
1116 local_ingestion_path: Some(temp_dir.path().to_owned()),
1117 ..Default::default()
1118 },
1119 ..Default::default()
1120 };
1121 let ingestion_config = IngestionConfig::default();
1122
1123 let mut indexer = Indexer::new(
1124 store,
1125 indexer_args,
1126 client_args,
1127 ingestion_config,
1128 None,
1129 ®istry,
1130 )
1131 .await
1132 .unwrap();
1133
1134 indexer
1135 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1136 .await
1137 .unwrap();
1138
1139 indexer
1140 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1141 .await
1142 .unwrap();
1143
1144 indexer
1145 .sequential_pipeline::<C>(C, SequentialConfig::default())
1146 .await
1147 .unwrap();
1148
1149 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1150 }
1151
1152 #[tokio::test]
1154 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1155 let registry = Registry::new();
1156 let store = MockStore::default();
1157
1158 test_pipeline!(A, "concurrent_a");
1159 test_pipeline!(B, "concurrent_b");
1160 test_pipeline!(C, "sequential_c");
1161 test_pipeline!(D, "sequential_d");
1162
1163 let mut conn = store.connect().await.unwrap();
1164 conn.set_committer_watermark(
1165 A::NAME,
1166 CommitterWatermark {
1167 checkpoint_hi_inclusive: 5,
1168 ..Default::default()
1169 },
1170 )
1171 .await
1172 .unwrap();
1173 conn.set_committer_watermark(
1174 B::NAME,
1175 CommitterWatermark {
1176 checkpoint_hi_inclusive: 10,
1177 ..Default::default()
1178 },
1179 )
1180 .await
1181 .unwrap();
1182 conn.set_committer_watermark(
1183 C::NAME,
1184 CommitterWatermark {
1185 checkpoint_hi_inclusive: 15,
1186 ..Default::default()
1187 },
1188 )
1189 .await
1190 .unwrap();
1191 conn.set_committer_watermark(
1192 D::NAME,
1193 CommitterWatermark {
1194 checkpoint_hi_inclusive: 20,
1195 ..Default::default()
1196 },
1197 )
1198 .await
1199 .unwrap();
1200
1201 let temp_dir = tempfile::tempdir().unwrap();
1203 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1204 ingestion_dir: temp_dir.path().to_owned(),
1205 starting_checkpoint: 5,
1206 num_checkpoints: 25,
1207 checkpoint_size: 1,
1208 })
1209 .await;
1210
1211 let indexer_args = IndexerArgs {
1212 last_checkpoint: Some(29),
1213 ..Default::default()
1214 };
1215
1216 let client_args = ClientArgs {
1217 ingestion: IngestionClientArgs {
1218 local_ingestion_path: Some(temp_dir.path().to_owned()),
1219 ..Default::default()
1220 },
1221 ..Default::default()
1222 };
1223
1224 let ingestion_config = IngestionConfig::default();
1225
1226 let mut indexer = Indexer::new(
1227 store.clone(),
1228 indexer_args,
1229 client_args,
1230 ingestion_config,
1231 None,
1232 ®istry,
1233 )
1234 .await
1235 .unwrap();
1236
1237 indexer
1238 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1239 .await
1240 .unwrap();
1241 indexer
1242 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1243 .await
1244 .unwrap();
1245 indexer
1246 .sequential_pipeline::<C>(C, SequentialConfig::default())
1247 .await
1248 .unwrap();
1249 indexer
1250 .sequential_pipeline::<D>(D, SequentialConfig::default())
1251 .await
1252 .unwrap();
1253
1254 let ingestion_metrics = indexer.ingestion_metrics().clone();
1255 let indexer_metrics = indexer.indexer_metrics().clone();
1256
1257 indexer.run().await.unwrap().join().await.unwrap();
1258
1259 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1260 assert_eq!(
1261 indexer_metrics
1262 .total_watermarks_out_of_order
1263 .get_metric_with_label_values(&[A::NAME])
1264 .unwrap()
1265 .get(),
1266 0
1267 );
1268 assert_eq!(
1269 indexer_metrics
1270 .total_watermarks_out_of_order
1271 .get_metric_with_label_values(&[B::NAME])
1272 .unwrap()
1273 .get(),
1274 5
1275 );
1276 assert_eq!(
1277 indexer_metrics
1278 .total_watermarks_out_of_order
1279 .get_metric_with_label_values(&[C::NAME])
1280 .unwrap()
1281 .get(),
1282 10
1283 );
1284 assert_eq!(
1285 indexer_metrics
1286 .total_watermarks_out_of_order
1287 .get_metric_with_label_values(&[D::NAME])
1288 .unwrap()
1289 .get(),
1290 15
1291 );
1292 }
1293
1294 #[tokio::test]
1296 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1297 let registry = Registry::new();
1298 let store = MockStore::default();
1299
1300 test_pipeline!(A, "concurrent_a");
1301 test_pipeline!(B, "concurrent_b");
1302 test_pipeline!(C, "sequential_c");
1303 test_pipeline!(D, "sequential_d");
1304
1305 let mut conn = store.connect().await.unwrap();
1306 conn.set_committer_watermark(
1307 A::NAME,
1308 CommitterWatermark {
1309 checkpoint_hi_inclusive: 5,
1310 ..Default::default()
1311 },
1312 )
1313 .await
1314 .unwrap();
1315 conn.set_committer_watermark(
1316 B::NAME,
1317 CommitterWatermark {
1318 checkpoint_hi_inclusive: 10,
1319 ..Default::default()
1320 },
1321 )
1322 .await
1323 .unwrap();
1324 conn.set_committer_watermark(
1325 C::NAME,
1326 CommitterWatermark {
1327 checkpoint_hi_inclusive: 15,
1328 ..Default::default()
1329 },
1330 )
1331 .await
1332 .unwrap();
1333 conn.set_committer_watermark(
1334 D::NAME,
1335 CommitterWatermark {
1336 checkpoint_hi_inclusive: 20,
1337 ..Default::default()
1338 },
1339 )
1340 .await
1341 .unwrap();
1342
1343 let temp_dir = tempfile::tempdir().unwrap();
1345 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1346 ingestion_dir: temp_dir.path().to_owned(),
1347 starting_checkpoint: 5,
1348 num_checkpoints: 25,
1349 checkpoint_size: 1,
1350 })
1351 .await;
1352
1353 let indexer_args = IndexerArgs {
1354 first_checkpoint: Some(3),
1355 last_checkpoint: Some(29),
1356 ..Default::default()
1357 };
1358
1359 let client_args = ClientArgs {
1360 ingestion: IngestionClientArgs {
1361 local_ingestion_path: Some(temp_dir.path().to_owned()),
1362 ..Default::default()
1363 },
1364 ..Default::default()
1365 };
1366
1367 let ingestion_config = IngestionConfig::default();
1368
1369 let mut indexer = Indexer::new(
1370 store.clone(),
1371 indexer_args,
1372 client_args,
1373 ingestion_config,
1374 None,
1375 ®istry,
1376 )
1377 .await
1378 .unwrap();
1379
1380 indexer
1381 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1382 .await
1383 .unwrap();
1384 indexer
1385 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1386 .await
1387 .unwrap();
1388 indexer
1389 .sequential_pipeline::<C>(C, SequentialConfig::default())
1390 .await
1391 .unwrap();
1392 indexer
1393 .sequential_pipeline::<D>(D, SequentialConfig::default())
1394 .await
1395 .unwrap();
1396
1397 let ingestion_metrics = indexer.ingestion_metrics().clone();
1398 let metrics = indexer.indexer_metrics().clone();
1399 indexer.run().await.unwrap().join().await.unwrap();
1400
1401 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1402 assert_eq!(
1403 metrics
1404 .total_watermarks_out_of_order
1405 .get_metric_with_label_values(&[A::NAME])
1406 .unwrap()
1407 .get(),
1408 0
1409 );
1410 assert_eq!(
1411 metrics
1412 .total_watermarks_out_of_order
1413 .get_metric_with_label_values(&[B::NAME])
1414 .unwrap()
1415 .get(),
1416 5
1417 );
1418 assert_eq!(
1419 metrics
1420 .total_watermarks_out_of_order
1421 .get_metric_with_label_values(&[C::NAME])
1422 .unwrap()
1423 .get(),
1424 10
1425 );
1426 assert_eq!(
1427 metrics
1428 .total_watermarks_out_of_order
1429 .get_metric_with_label_values(&[D::NAME])
1430 .unwrap()
1431 .get(),
1432 15
1433 );
1434 }
1435
1436 #[tokio::test]
1438 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1439 let registry = Registry::new();
1440 let store = MockStore::default();
1441
1442 test_pipeline!(A, "concurrent_a");
1443 test_pipeline!(B, "concurrent_b");
1444 test_pipeline!(C, "sequential_c");
1445 test_pipeline!(D, "sequential_d");
1446
1447 let mut conn = store.connect().await.unwrap();
1448 conn.set_committer_watermark(
1449 B::NAME,
1450 CommitterWatermark {
1451 checkpoint_hi_inclusive: 10,
1452 ..Default::default()
1453 },
1454 )
1455 .await
1456 .unwrap();
1457 conn.set_committer_watermark(
1458 C::NAME,
1459 CommitterWatermark {
1460 checkpoint_hi_inclusive: 15,
1461 ..Default::default()
1462 },
1463 )
1464 .await
1465 .unwrap();
1466 conn.set_committer_watermark(
1467 D::NAME,
1468 CommitterWatermark {
1469 checkpoint_hi_inclusive: 20,
1470 ..Default::default()
1471 },
1472 )
1473 .await
1474 .unwrap();
1475
1476 let temp_dir = tempfile::tempdir().unwrap();
1478 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1479 ingestion_dir: temp_dir.path().to_owned(),
1480 starting_checkpoint: 0,
1481 num_checkpoints: 30,
1482 checkpoint_size: 1,
1483 })
1484 .await;
1485
1486 let indexer_args = IndexerArgs {
1487 last_checkpoint: Some(29),
1488 ..Default::default()
1489 };
1490
1491 let client_args = ClientArgs {
1492 ingestion: IngestionClientArgs {
1493 local_ingestion_path: Some(temp_dir.path().to_owned()),
1494 ..Default::default()
1495 },
1496 ..Default::default()
1497 };
1498
1499 let ingestion_config = IngestionConfig::default();
1500
1501 let mut indexer = Indexer::new(
1502 store.clone(),
1503 indexer_args,
1504 client_args,
1505 ingestion_config,
1506 None,
1507 ®istry,
1508 )
1509 .await
1510 .unwrap();
1511
1512 indexer
1513 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1514 .await
1515 .unwrap();
1516 indexer
1517 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1518 .await
1519 .unwrap();
1520 indexer
1521 .sequential_pipeline::<C>(C, SequentialConfig::default())
1522 .await
1523 .unwrap();
1524 indexer
1525 .sequential_pipeline::<D>(D, SequentialConfig::default())
1526 .await
1527 .unwrap();
1528
1529 let ingestion_metrics = indexer.ingestion_metrics().clone();
1530 let metrics = indexer.indexer_metrics().clone();
1531 indexer.run().await.unwrap().join().await.unwrap();
1532
1533 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1534 assert_eq!(
1535 metrics
1536 .total_watermarks_out_of_order
1537 .get_metric_with_label_values(&[A::NAME])
1538 .unwrap()
1539 .get(),
1540 0
1541 );
1542 assert_eq!(
1543 metrics
1544 .total_watermarks_out_of_order
1545 .get_metric_with_label_values(&[B::NAME])
1546 .unwrap()
1547 .get(),
1548 11
1549 );
1550 assert_eq!(
1551 metrics
1552 .total_watermarks_out_of_order
1553 .get_metric_with_label_values(&[C::NAME])
1554 .unwrap()
1555 .get(),
1556 16
1557 );
1558 assert_eq!(
1559 metrics
1560 .total_watermarks_out_of_order
1561 .get_metric_with_label_values(&[D::NAME])
1562 .unwrap()
1563 .get(),
1564 21
1565 );
1566 }
1567
1568 #[tokio::test]
1570 async fn test_indexer_ingestion_use_first_checkpoint() {
1571 let registry = Registry::new();
1572 let store = MockStore::default();
1573
1574 test_pipeline!(A, "concurrent_a");
1575 test_pipeline!(B, "concurrent_b");
1576 test_pipeline!(C, "sequential_c");
1577 test_pipeline!(D, "sequential_d");
1578
1579 let mut conn = store.connect().await.unwrap();
1580 conn.set_committer_watermark(
1581 B::NAME,
1582 CommitterWatermark {
1583 checkpoint_hi_inclusive: 10,
1584 ..Default::default()
1585 },
1586 )
1587 .await
1588 .unwrap();
1589 conn.set_committer_watermark(
1590 C::NAME,
1591 CommitterWatermark {
1592 checkpoint_hi_inclusive: 15,
1593 ..Default::default()
1594 },
1595 )
1596 .await
1597 .unwrap();
1598 conn.set_committer_watermark(
1599 D::NAME,
1600 CommitterWatermark {
1601 checkpoint_hi_inclusive: 20,
1602 ..Default::default()
1603 },
1604 )
1605 .await
1606 .unwrap();
1607
1608 let temp_dir = tempfile::tempdir().unwrap();
1610 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1611 ingestion_dir: temp_dir.path().to_owned(),
1612 starting_checkpoint: 5,
1613 num_checkpoints: 25,
1614 checkpoint_size: 1,
1615 })
1616 .await;
1617
1618 let indexer_args = IndexerArgs {
1619 first_checkpoint: Some(10),
1620 last_checkpoint: Some(29),
1621 ..Default::default()
1622 };
1623
1624 let client_args = ClientArgs {
1625 ingestion: IngestionClientArgs {
1626 local_ingestion_path: Some(temp_dir.path().to_owned()),
1627 ..Default::default()
1628 },
1629 ..Default::default()
1630 };
1631
1632 let ingestion_config = IngestionConfig::default();
1633
1634 let mut indexer = Indexer::new(
1635 store.clone(),
1636 indexer_args,
1637 client_args,
1638 ingestion_config,
1639 None,
1640 ®istry,
1641 )
1642 .await
1643 .unwrap();
1644
1645 indexer
1646 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1647 .await
1648 .unwrap();
1649 indexer
1650 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1651 .await
1652 .unwrap();
1653 indexer
1654 .sequential_pipeline::<C>(C, SequentialConfig::default())
1655 .await
1656 .unwrap();
1657 indexer
1658 .sequential_pipeline::<D>(D, SequentialConfig::default())
1659 .await
1660 .unwrap();
1661
1662 let ingestion_metrics = indexer.ingestion_metrics().clone();
1663 let metrics = indexer.indexer_metrics().clone();
1664 indexer.run().await.unwrap().join().await.unwrap();
1665
1666 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1667 assert_eq!(
1668 metrics
1669 .total_watermarks_out_of_order
1670 .get_metric_with_label_values(&[A::NAME])
1671 .unwrap()
1672 .get(),
1673 0
1674 );
1675 assert_eq!(
1676 metrics
1677 .total_watermarks_out_of_order
1678 .get_metric_with_label_values(&[B::NAME])
1679 .unwrap()
1680 .get(),
1681 1
1682 );
1683 assert_eq!(
1684 metrics
1685 .total_watermarks_out_of_order
1686 .get_metric_with_label_values(&[C::NAME])
1687 .unwrap()
1688 .get(),
1689 6
1690 );
1691 assert_eq!(
1692 metrics
1693 .total_watermarks_out_of_order
1694 .get_metric_with_label_values(&[D::NAME])
1695 .unwrap()
1696 .get(),
1697 11
1698 );
1699 }
1700
1701 #[tokio::test]
1702 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1703 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1704 assert_eq!(committer_watermark, None);
1706 assert_eq!(pruner_watermark, None);
1707 }
1708
1709 #[tokio::test]
1710 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1711 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1712 assert_eq!(committer_watermark, None);
1714 assert_eq!(pruner_watermark, None);
1715 }
1716
1717 #[tokio::test]
1718 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1719 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1720
1721 let committer_watermark = committer_watermark.unwrap();
1722 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1723
1724 let pruner_watermark = pruner_watermark.unwrap();
1725 assert_eq!(pruner_watermark.reader_lo, 1);
1726 assert_eq!(pruner_watermark.pruner_hi, 1);
1727 }
1728
1729 #[tokio::test]
1730 async fn test_init_watermark_sequential() {
1731 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1732
1733 let committer_watermark = committer_watermark.unwrap();
1734 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1735
1736 let pruner_watermark = pruner_watermark.unwrap();
1737 assert_eq!(pruner_watermark.reader_lo, 1);
1738 assert_eq!(pruner_watermark.pruner_hi, 1);
1739 }
1740
1741 #[tokio::test]
1742 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1743 let registry = Registry::new();
1744 let store = MockStore::default();
1745
1746 let mut conn = store.connect().await.unwrap();
1748
1749 conn.set_committer_watermark(
1751 MockHandler::NAME,
1752 CommitterWatermark {
1753 epoch_hi_inclusive: 0,
1754 checkpoint_hi_inclusive: 10,
1755 tx_hi: 20,
1756 timestamp_ms_hi_inclusive: 10000,
1757 },
1758 )
1759 .await
1760 .unwrap();
1761
1762 conn.set_committer_watermark(
1764 SequentialHandler::NAME,
1765 CommitterWatermark {
1766 epoch_hi_inclusive: 0,
1767 checkpoint_hi_inclusive: 5,
1768 tx_hi: 10,
1769 timestamp_ms_hi_inclusive: 5000,
1770 },
1771 )
1772 .await
1773 .unwrap();
1774
1775 let temp_dir = tempfile::tempdir().unwrap();
1777 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1778 ingestion_dir: temp_dir.path().to_owned(),
1779 starting_checkpoint: 0,
1780 num_checkpoints: 20,
1781 checkpoint_size: 2,
1782 })
1783 .await;
1784
1785 let indexer_args = IndexerArgs {
1786 first_checkpoint: None,
1787 last_checkpoint: Some(19),
1788 pipeline: vec![],
1789 ..Default::default()
1790 };
1791
1792 let client_args = ClientArgs {
1793 ingestion: IngestionClientArgs {
1794 local_ingestion_path: Some(temp_dir.path().to_owned()),
1795 ..Default::default()
1796 },
1797 ..Default::default()
1798 };
1799
1800 let ingestion_config = IngestionConfig::default();
1801
1802 let mut indexer = Indexer::new(
1803 store.clone(),
1804 indexer_args,
1805 client_args,
1806 ingestion_config,
1807 None,
1808 ®istry,
1809 )
1810 .await
1811 .unwrap();
1812
1813 indexer
1815 .sequential_pipeline(
1816 MockHandler,
1817 pipeline::sequential::SequentialConfig::default(),
1818 )
1819 .await
1820 .unwrap();
1821
1822 assert_eq!(
1824 indexer.next_sequential_checkpoint(),
1825 Some(11),
1826 "next_sequential_checkpoint should be 11"
1827 );
1828
1829 indexer
1831 .sequential_pipeline(
1832 SequentialHandler,
1833 pipeline::sequential::SequentialConfig::default(),
1834 )
1835 .await
1836 .unwrap();
1837
1838 assert_eq!(
1840 indexer.next_sequential_checkpoint(),
1841 Some(6),
1842 "next_sequential_checkpoint should still be 6"
1843 );
1844
1845 indexer.run().await.unwrap().join().await.unwrap();
1847
1848 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1850 let watermark2 = conn
1851 .committer_watermark(SequentialHandler::NAME)
1852 .await
1853 .unwrap();
1854
1855 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1856 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1857 }
1858
1859 #[tokio::test]
1863 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1864 let registry = Registry::new();
1865 let store = MockStore::default();
1866
1867 let mut conn = store.connect().await.unwrap();
1870 conn.set_committer_watermark(
1871 MockCheckpointSequenceNumberHandler::NAME,
1872 CommitterWatermark {
1873 checkpoint_hi_inclusive: 10,
1874 ..Default::default()
1875 },
1876 )
1877 .await
1878 .unwrap();
1879 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1880 .await
1881 .unwrap();
1882
1883 let indexer_args = IndexerArgs {
1886 first_checkpoint: Some(0),
1887 last_checkpoint: Some(15),
1888 pipeline: vec![],
1889 task: TaskArgs::tasked("task".to_string(), 10),
1890 };
1891 let temp_dir = tempfile::tempdir().unwrap();
1892 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1893 ingestion_dir: temp_dir.path().to_owned(),
1894 starting_checkpoint: 0,
1895 num_checkpoints: 16,
1896 checkpoint_size: 2,
1897 })
1898 .await;
1899
1900 let client_args = ClientArgs {
1901 ingestion: IngestionClientArgs {
1902 local_ingestion_path: Some(temp_dir.path().to_owned()),
1903 ..Default::default()
1904 },
1905 ..Default::default()
1906 };
1907
1908 let ingestion_config = IngestionConfig::default();
1909
1910 let mut tasked_indexer = Indexer::new(
1911 store.clone(),
1912 indexer_args,
1913 client_args,
1914 ingestion_config,
1915 None,
1916 ®istry,
1917 )
1918 .await
1919 .unwrap();
1920
1921 let _ = tasked_indexer
1922 .concurrent_pipeline(
1923 MockCheckpointSequenceNumberHandler,
1924 ConcurrentConfig::default(),
1925 )
1926 .await;
1927
1928 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1929 let metrics = tasked_indexer.indexer_metrics().clone();
1930
1931 tasked_indexer.run().await.unwrap().join().await.unwrap();
1932
1933 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1934 assert_eq!(
1935 metrics
1936 .total_collector_skipped_checkpoints
1937 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1938 .unwrap()
1939 .get(),
1940 7
1941 );
1942 let data = store
1943 .data
1944 .get(MockCheckpointSequenceNumberHandler::NAME)
1945 .unwrap();
1946 assert_eq!(data.len(), 9);
1947 for i in 0..7 {
1948 assert!(data.get(&i).is_none());
1949 }
1950 for i in 7..16 {
1951 assert!(data.get(&i).is_some());
1952 }
1953 }
1954
1955 #[tokio::test]
1957 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1958 let registry = Registry::new();
1959 let store = MockStore::default();
1960
1961 let mut conn = store.connect().await.unwrap();
1962 conn.set_committer_watermark(
1963 "test",
1964 CommitterWatermark {
1965 checkpoint_hi_inclusive: 10,
1966 ..Default::default()
1967 },
1968 )
1969 .await
1970 .unwrap();
1971 conn.set_reader_watermark("test", 5).await.unwrap();
1972
1973 let indexer_args = IndexerArgs {
1976 first_checkpoint: Some(9),
1977 last_checkpoint: Some(25),
1978 pipeline: vec![],
1979 task: TaskArgs::tasked("task".to_string(), 10),
1980 };
1981 let temp_dir = tempfile::tempdir().unwrap();
1982 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1983 ingestion_dir: temp_dir.path().to_owned(),
1984 starting_checkpoint: 9,
1985 num_checkpoints: 17,
1986 checkpoint_size: 2,
1987 })
1988 .await;
1989
1990 let client_args = ClientArgs {
1991 ingestion: IngestionClientArgs {
1992 local_ingestion_path: Some(temp_dir.path().to_owned()),
1993 ..Default::default()
1994 },
1995 ..Default::default()
1996 };
1997
1998 let ingestion_config = IngestionConfig::default();
1999
2000 let mut tasked_indexer = Indexer::new(
2001 store.clone(),
2002 indexer_args,
2003 client_args,
2004 ingestion_config,
2005 None,
2006 ®istry,
2007 )
2008 .await
2009 .unwrap();
2010
2011 let _ = tasked_indexer
2012 .concurrent_pipeline(
2013 MockCheckpointSequenceNumberHandler,
2014 ConcurrentConfig::default(),
2015 )
2016 .await;
2017
2018 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2019 let metrics = tasked_indexer.indexer_metrics().clone();
2020
2021 tasked_indexer.run().await.unwrap().join().await.unwrap();
2022
2023 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2024 assert_eq!(
2025 metrics
2026 .total_watermarks_out_of_order
2027 .get_metric_with_label_values(&["test"])
2028 .unwrap()
2029 .get(),
2030 0
2031 );
2032 assert_eq!(
2033 metrics
2034 .total_collector_skipped_checkpoints
2035 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2036 .unwrap()
2037 .get(),
2038 0
2039 );
2040
2041 let data = store.data.get("test").unwrap();
2042 assert!(data.len() == 17);
2043 for i in 0..9 {
2044 assert!(data.get(&i).is_none());
2045 }
2046 for i in 9..26 {
2047 assert!(data.get(&i).is_some());
2048 }
2049 let main_pipeline_watermark = store.watermark("test").unwrap();
2050 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2052 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2053 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2054 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2055 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2056 }
2057
2058 #[tokio::test]
2061 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2062 let registry = Registry::new();
2063 let store = MockStore::default();
2064 let mut conn = store.connect().await.unwrap();
2065 conn.set_committer_watermark(
2067 ControllableHandler::NAME,
2068 CommitterWatermark {
2069 checkpoint_hi_inclusive: 11,
2070 ..Default::default()
2071 },
2072 )
2073 .await
2074 .unwrap();
2075
2076 let temp_dir = tempfile::tempdir().unwrap();
2078 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2079 ingestion_dir: temp_dir.path().to_owned(),
2080 starting_checkpoint: 0,
2081 num_checkpoints: 501,
2082 checkpoint_size: 2,
2083 })
2084 .await;
2085 let indexer_args = IndexerArgs {
2086 first_checkpoint: Some(0),
2087 last_checkpoint: Some(500),
2088 pipeline: vec![],
2089 task: TaskArgs::tasked("task".to_string(), 10 ),
2090 };
2091 let client_args = ClientArgs {
2092 ingestion: IngestionClientArgs {
2093 local_ingestion_path: Some(temp_dir.path().to_owned()),
2094 ..Default::default()
2095 },
2096 ..Default::default()
2097 };
2098 let ingestion_config = IngestionConfig::default();
2099 let mut tasked_indexer = Indexer::new(
2100 store.clone(),
2101 indexer_args,
2102 client_args,
2103 ingestion_config,
2104 None,
2105 ®istry,
2106 )
2107 .await
2108 .unwrap();
2109 let mut allow_process = 10;
2110 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2112 let _ = tasked_indexer
2113 .concurrent_pipeline(
2114 controllable_handler,
2115 ConcurrentConfig {
2116 committer: CommitterConfig {
2117 collect_interval_ms: 10,
2118 watermark_interval_ms: 10,
2119 ..Default::default()
2120 },
2121 ..Default::default()
2122 },
2123 )
2124 .await;
2125 let metrics = tasked_indexer.indexer_metrics().clone();
2126
2127 let mut s_indexer = tasked_indexer.run().await.unwrap();
2128
2129 store
2133 .wait_for_watermark(
2134 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2135 10,
2136 std::time::Duration::from_secs(10),
2137 )
2138 .await;
2139
2140 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2145 .await
2146 .unwrap();
2147
2148 let reader_lo = metrics
2149 .collector_reader_lo
2150 .with_label_values(&[ControllableHandler::NAME]);
2151
2152 let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2156 while reader_lo.get() != 250 {
2157 interval.tick().await;
2158 allow_process += 1;
2160 assert!(
2161 allow_process <= 500,
2162 "Released all checkpoints but collector never observed new reader_lo"
2163 );
2164 process_below.send(allow_process).ok();
2165 }
2166
2167 process_below.send(500).ok();
2174
2175 s_indexer.join().await.unwrap();
2176
2177 let data = store.data.get(ControllableHandler::NAME).unwrap();
2178
2179 for chkpt in (allow_process + 1)..250 {
2181 assert!(
2182 data.get(&chkpt).is_none(),
2183 "Checkpoint {chkpt} should have been skipped"
2184 );
2185 }
2186
2187 for chkpt in 250..=500 {
2189 assert!(
2190 data.get(&chkpt).is_some(),
2191 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2192 );
2193 }
2194
2195 for chkpt in 0..=10 {
2197 assert!(
2198 data.get(&chkpt).is_some(),
2199 "Checkpoint {chkpt} should have been committed (baseline)"
2200 );
2201 }
2202 }
2203}