1use std::{collections::BTreeSet, sync::Arc, time::Duration};
5
6use anyhow::{Context, bail, ensure};
7use ingestion::{ClientArgs, IngestionConfig, IngestionService, ingestion_client::IngestionClient};
8use metrics::IndexerMetrics;
9use prometheus::Registry;
10use sui_indexer_alt_framework_store_traits::{
11 Connection, Store, TransactionalStore, pipeline_task,
12};
13use tracing::info;
14
15pub use anyhow::Result;
16pub use sui_field_count::FieldCount;
17pub use sui_futures::service;
18pub use sui_indexer_alt_framework_store_traits as store;
20pub use sui_types as types;
21
22use crate::metrics::IngestionMetrics;
23use crate::pipeline::{
24 Processor,
25 concurrent::{self, ConcurrentConfig},
26 sequential::{self, Handler, SequentialConfig},
27};
28use crate::service::Service;
29
30#[cfg(feature = "cluster")]
31pub mod cluster;
32pub mod ingestion;
33pub mod metrics;
34pub mod pipeline;
35#[cfg(feature = "postgres")]
36pub mod postgres;
37
38#[cfg(test)]
39pub mod mocks;
40
/// Command-line arguments selecting the checkpoint range and the pipelines an [`Indexer`] runs.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Checkpoint that pipelines start from when the store reports no existing watermark for
    /// them (0 when unset). Pipelines that do have a watermark resume from the checkpoint after
    /// it, regardless of this value.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Last checkpoint to ingest (inclusive). When unset, the ingestion range is unbounded
    /// (its upper end is `u64::MAX`).
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Restrict the indexer to the named pipelines (may be repeated). When empty, every pipeline
    /// added to the indexer is run. Naming a pipeline that is never added makes `Indexer::run`
    /// fail.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional task configuration, flattened into the same argument list.
    #[clap(flatten)]
    pub task: TaskArgs,
}
67
/// Command-line arguments describing an optional task for the indexer. The two flags require each
/// other, so they are either both supplied or both absent.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the task. It is combined with each pipeline's name (via `pipeline_task`) when the
    /// pipeline's watermark is initialised.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Reader interval for the task, in milliseconds.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
90
/// An indexer: an ingestion service plus the set of pipelines that consume the checkpoints it
/// produces and write to a store of type `S`.
pub struct Indexer<S: Store> {
    /// Store used to initialise watermarks; a clone is handed to every pipeline.
    store: S,

    /// Indexer-level metrics, shared with every pipeline.
    metrics: Arc<IndexerMetrics>,

    /// Service that fetches checkpoints; pipelines subscribe to it and it is started by `run`.
    ingestion_service: IngestionService,

    /// Next checkpoint for pipelines without an existing watermark. Taken from
    /// `IndexerArgs::first_checkpoint`, defaulting to 0.
    default_next_checkpoint: u64,

    /// Inclusive upper bound of the ingestion range; `None` means unbounded.
    last_checkpoint: Option<u64>,

    /// Optional task this indexer runs under. Concurrent pipelines receive a copy; sequential
    /// pipelines refuse to be added when it is set.
    task: Option<Task>,

    /// Pipeline filter from the command line; `None` means every added pipeline is enabled.
    /// Names are removed as the matching pipelines are added, so whatever is left when `run` is
    /// called refers to pipelines this indexer does not know about.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of every pipeline passed to `add_pipeline` (including ones that were skipped), used
    /// to reject duplicates.
    added_pipelines: BTreeSet<&'static str>,

    /// Smallest next checkpoint across all pipelines added (and not skipped) so far; ingestion
    /// starts here. Starts out as `u64::MAX`.
    first_ingestion_checkpoint: u64,

    /// Smallest next checkpoint across the sequential pipelines added so far, if any. Passed to
    /// the ingestion service when it is started.
    next_sequential_checkpoint: Option<u64>,

    /// Services for the pipelines added so far, merged into the ingestion service by `run`.
    pipelines: Vec<Service>,
}
149
/// Task configuration derived from [`TaskArgs`] once both of its flags are known to be present.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the task.
    task: String,
    /// Reader interval, converted from the `reader_interval_ms` argument.
    reader_interval: Duration,
}
160
161impl TaskArgs {
162 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
163 Self {
164 task: Some(task),
165 reader_interval_ms: Some(reader_interval_ms),
166 }
167 }
168
169 fn into_task(self) -> Option<Task> {
170 Some(Task {
171 task: self.task?,
172 reader_interval: Duration::from_millis(self.reader_interval_ms?),
173 })
174 }
175}
176
177impl<S: Store> Indexer<S> {
178 pub async fn new(
190 store: S,
191 indexer_args: IndexerArgs,
192 client_args: ClientArgs,
193 ingestion_config: IngestionConfig,
194 metrics_prefix: Option<&str>,
195 registry: &Registry,
196 ) -> Result<Self> {
197 let IndexerArgs {
198 first_checkpoint,
199 last_checkpoint,
200 pipeline,
201 task,
202 } = indexer_args;
203
204 let metrics = IndexerMetrics::new(metrics_prefix, registry);
205
206 let ingestion_service =
207 IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
208
209 Ok(Self {
210 store,
211 metrics,
212 ingestion_service,
213 default_next_checkpoint: first_checkpoint.unwrap_or_default(),
214 last_checkpoint,
215 task: task.into_task(),
216 enabled_pipelines: if pipeline.is_empty() {
217 None
218 } else {
219 Some(pipeline.into_iter().collect())
220 },
221 added_pipelines: BTreeSet::new(),
222 first_ingestion_checkpoint: u64::MAX,
223 next_sequential_checkpoint: None,
224 pipelines: vec![],
225 })
226 }
227
228 pub fn store(&self) -> &S {
230 &self.store
231 }
232
233 pub fn ingestion_client(&self) -> &IngestionClient {
235 self.ingestion_service.ingestion_client()
236 }
237
238 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
240 &self.metrics
241 }
242
243 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
245 self.ingestion_service.metrics()
246 }
247
248 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
250 self.added_pipelines.iter().copied().filter(|p| {
251 self.enabled_pipelines
252 .as_ref()
253 .is_none_or(|e| e.contains(*p))
254 })
255 }
256
257 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
261 self.next_sequential_checkpoint
262 }
263
264 pub async fn concurrent_pipeline<H>(
271 &mut self,
272 handler: H,
273 config: ConcurrentConfig,
274 ) -> Result<()>
275 where
276 H: concurrent::Handler<Store = S> + Send + Sync + 'static,
277 {
278 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
279 return Ok(());
280 };
281
282 self.pipelines.push(concurrent::pipeline::<H>(
283 handler,
284 next_checkpoint,
285 config,
286 self.store.clone(),
287 self.task.clone(),
288 self.ingestion_service.subscribe().0,
289 self.metrics.clone(),
290 ));
291
292 Ok(())
293 }
294
295 pub async fn run(self) -> Result<Service> {
301 if let Some(enabled_pipelines) = self.enabled_pipelines {
302 ensure!(
303 enabled_pipelines.is_empty(),
304 "Tried to enable pipelines that this indexer does not know about: \
305 {enabled_pipelines:#?}",
306 );
307 }
308
309 let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
310
311 info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
312
313 let mut service = self
314 .ingestion_service
315 .run(
316 self.first_ingestion_checkpoint..=last_checkpoint,
317 self.next_sequential_checkpoint,
318 )
319 .await
320 .context("Failed to start ingestion service")?;
321
322 for pipeline in self.pipelines {
323 service = service.merge(pipeline);
324 }
325
326 Ok(service)
327 }
328
329 async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
338 ensure!(
339 self.added_pipelines.insert(P::NAME),
340 "Pipeline {:?} already added",
341 P::NAME,
342 );
343
344 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
345 && !enabled_pipelines.remove(P::NAME)
346 {
347 info!(pipeline = P::NAME, "Skipping");
348 return Ok(None);
349 }
350
351 let mut conn = self
352 .store
353 .connect()
354 .await
355 .context("Failed to establish connection to store")?;
356
357 let pipeline_task =
358 pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
359
360 let checkpoint_hi_inclusive = conn
361 .init_watermark(&pipeline_task, self.default_next_checkpoint)
362 .await
363 .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
364
365 let next_checkpoint =
366 checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
367
368 self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
369
370 Ok(Some(next_checkpoint))
371 }
372}
373
374impl<T: TransactionalStore> Indexer<T> {
375 pub async fn sequential_pipeline<H>(
386 &mut self,
387 handler: H,
388 config: SequentialConfig,
389 ) -> Result<()>
390 where
391 H: Handler<Store = T> + Send + Sync + 'static,
392 {
393 let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
394 return Ok(());
395 };
396
397 if self.task.is_some() {
398 bail!(
399 "Sequential pipelines do not support pipeline tasks. \
400 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
401 Running the same pipeline under a different task would violate these guarantees."
402 );
403 }
404
405 self.next_sequential_checkpoint = Some(
407 self.next_sequential_checkpoint
408 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
409 );
410
411 let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
412
413 self.pipelines.push(sequential::pipeline::<H>(
414 handler,
415 next_checkpoint,
416 config,
417 self.store.clone(),
418 checkpoint_rx,
419 watermark_tx,
420 self.metrics.clone(),
421 ));
422
423 Ok(())
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use std::sync::Arc;
430
431 use async_trait::async_trait;
432 use clap::Parser;
433 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
434 use sui_synthetic_ingestion::synthetic_ingestion;
435 use tokio::sync::watch;
436
437 use crate::FieldCount;
438 use crate::ingestion::ingestion_client::IngestionClientArgs;
439 use crate::mocks::store::MockStore;
440 use crate::pipeline::CommitterConfig;
441 use crate::pipeline::{Processor, concurrent::ConcurrentConfig};
442 use crate::store::CommitterWatermark;
443
444 use super::*;
445
/// Value produced by the test processors in this module; they construct it from a checkpoint's
/// sequence number.
#[allow(dead_code)]
#[derive(Clone, FieldCount)]
struct MockValue(u64);
449
/// Test handler whose processing can be gated from outside: a checkpoint is only processed once
/// its sequence number is at or below the limit published on the watch channel.
struct ControllableHandler {
    /// Receives the current limit; checkpoints above it wait in `process`.
    process_below: watch::Receiver<u64>,
}
455
456 impl ControllableHandler {
457 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
458 let (tx, rx) = watch::channel(limit);
459 (Self { process_below: rx }, tx)
460 }
461 }
462
463 #[async_trait]
464 impl Processor for ControllableHandler {
465 const NAME: &'static str = "controllable";
466 const FANOUT: usize = 501;
471 type Value = MockValue;
472
473 async fn process(
474 &self,
475 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
476 ) -> anyhow::Result<Vec<Self::Value>> {
477 let cp_num = checkpoint.summary.sequence_number;
478
479 self.process_below
481 .clone()
482 .wait_for(|&limit| cp_num <= limit)
483 .await
484 .ok();
485
486 Ok(vec![MockValue(cp_num)])
487 }
488 }
489
490 #[async_trait]
491 impl crate::pipeline::concurrent::Handler for ControllableHandler {
492 type Store = MockStore;
493 type Batch = Vec<MockValue>;
494
495 fn batch(
496 &self,
497 batch: &mut Self::Batch,
498 values: &mut std::vec::IntoIter<Self::Value>,
499 ) -> crate::pipeline::concurrent::BatchStatus {
500 batch.extend(values);
501 crate::pipeline::concurrent::BatchStatus::Ready
502 }
503
504 async fn commit<'a>(
505 &self,
506 batch: &Self::Batch,
507 conn: &mut <Self::Store as Store>::Connection<'a>,
508 ) -> anyhow::Result<usize> {
509 for value in batch {
510 conn.0
511 .commit_data(Self::NAME, value.0, vec![value.0])
512 .await?;
513 }
514 Ok(batch.len())
515 }
516 }
517
/// Define a unit struct `$handler` that acts as a pipeline named `$name`. The struct implements
/// `Processor` (one `MockValue` per checkpoint) plus both the concurrent and the sequential
/// `Handler` traits against `MockStore`.
macro_rules! test_pipeline {
    ($handler:ident, $name:literal) => {
        struct $handler;

        #[async_trait]
        impl Processor for $handler {
            const NAME: &'static str = $name;
            type Value = MockValue;

            async fn process(
                &self,
                checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                let sequence_number = checkpoint.summary.sequence_number;
                Ok(vec![MockValue(sequence_number)])
            }
        }

        #[async_trait]
        impl crate::pipeline::concurrent::Handler for $handler {
            type Store = MockStore;
            type Batch = Vec<Self::Value>;

            // Takes every pending value, but always reports the batch as pending.
            fn batch(
                &self,
                batch: &mut Self::Batch,
                values: &mut std::vec::IntoIter<Self::Value>,
            ) -> crate::pipeline::concurrent::BatchStatus {
                for value in values.by_ref() {
                    batch.push(value);
                }
                crate::pipeline::concurrent::BatchStatus::Pending
            }

            // Writes each value to the mock store under this pipeline's name.
            async fn commit<'a>(
                &self,
                batch: &Self::Batch,
                conn: &mut <Self::Store as Store>::Connection<'a>,
            ) -> anyhow::Result<usize> {
                for MockValue(sequence_number) in batch {
                    conn.0
                        .commit_data(Self::NAME, *sequence_number, vec![*sequence_number])
                        .await?;
                }
                Ok(batch.len())
            }
        }

        #[async_trait]
        impl crate::pipeline::sequential::Handler for $handler {
            type Store = MockStore;
            type Batch = Vec<Self::Value>;

            fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                for value in values {
                    batch.push(value);
                }
            }

            // The sequential commit writes nothing and always reports one affected row.
            async fn commit<'a>(
                &self,
                _batch: &Self::Batch,
                _conn: &mut <Self::Store as Store>::Connection<'a>,
            ) -> anyhow::Result<usize> {
                Ok(1)
            }
        }
    };
}
581
// Module-level handlers generated by `test_pipeline!`. Each one is a unit struct implementing
// `Processor` and both `Handler` traits under the given pipeline name.
test_pipeline!(MockHandler, "test_processor");
test_pipeline!(SequentialHandler, "sequential_handler");
test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
585
586 async fn test_init_watermark(
587 first_checkpoint: Option<u64>,
588 is_concurrent: bool,
589 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
590 let registry = Registry::new();
591 let store = MockStore::default();
592
593 test_pipeline!(A, "pipeline_name");
594
595 let mut conn = store.connect().await.unwrap();
596
597 let indexer_args = IndexerArgs {
598 first_checkpoint,
599 ..IndexerArgs::default()
600 };
601 let temp_dir = tempfile::tempdir().unwrap();
602 let client_args = ClientArgs {
603 ingestion: IngestionClientArgs {
604 local_ingestion_path: Some(temp_dir.path().to_owned()),
605 ..Default::default()
606 },
607 ..Default::default()
608 };
609 let ingestion_config = IngestionConfig::default();
610
611 let mut indexer = Indexer::new(
612 store.clone(),
613 indexer_args,
614 client_args,
615 ingestion_config,
616 None,
617 ®istry,
618 )
619 .await
620 .unwrap();
621
622 if is_concurrent {
623 indexer
624 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
625 .await
626 .unwrap();
627 } else {
628 indexer
629 .sequential_pipeline::<A>(A, SequentialConfig::default())
630 .await
631 .unwrap();
632 }
633
634 (
635 conn.committer_watermark(A::NAME).await.unwrap(),
636 conn.pruner_watermark(A::NAME, Duration::ZERO)
637 .await
638 .unwrap(),
639 )
640 }
641
/// Every `IndexerArgs` flag, including the flattened task flags, parses from the command line.
#[test]
fn test_arg_parsing() {
    #[derive(Parser)]
    struct Args {
        #[clap(flatten)]
        indexer: IndexerArgs,
    }

    let argv = [
        "cmd",
        "--first-checkpoint",
        "10",
        "--last-checkpoint",
        "100",
        "--pipeline",
        "a",
        "--pipeline",
        "b",
        "--task",
        "t",
        "--reader-interval-ms",
        "5000",
    ];

    let Args { indexer } = Args::try_parse_from(argv).unwrap();
    let IndexerArgs {
        first_checkpoint,
        last_checkpoint,
        pipeline,
        task,
    } = indexer;

    assert_eq!(first_checkpoint, Some(10));
    assert_eq!(last_checkpoint, Some(100));
    assert_eq!(pipeline, vec!["a", "b"]);
    assert_eq!(task.task.as_deref(), Some("t"));
    assert_eq!(task.reader_interval_ms, Some(5000));
}
673
674 #[tokio::test]
676 async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
677 let registry = Registry::new();
678 let store = MockStore::default();
679
680 test_pipeline!(A, "concurrent_a");
681 test_pipeline!(B, "concurrent_b");
682 test_pipeline!(C, "sequential_c");
683 test_pipeline!(D, "sequential_d");
684
685 let mut conn = store.connect().await.unwrap();
686 conn.set_committer_watermark(
687 A::NAME,
688 CommitterWatermark {
689 checkpoint_hi_inclusive: 100,
690 ..Default::default()
691 },
692 )
693 .await
694 .unwrap();
695 conn.set_committer_watermark(
696 B::NAME,
697 CommitterWatermark {
698 checkpoint_hi_inclusive: 10,
699 ..Default::default()
700 },
701 )
702 .await
703 .unwrap();
704 conn.set_committer_watermark(
705 C::NAME,
706 CommitterWatermark {
707 checkpoint_hi_inclusive: 1,
708 ..Default::default()
709 },
710 )
711 .await
712 .unwrap();
713 conn.set_committer_watermark(
714 D::NAME,
715 CommitterWatermark {
716 checkpoint_hi_inclusive: 50,
717 ..Default::default()
718 },
719 )
720 .await
721 .unwrap();
722
723 let indexer_args = IndexerArgs::default();
724 let temp_dir = tempfile::tempdir().unwrap();
725 let client_args = ClientArgs {
726 ingestion: IngestionClientArgs {
727 local_ingestion_path: Some(temp_dir.path().to_owned()),
728 ..Default::default()
729 },
730 ..Default::default()
731 };
732 let ingestion_config = IngestionConfig::default();
733
734 let mut indexer = Indexer::new(
735 store,
736 indexer_args,
737 client_args,
738 ingestion_config,
739 None,
740 ®istry,
741 )
742 .await
743 .unwrap();
744
745 indexer
746 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
747 .await
748 .unwrap();
749 indexer
750 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
751 .await
752 .unwrap();
753 indexer
754 .sequential_pipeline::<C>(C, SequentialConfig::default())
755 .await
756 .unwrap();
757 indexer
758 .sequential_pipeline::<D>(D, SequentialConfig::default())
759 .await
760 .unwrap();
761
762 assert_eq!(indexer.first_ingestion_checkpoint, 2);
763 }
764
765 #[tokio::test]
767 async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
768 let registry = Registry::new();
769 let store = MockStore::default();
770
771 test_pipeline!(A, "concurrent_a");
772 test_pipeline!(B, "concurrent_b");
773 test_pipeline!(C, "sequential_c");
774 test_pipeline!(D, "sequential_d");
775
776 let mut conn = store.connect().await.unwrap();
777 conn.set_committer_watermark(
778 B::NAME,
779 CommitterWatermark {
780 checkpoint_hi_inclusive: 10,
781 ..Default::default()
782 },
783 )
784 .await
785 .unwrap();
786 conn.set_committer_watermark(
787 C::NAME,
788 CommitterWatermark {
789 checkpoint_hi_inclusive: 1,
790 ..Default::default()
791 },
792 )
793 .await
794 .unwrap();
795
796 let indexer_args = IndexerArgs::default();
797 let temp_dir = tempfile::tempdir().unwrap();
798 let client_args = ClientArgs {
799 ingestion: IngestionClientArgs {
800 local_ingestion_path: Some(temp_dir.path().to_owned()),
801 ..Default::default()
802 },
803 ..Default::default()
804 };
805 let ingestion_config = IngestionConfig::default();
806
807 let mut indexer = Indexer::new(
808 store,
809 indexer_args,
810 client_args,
811 ingestion_config,
812 None,
813 ®istry,
814 )
815 .await
816 .unwrap();
817
818 indexer
819 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
820 .await
821 .unwrap();
822 indexer
823 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
824 .await
825 .unwrap();
826 indexer
827 .sequential_pipeline::<C>(C, SequentialConfig::default())
828 .await
829 .unwrap();
830 indexer
831 .sequential_pipeline::<D>(D, SequentialConfig::default())
832 .await
833 .unwrap();
834
835 assert_eq!(indexer.first_ingestion_checkpoint, 0);
836 }
837
838 #[tokio::test]
840 async fn test_first_ingestion_checkpoint_smallest_is_0() {
841 let registry = Registry::new();
842 let store = MockStore::default();
843
844 test_pipeline!(A, "concurrent_a");
845 test_pipeline!(B, "concurrent_b");
846 test_pipeline!(C, "sequential_c");
847 test_pipeline!(D, "sequential_d");
848
849 let mut conn = store.connect().await.unwrap();
850 conn.set_committer_watermark(
851 A::NAME,
852 CommitterWatermark {
853 checkpoint_hi_inclusive: 100,
854 ..Default::default()
855 },
856 )
857 .await
858 .unwrap();
859 conn.set_committer_watermark(
860 B::NAME,
861 CommitterWatermark {
862 checkpoint_hi_inclusive: 10,
863 ..Default::default()
864 },
865 )
866 .await
867 .unwrap();
868 conn.set_committer_watermark(
869 C::NAME,
870 CommitterWatermark {
871 checkpoint_hi_inclusive: 1,
872 ..Default::default()
873 },
874 )
875 .await
876 .unwrap();
877 conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
878 .await
879 .unwrap();
880
881 let indexer_args = IndexerArgs::default();
882 let temp_dir = tempfile::tempdir().unwrap();
883 let client_args = ClientArgs {
884 ingestion: IngestionClientArgs {
885 local_ingestion_path: Some(temp_dir.path().to_owned()),
886 ..Default::default()
887 },
888 ..Default::default()
889 };
890 let ingestion_config = IngestionConfig::default();
891
892 let mut indexer = Indexer::new(
893 store,
894 indexer_args,
895 client_args,
896 ingestion_config,
897 None,
898 ®istry,
899 )
900 .await
901 .unwrap();
902
903 indexer
904 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
905 .await
906 .unwrap();
907 indexer
908 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
909 .await
910 .unwrap();
911 indexer
912 .sequential_pipeline::<C>(C, SequentialConfig::default())
913 .await
914 .unwrap();
915 indexer
916 .sequential_pipeline::<D>(D, SequentialConfig::default())
917 .await
918 .unwrap();
919
920 assert_eq!(indexer.first_ingestion_checkpoint, 1);
921 }
922
923 #[tokio::test]
926 async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
927 let registry = Registry::new();
928 let store = MockStore::default();
929
930 test_pipeline!(A, "concurrent_a");
931 test_pipeline!(B, "concurrent_b");
932 test_pipeline!(C, "sequential_c");
933 test_pipeline!(D, "sequential_d");
934
935 let mut conn = store.connect().await.unwrap();
936 conn.set_committer_watermark(
937 B::NAME,
938 CommitterWatermark {
939 checkpoint_hi_inclusive: 50,
940 ..Default::default()
941 },
942 )
943 .await
944 .unwrap();
945 conn.set_committer_watermark(
946 C::NAME,
947 CommitterWatermark {
948 checkpoint_hi_inclusive: 10,
949 ..Default::default()
950 },
951 )
952 .await
953 .unwrap();
954
955 let indexer_args = IndexerArgs {
956 first_checkpoint: Some(5),
957 ..Default::default()
958 };
959 let temp_dir = tempfile::tempdir().unwrap();
960 let client_args = ClientArgs {
961 ingestion: IngestionClientArgs {
962 local_ingestion_path: Some(temp_dir.path().to_owned()),
963 ..Default::default()
964 },
965 ..Default::default()
966 };
967 let ingestion_config = IngestionConfig::default();
968
969 let mut indexer = Indexer::new(
970 store,
971 indexer_args,
972 client_args,
973 ingestion_config,
974 None,
975 ®istry,
976 )
977 .await
978 .unwrap();
979
980 indexer
981 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
982 .await
983 .unwrap();
984 indexer
985 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
986 .await
987 .unwrap();
988 indexer
989 .sequential_pipeline::<C>(C, SequentialConfig::default())
990 .await
991 .unwrap();
992 indexer
993 .sequential_pipeline::<D>(D, SequentialConfig::default())
994 .await
995 .unwrap();
996
997 assert_eq!(indexer.first_ingestion_checkpoint, 5);
998 }
999
1000 #[tokio::test]
1003 async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1004 let registry = Registry::new();
1005 let store = MockStore::default();
1006
1007 test_pipeline!(B, "concurrent_b");
1008 test_pipeline!(C, "sequential_c");
1009
1010 let mut conn = store.connect().await.unwrap();
1011 conn.set_committer_watermark(
1012 B::NAME,
1013 CommitterWatermark {
1014 checkpoint_hi_inclusive: 50,
1015 ..Default::default()
1016 },
1017 )
1018 .await
1019 .unwrap();
1020 conn.set_committer_watermark(
1021 C::NAME,
1022 CommitterWatermark {
1023 checkpoint_hi_inclusive: 10,
1024 ..Default::default()
1025 },
1026 )
1027 .await
1028 .unwrap();
1029
1030 let indexer_args = IndexerArgs {
1031 first_checkpoint: Some(5),
1032 ..Default::default()
1033 };
1034 let temp_dir = tempfile::tempdir().unwrap();
1035 let client_args = ClientArgs {
1036 ingestion: IngestionClientArgs {
1037 local_ingestion_path: Some(temp_dir.path().to_owned()),
1038 ..Default::default()
1039 },
1040 ..Default::default()
1041 };
1042 let ingestion_config = IngestionConfig::default();
1043
1044 let mut indexer = Indexer::new(
1045 store,
1046 indexer_args,
1047 client_args,
1048 ingestion_config,
1049 None,
1050 ®istry,
1051 )
1052 .await
1053 .unwrap();
1054
1055 indexer
1056 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1057 .await
1058 .unwrap();
1059 indexer
1060 .sequential_pipeline::<C>(C, SequentialConfig::default())
1061 .await
1062 .unwrap();
1063
1064 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1065 }
1066
1067 #[tokio::test]
1071 async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1072 let registry = Registry::new();
1073 let store = MockStore::default();
1074
1075 test_pipeline!(A, "concurrent_a");
1076 test_pipeline!(B, "concurrent_b");
1077 test_pipeline!(C, "sequential_c");
1078
1079 let mut conn = store.connect().await.unwrap();
1080 conn.set_committer_watermark(
1081 B::NAME,
1082 CommitterWatermark {
1083 checkpoint_hi_inclusive: 50,
1084 ..Default::default()
1085 },
1086 )
1087 .await
1088 .unwrap();
1089 conn.set_committer_watermark(
1090 C::NAME,
1091 CommitterWatermark {
1092 checkpoint_hi_inclusive: 10,
1093 ..Default::default()
1094 },
1095 )
1096 .await
1097 .unwrap();
1098
1099 let indexer_args = IndexerArgs {
1100 first_checkpoint: Some(24),
1101 ..Default::default()
1102 };
1103 let temp_dir = tempfile::tempdir().unwrap();
1104 let client_args = ClientArgs {
1105 ingestion: IngestionClientArgs {
1106 local_ingestion_path: Some(temp_dir.path().to_owned()),
1107 ..Default::default()
1108 },
1109 ..Default::default()
1110 };
1111 let ingestion_config = IngestionConfig::default();
1112
1113 let mut indexer = Indexer::new(
1114 store,
1115 indexer_args,
1116 client_args,
1117 ingestion_config,
1118 None,
1119 ®istry,
1120 )
1121 .await
1122 .unwrap();
1123
1124 indexer
1125 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1126 .await
1127 .unwrap();
1128
1129 indexer
1130 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1131 .await
1132 .unwrap();
1133
1134 indexer
1135 .sequential_pipeline::<C>(C, SequentialConfig::default())
1136 .await
1137 .unwrap();
1138
1139 assert_eq!(indexer.first_ingestion_checkpoint, 11);
1140 }
1141
1142 #[tokio::test]
1144 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1145 let registry = Registry::new();
1146 let store = MockStore::default();
1147
1148 test_pipeline!(A, "concurrent_a");
1149 test_pipeline!(B, "concurrent_b");
1150 test_pipeline!(C, "sequential_c");
1151 test_pipeline!(D, "sequential_d");
1152
1153 let mut conn = store.connect().await.unwrap();
1154 conn.set_committer_watermark(
1155 A::NAME,
1156 CommitterWatermark {
1157 checkpoint_hi_inclusive: 5,
1158 ..Default::default()
1159 },
1160 )
1161 .await
1162 .unwrap();
1163 conn.set_committer_watermark(
1164 B::NAME,
1165 CommitterWatermark {
1166 checkpoint_hi_inclusive: 10,
1167 ..Default::default()
1168 },
1169 )
1170 .await
1171 .unwrap();
1172 conn.set_committer_watermark(
1173 C::NAME,
1174 CommitterWatermark {
1175 checkpoint_hi_inclusive: 15,
1176 ..Default::default()
1177 },
1178 )
1179 .await
1180 .unwrap();
1181 conn.set_committer_watermark(
1182 D::NAME,
1183 CommitterWatermark {
1184 checkpoint_hi_inclusive: 20,
1185 ..Default::default()
1186 },
1187 )
1188 .await
1189 .unwrap();
1190
1191 let temp_dir = tempfile::tempdir().unwrap();
1193 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1194 ingestion_dir: temp_dir.path().to_owned(),
1195 starting_checkpoint: 5,
1196 num_checkpoints: 25,
1197 checkpoint_size: 1,
1198 })
1199 .await;
1200
1201 let indexer_args = IndexerArgs {
1202 last_checkpoint: Some(29),
1203 ..Default::default()
1204 };
1205
1206 let client_args = ClientArgs {
1207 ingestion: IngestionClientArgs {
1208 local_ingestion_path: Some(temp_dir.path().to_owned()),
1209 ..Default::default()
1210 },
1211 ..Default::default()
1212 };
1213
1214 let ingestion_config = IngestionConfig::default();
1215
1216 let mut indexer = Indexer::new(
1217 store.clone(),
1218 indexer_args,
1219 client_args,
1220 ingestion_config,
1221 None,
1222 ®istry,
1223 )
1224 .await
1225 .unwrap();
1226
1227 indexer
1228 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1229 .await
1230 .unwrap();
1231 indexer
1232 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1233 .await
1234 .unwrap();
1235 indexer
1236 .sequential_pipeline::<C>(C, SequentialConfig::default())
1237 .await
1238 .unwrap();
1239 indexer
1240 .sequential_pipeline::<D>(D, SequentialConfig::default())
1241 .await
1242 .unwrap();
1243
1244 let ingestion_metrics = indexer.ingestion_metrics().clone();
1245 let indexer_metrics = indexer.indexer_metrics().clone();
1246
1247 indexer.run().await.unwrap().join().await.unwrap();
1248
1249 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1250 assert_eq!(
1251 indexer_metrics
1252 .total_watermarks_out_of_order
1253 .get_metric_with_label_values(&[A::NAME])
1254 .unwrap()
1255 .get(),
1256 0
1257 );
1258 assert_eq!(
1259 indexer_metrics
1260 .total_watermarks_out_of_order
1261 .get_metric_with_label_values(&[B::NAME])
1262 .unwrap()
1263 .get(),
1264 5
1265 );
1266 assert_eq!(
1267 indexer_metrics
1268 .total_watermarks_out_of_order
1269 .get_metric_with_label_values(&[C::NAME])
1270 .unwrap()
1271 .get(),
1272 10
1273 );
1274 assert_eq!(
1275 indexer_metrics
1276 .total_watermarks_out_of_order
1277 .get_metric_with_label_values(&[D::NAME])
1278 .unwrap()
1279 .get(),
1280 15
1281 );
1282 }
1283
1284 #[tokio::test]
1286 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1287 let registry = Registry::new();
1288 let store = MockStore::default();
1289
1290 test_pipeline!(A, "concurrent_a");
1291 test_pipeline!(B, "concurrent_b");
1292 test_pipeline!(C, "sequential_c");
1293 test_pipeline!(D, "sequential_d");
1294
1295 let mut conn = store.connect().await.unwrap();
1296 conn.set_committer_watermark(
1297 A::NAME,
1298 CommitterWatermark {
1299 checkpoint_hi_inclusive: 5,
1300 ..Default::default()
1301 },
1302 )
1303 .await
1304 .unwrap();
1305 conn.set_committer_watermark(
1306 B::NAME,
1307 CommitterWatermark {
1308 checkpoint_hi_inclusive: 10,
1309 ..Default::default()
1310 },
1311 )
1312 .await
1313 .unwrap();
1314 conn.set_committer_watermark(
1315 C::NAME,
1316 CommitterWatermark {
1317 checkpoint_hi_inclusive: 15,
1318 ..Default::default()
1319 },
1320 )
1321 .await
1322 .unwrap();
1323 conn.set_committer_watermark(
1324 D::NAME,
1325 CommitterWatermark {
1326 checkpoint_hi_inclusive: 20,
1327 ..Default::default()
1328 },
1329 )
1330 .await
1331 .unwrap();
1332
1333 let temp_dir = tempfile::tempdir().unwrap();
1335 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1336 ingestion_dir: temp_dir.path().to_owned(),
1337 starting_checkpoint: 5,
1338 num_checkpoints: 25,
1339 checkpoint_size: 1,
1340 })
1341 .await;
1342
1343 let indexer_args = IndexerArgs {
1344 first_checkpoint: Some(3),
1345 last_checkpoint: Some(29),
1346 ..Default::default()
1347 };
1348
1349 let client_args = ClientArgs {
1350 ingestion: IngestionClientArgs {
1351 local_ingestion_path: Some(temp_dir.path().to_owned()),
1352 ..Default::default()
1353 },
1354 ..Default::default()
1355 };
1356
1357 let ingestion_config = IngestionConfig::default();
1358
1359 let mut indexer = Indexer::new(
1360 store.clone(),
1361 indexer_args,
1362 client_args,
1363 ingestion_config,
1364 None,
1365 ®istry,
1366 )
1367 .await
1368 .unwrap();
1369
1370 indexer
1371 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1372 .await
1373 .unwrap();
1374 indexer
1375 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1376 .await
1377 .unwrap();
1378 indexer
1379 .sequential_pipeline::<C>(C, SequentialConfig::default())
1380 .await
1381 .unwrap();
1382 indexer
1383 .sequential_pipeline::<D>(D, SequentialConfig::default())
1384 .await
1385 .unwrap();
1386
1387 let ingestion_metrics = indexer.ingestion_metrics().clone();
1388 let metrics = indexer.indexer_metrics().clone();
1389 indexer.run().await.unwrap().join().await.unwrap();
1390
1391 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1392 assert_eq!(
1393 metrics
1394 .total_watermarks_out_of_order
1395 .get_metric_with_label_values(&[A::NAME])
1396 .unwrap()
1397 .get(),
1398 0
1399 );
1400 assert_eq!(
1401 metrics
1402 .total_watermarks_out_of_order
1403 .get_metric_with_label_values(&[B::NAME])
1404 .unwrap()
1405 .get(),
1406 5
1407 );
1408 assert_eq!(
1409 metrics
1410 .total_watermarks_out_of_order
1411 .get_metric_with_label_values(&[C::NAME])
1412 .unwrap()
1413 .get(),
1414 10
1415 );
1416 assert_eq!(
1417 metrics
1418 .total_watermarks_out_of_order
1419 .get_metric_with_label_values(&[D::NAME])
1420 .unwrap()
1421 .get(),
1422 15
1423 );
1424 }
1425
1426 #[tokio::test]
1428 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1429 let registry = Registry::new();
1430 let store = MockStore::default();
1431
1432 test_pipeline!(A, "concurrent_a");
1433 test_pipeline!(B, "concurrent_b");
1434 test_pipeline!(C, "sequential_c");
1435 test_pipeline!(D, "sequential_d");
1436
1437 let mut conn = store.connect().await.unwrap();
1438 conn.set_committer_watermark(
1439 B::NAME,
1440 CommitterWatermark {
1441 checkpoint_hi_inclusive: 10,
1442 ..Default::default()
1443 },
1444 )
1445 .await
1446 .unwrap();
1447 conn.set_committer_watermark(
1448 C::NAME,
1449 CommitterWatermark {
1450 checkpoint_hi_inclusive: 15,
1451 ..Default::default()
1452 },
1453 )
1454 .await
1455 .unwrap();
1456 conn.set_committer_watermark(
1457 D::NAME,
1458 CommitterWatermark {
1459 checkpoint_hi_inclusive: 20,
1460 ..Default::default()
1461 },
1462 )
1463 .await
1464 .unwrap();
1465
1466 let temp_dir = tempfile::tempdir().unwrap();
1468 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1469 ingestion_dir: temp_dir.path().to_owned(),
1470 starting_checkpoint: 0,
1471 num_checkpoints: 30,
1472 checkpoint_size: 1,
1473 })
1474 .await;
1475
1476 let indexer_args = IndexerArgs {
1477 last_checkpoint: Some(29),
1478 ..Default::default()
1479 };
1480
1481 let client_args = ClientArgs {
1482 ingestion: IngestionClientArgs {
1483 local_ingestion_path: Some(temp_dir.path().to_owned()),
1484 ..Default::default()
1485 },
1486 ..Default::default()
1487 };
1488
1489 let ingestion_config = IngestionConfig::default();
1490
1491 let mut indexer = Indexer::new(
1492 store.clone(),
1493 indexer_args,
1494 client_args,
1495 ingestion_config,
1496 None,
1497 ®istry,
1498 )
1499 .await
1500 .unwrap();
1501
1502 indexer
1503 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1504 .await
1505 .unwrap();
1506 indexer
1507 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1508 .await
1509 .unwrap();
1510 indexer
1511 .sequential_pipeline::<C>(C, SequentialConfig::default())
1512 .await
1513 .unwrap();
1514 indexer
1515 .sequential_pipeline::<D>(D, SequentialConfig::default())
1516 .await
1517 .unwrap();
1518
1519 let ingestion_metrics = indexer.ingestion_metrics().clone();
1520 let metrics = indexer.indexer_metrics().clone();
1521 indexer.run().await.unwrap().join().await.unwrap();
1522
1523 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1524 assert_eq!(
1525 metrics
1526 .total_watermarks_out_of_order
1527 .get_metric_with_label_values(&[A::NAME])
1528 .unwrap()
1529 .get(),
1530 0
1531 );
1532 assert_eq!(
1533 metrics
1534 .total_watermarks_out_of_order
1535 .get_metric_with_label_values(&[B::NAME])
1536 .unwrap()
1537 .get(),
1538 11
1539 );
1540 assert_eq!(
1541 metrics
1542 .total_watermarks_out_of_order
1543 .get_metric_with_label_values(&[C::NAME])
1544 .unwrap()
1545 .get(),
1546 16
1547 );
1548 assert_eq!(
1549 metrics
1550 .total_watermarks_out_of_order
1551 .get_metric_with_label_values(&[D::NAME])
1552 .unwrap()
1553 .get(),
1554 21
1555 );
1556 }
1557
1558 #[tokio::test]
1560 async fn test_indexer_ingestion_use_first_checkpoint() {
1561 let registry = Registry::new();
1562 let store = MockStore::default();
1563
1564 test_pipeline!(A, "concurrent_a");
1565 test_pipeline!(B, "concurrent_b");
1566 test_pipeline!(C, "sequential_c");
1567 test_pipeline!(D, "sequential_d");
1568
1569 let mut conn = store.connect().await.unwrap();
1570 conn.set_committer_watermark(
1571 B::NAME,
1572 CommitterWatermark {
1573 checkpoint_hi_inclusive: 10,
1574 ..Default::default()
1575 },
1576 )
1577 .await
1578 .unwrap();
1579 conn.set_committer_watermark(
1580 C::NAME,
1581 CommitterWatermark {
1582 checkpoint_hi_inclusive: 15,
1583 ..Default::default()
1584 },
1585 )
1586 .await
1587 .unwrap();
1588 conn.set_committer_watermark(
1589 D::NAME,
1590 CommitterWatermark {
1591 checkpoint_hi_inclusive: 20,
1592 ..Default::default()
1593 },
1594 )
1595 .await
1596 .unwrap();
1597
1598 let temp_dir = tempfile::tempdir().unwrap();
1600 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1601 ingestion_dir: temp_dir.path().to_owned(),
1602 starting_checkpoint: 5,
1603 num_checkpoints: 25,
1604 checkpoint_size: 1,
1605 })
1606 .await;
1607
1608 let indexer_args = IndexerArgs {
1609 first_checkpoint: Some(10),
1610 last_checkpoint: Some(29),
1611 ..Default::default()
1612 };
1613
1614 let client_args = ClientArgs {
1615 ingestion: IngestionClientArgs {
1616 local_ingestion_path: Some(temp_dir.path().to_owned()),
1617 ..Default::default()
1618 },
1619 ..Default::default()
1620 };
1621
1622 let ingestion_config = IngestionConfig::default();
1623
1624 let mut indexer = Indexer::new(
1625 store.clone(),
1626 indexer_args,
1627 client_args,
1628 ingestion_config,
1629 None,
1630 ®istry,
1631 )
1632 .await
1633 .unwrap();
1634
1635 indexer
1636 .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1637 .await
1638 .unwrap();
1639 indexer
1640 .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1641 .await
1642 .unwrap();
1643 indexer
1644 .sequential_pipeline::<C>(C, SequentialConfig::default())
1645 .await
1646 .unwrap();
1647 indexer
1648 .sequential_pipeline::<D>(D, SequentialConfig::default())
1649 .await
1650 .unwrap();
1651
1652 let ingestion_metrics = indexer.ingestion_metrics().clone();
1653 let metrics = indexer.indexer_metrics().clone();
1654 indexer.run().await.unwrap().join().await.unwrap();
1655
1656 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1657 assert_eq!(
1658 metrics
1659 .total_watermarks_out_of_order
1660 .get_metric_with_label_values(&[A::NAME])
1661 .unwrap()
1662 .get(),
1663 0
1664 );
1665 assert_eq!(
1666 metrics
1667 .total_watermarks_out_of_order
1668 .get_metric_with_label_values(&[B::NAME])
1669 .unwrap()
1670 .get(),
1671 1
1672 );
1673 assert_eq!(
1674 metrics
1675 .total_watermarks_out_of_order
1676 .get_metric_with_label_values(&[C::NAME])
1677 .unwrap()
1678 .get(),
1679 6
1680 );
1681 assert_eq!(
1682 metrics
1683 .total_watermarks_out_of_order
1684 .get_metric_with_label_values(&[D::NAME])
1685 .unwrap()
1686 .get(),
1687 11
1688 );
1689 }
1690
1691 #[tokio::test]
1692 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1693 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1694 assert_eq!(committer_watermark, None);
1696 assert_eq!(pruner_watermark, None);
1697 }
1698
1699 #[tokio::test]
1700 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1701 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1702 assert_eq!(committer_watermark, None);
1704 assert_eq!(pruner_watermark, None);
1705 }
1706
1707 #[tokio::test]
1708 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1709 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1710
1711 let committer_watermark = committer_watermark.unwrap();
1712 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1713
1714 let pruner_watermark = pruner_watermark.unwrap();
1715 assert_eq!(pruner_watermark.reader_lo, 1);
1716 assert_eq!(pruner_watermark.pruner_hi, 1);
1717 }
1718
1719 #[tokio::test]
1720 async fn test_init_watermark_sequential() {
1721 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1722
1723 let committer_watermark = committer_watermark.unwrap();
1724 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1725
1726 let pruner_watermark = pruner_watermark.unwrap();
1727 assert_eq!(pruner_watermark.reader_lo, 1);
1728 assert_eq!(pruner_watermark.pruner_hi, 1);
1729 }
1730
1731 #[tokio::test]
1732 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1733 let registry = Registry::new();
1734 let store = MockStore::default();
1735
1736 let mut conn = store.connect().await.unwrap();
1738
1739 conn.set_committer_watermark(
1741 MockHandler::NAME,
1742 CommitterWatermark {
1743 epoch_hi_inclusive: 0,
1744 checkpoint_hi_inclusive: 10,
1745 tx_hi: 20,
1746 timestamp_ms_hi_inclusive: 10000,
1747 },
1748 )
1749 .await
1750 .unwrap();
1751
1752 conn.set_committer_watermark(
1754 SequentialHandler::NAME,
1755 CommitterWatermark {
1756 epoch_hi_inclusive: 0,
1757 checkpoint_hi_inclusive: 5,
1758 tx_hi: 10,
1759 timestamp_ms_hi_inclusive: 5000,
1760 },
1761 )
1762 .await
1763 .unwrap();
1764
1765 let temp_dir = tempfile::tempdir().unwrap();
1767 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1768 ingestion_dir: temp_dir.path().to_owned(),
1769 starting_checkpoint: 0,
1770 num_checkpoints: 20,
1771 checkpoint_size: 2,
1772 })
1773 .await;
1774
1775 let indexer_args = IndexerArgs {
1776 first_checkpoint: None,
1777 last_checkpoint: Some(19),
1778 pipeline: vec![],
1779 ..Default::default()
1780 };
1781
1782 let client_args = ClientArgs {
1783 ingestion: IngestionClientArgs {
1784 local_ingestion_path: Some(temp_dir.path().to_owned()),
1785 ..Default::default()
1786 },
1787 ..Default::default()
1788 };
1789
1790 let ingestion_config = IngestionConfig::default();
1791
1792 let mut indexer = Indexer::new(
1793 store.clone(),
1794 indexer_args,
1795 client_args,
1796 ingestion_config,
1797 None,
1798 ®istry,
1799 )
1800 .await
1801 .unwrap();
1802
1803 indexer
1805 .sequential_pipeline(
1806 MockHandler,
1807 pipeline::sequential::SequentialConfig::default(),
1808 )
1809 .await
1810 .unwrap();
1811
1812 assert_eq!(
1814 indexer.next_sequential_checkpoint(),
1815 Some(11),
1816 "next_sequential_checkpoint should be 11"
1817 );
1818
1819 indexer
1821 .sequential_pipeline(
1822 SequentialHandler,
1823 pipeline::sequential::SequentialConfig::default(),
1824 )
1825 .await
1826 .unwrap();
1827
1828 assert_eq!(
1830 indexer.next_sequential_checkpoint(),
1831 Some(6),
1832 "next_sequential_checkpoint should still be 6"
1833 );
1834
1835 indexer.run().await.unwrap().join().await.unwrap();
1837
1838 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1840 let watermark2 = conn
1841 .committer_watermark(SequentialHandler::NAME)
1842 .await
1843 .unwrap();
1844
1845 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1846 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1847 }
1848
1849 #[tokio::test]
1853 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1854 let registry = Registry::new();
1855 let store = MockStore::default();
1856
1857 let mut conn = store.connect().await.unwrap();
1860 conn.set_committer_watermark(
1861 MockCheckpointSequenceNumberHandler::NAME,
1862 CommitterWatermark {
1863 checkpoint_hi_inclusive: 10,
1864 ..Default::default()
1865 },
1866 )
1867 .await
1868 .unwrap();
1869 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1870 .await
1871 .unwrap();
1872
1873 let indexer_args = IndexerArgs {
1876 first_checkpoint: Some(0),
1877 last_checkpoint: Some(15),
1878 pipeline: vec![],
1879 task: TaskArgs::tasked("task".to_string(), 10),
1880 };
1881 let temp_dir = tempfile::tempdir().unwrap();
1882 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1883 ingestion_dir: temp_dir.path().to_owned(),
1884 starting_checkpoint: 0,
1885 num_checkpoints: 16,
1886 checkpoint_size: 2,
1887 })
1888 .await;
1889
1890 let client_args = ClientArgs {
1891 ingestion: IngestionClientArgs {
1892 local_ingestion_path: Some(temp_dir.path().to_owned()),
1893 ..Default::default()
1894 },
1895 ..Default::default()
1896 };
1897
1898 let ingestion_config = IngestionConfig::default();
1899
1900 let mut tasked_indexer = Indexer::new(
1901 store.clone(),
1902 indexer_args,
1903 client_args,
1904 ingestion_config,
1905 None,
1906 ®istry,
1907 )
1908 .await
1909 .unwrap();
1910
1911 let _ = tasked_indexer
1912 .concurrent_pipeline(
1913 MockCheckpointSequenceNumberHandler,
1914 ConcurrentConfig::default(),
1915 )
1916 .await;
1917
1918 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1919 let metrics = tasked_indexer.indexer_metrics().clone();
1920
1921 tasked_indexer.run().await.unwrap().join().await.unwrap();
1922
1923 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1924 assert_eq!(
1925 metrics
1926 .total_collector_skipped_checkpoints
1927 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1928 .unwrap()
1929 .get(),
1930 7
1931 );
1932 let data = store
1933 .data
1934 .get(MockCheckpointSequenceNumberHandler::NAME)
1935 .unwrap();
1936 assert_eq!(data.len(), 9);
1937 for i in 0..7 {
1938 assert!(data.get(&i).is_none());
1939 }
1940 for i in 7..16 {
1941 assert!(data.get(&i).is_some());
1942 }
1943 }
1944
1945 #[tokio::test]
1947 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1948 let registry = Registry::new();
1949 let store = MockStore::default();
1950
1951 let mut conn = store.connect().await.unwrap();
1952 conn.set_committer_watermark(
1953 "test",
1954 CommitterWatermark {
1955 checkpoint_hi_inclusive: 10,
1956 ..Default::default()
1957 },
1958 )
1959 .await
1960 .unwrap();
1961 conn.set_reader_watermark("test", 5).await.unwrap();
1962
1963 let indexer_args = IndexerArgs {
1966 first_checkpoint: Some(9),
1967 last_checkpoint: Some(25),
1968 pipeline: vec![],
1969 task: TaskArgs::tasked("task".to_string(), 10),
1970 };
1971 let temp_dir = tempfile::tempdir().unwrap();
1972 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1973 ingestion_dir: temp_dir.path().to_owned(),
1974 starting_checkpoint: 9,
1975 num_checkpoints: 17,
1976 checkpoint_size: 2,
1977 })
1978 .await;
1979
1980 let client_args = ClientArgs {
1981 ingestion: IngestionClientArgs {
1982 local_ingestion_path: Some(temp_dir.path().to_owned()),
1983 ..Default::default()
1984 },
1985 ..Default::default()
1986 };
1987
1988 let ingestion_config = IngestionConfig::default();
1989
1990 let mut tasked_indexer = Indexer::new(
1991 store.clone(),
1992 indexer_args,
1993 client_args,
1994 ingestion_config,
1995 None,
1996 ®istry,
1997 )
1998 .await
1999 .unwrap();
2000
2001 let _ = tasked_indexer
2002 .concurrent_pipeline(
2003 MockCheckpointSequenceNumberHandler,
2004 ConcurrentConfig::default(),
2005 )
2006 .await;
2007
2008 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2009 let metrics = tasked_indexer.indexer_metrics().clone();
2010
2011 tasked_indexer.run().await.unwrap().join().await.unwrap();
2012
2013 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2014 assert_eq!(
2015 metrics
2016 .total_watermarks_out_of_order
2017 .get_metric_with_label_values(&["test"])
2018 .unwrap()
2019 .get(),
2020 0
2021 );
2022 assert_eq!(
2023 metrics
2024 .total_collector_skipped_checkpoints
2025 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2026 .unwrap()
2027 .get(),
2028 0
2029 );
2030
2031 let data = store.data.get("test").unwrap();
2032 assert!(data.len() == 17);
2033 for i in 0..9 {
2034 assert!(data.get(&i).is_none());
2035 }
2036 for i in 9..26 {
2037 assert!(data.get(&i).is_some());
2038 }
2039 let main_pipeline_watermark = store.watermark("test").unwrap();
2040 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2042 assert_eq!(main_pipeline_watermark.reader_lo, 5);
2043 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2044 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2045 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2046 }
2047
2048 #[tokio::test]
2051 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2052 let registry = Registry::new();
2053 let store = MockStore::default();
2054 let mut conn = store.connect().await.unwrap();
2055 conn.set_committer_watermark(
2057 ControllableHandler::NAME,
2058 CommitterWatermark {
2059 checkpoint_hi_inclusive: 11,
2060 ..Default::default()
2061 },
2062 )
2063 .await
2064 .unwrap();
2065
2066 let temp_dir = tempfile::tempdir().unwrap();
2068 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2069 ingestion_dir: temp_dir.path().to_owned(),
2070 starting_checkpoint: 0,
2071 num_checkpoints: 501,
2072 checkpoint_size: 2,
2073 })
2074 .await;
2075 let indexer_args = IndexerArgs {
2076 first_checkpoint: Some(0),
2077 last_checkpoint: Some(500),
2078 pipeline: vec![],
2079 task: TaskArgs::tasked("task".to_string(), 10 ),
2080 };
2081 let client_args = ClientArgs {
2082 ingestion: IngestionClientArgs {
2083 local_ingestion_path: Some(temp_dir.path().to_owned()),
2084 ..Default::default()
2085 },
2086 ..Default::default()
2087 };
2088 let ingestion_config = IngestionConfig::default();
2089 let mut tasked_indexer = Indexer::new(
2090 store.clone(),
2091 indexer_args,
2092 client_args,
2093 ingestion_config,
2094 None,
2095 ®istry,
2096 )
2097 .await
2098 .unwrap();
2099 let mut allow_process = 10;
2100 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2102 let _ = tasked_indexer
2103 .concurrent_pipeline(
2104 controllable_handler,
2105 ConcurrentConfig {
2106 committer: CommitterConfig {
2107 collect_interval_ms: 10,
2108 watermark_interval_ms: 10,
2109 ..Default::default()
2110 },
2111 ..Default::default()
2112 },
2113 )
2114 .await;
2115 let metrics = tasked_indexer.indexer_metrics().clone();
2116
2117 let mut s_indexer = tasked_indexer.run().await.unwrap();
2118
2119 store
2123 .wait_for_watermark(
2124 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2125 10,
2126 std::time::Duration::from_secs(10),
2127 )
2128 .await;
2129
2130 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2135 .await
2136 .unwrap();
2137
2138 let reader_lo = metrics
2139 .collector_reader_lo
2140 .with_label_values(&[ControllableHandler::NAME]);
2141
2142 let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2146 while reader_lo.get() != 250 {
2147 interval.tick().await;
2148 allow_process += 1;
2150 assert!(
2151 allow_process <= 500,
2152 "Released all checkpoints but collector never observed new reader_lo"
2153 );
2154 process_below.send(allow_process).ok();
2155 }
2156
2157 process_below.send(500).ok();
2164
2165 s_indexer.join().await.unwrap();
2166
2167 let data = store.data.get(ControllableHandler::NAME).unwrap();
2168
2169 for chkpt in (allow_process + 1)..250 {
2171 assert!(
2172 data.get(&chkpt).is_none(),
2173 "Checkpoint {chkpt} should have been skipped"
2174 );
2175 }
2176
2177 for chkpt in 250..=500 {
2179 assert!(
2180 data.get(&chkpt).is_some(),
2181 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2182 );
2183 }
2184
2185 for chkpt in 0..=10 {
2187 assert!(
2188 data.get(&chkpt).is_some(),
2189 "Checkpoint {chkpt} should have been committed (baseline)"
2190 );
2191 }
2192 }
2193}