1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::Connection;
19use sui_indexer_alt_framework_store_traits::InitWatermark;
20use sui_indexer_alt_framework_store_traits::Store;
21use sui_indexer_alt_framework_store_traits::TransactionalStore;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::Handler;
30use crate::pipeline::sequential::SequentialConfig;
31use crate::pipeline::sequential::{self};
32use crate::service::Service;
33
34pub use anyhow::Result;
35pub use sui_field_count::FieldCount;
36pub use sui_futures::service;
37pub use sui_indexer_alt_framework_store_traits as store;
39pub use sui_types as types;
40
41#[cfg(feature = "cluster")]
42pub mod cluster;
43pub mod config;
44pub mod ingestion;
45pub mod metrics;
46pub mod pipeline;
47#[cfg(feature = "postgres")]
48pub mod postgres;
49
50#[cfg(test)]
51pub mod mocks;
52
/// Command-line arguments controlling the checkpoint range an indexer ingests and which of its
/// pipelines are enabled.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Checkpoint to propose starting ingestion from. Pipelines that already have a committed
    /// watermark resume from that watermark instead (see `Indexer::add_pipeline`).
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Last checkpoint to ingest, inclusive. If omitted, ingestion is unbounded.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Names of pipelines to enable. May be repeated; if none are supplied, every pipeline added
    /// to the indexer is enabled.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional task identity (name plus reader interval) this indexer instance runs as.
    #[clap(flatten)]
    pub task: TaskArgs,
}
79
/// Raw command-line arguments identifying an optional pipeline task. The two fields must be
/// supplied together -- each declares a clap `requires` constraint on the other -- and are
/// validated into a [`Task`] by [`TaskArgs::into_task`].
// NOTE(review): this struct is only ever `#[clap(flatten)]`-ed into `IndexerArgs`, so deriving
// `clap::Args` (as `IndexerArgs` does) would be more conventional than `clap::Parser` -- confirm
// no external caller relies on `TaskArgs::parse` before changing it.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the task. Requires `--reader-interval-ms` to also be set.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Reader interval for the task, in milliseconds. Requires `--task` to also be set.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
110
/// An indexer ties together an ingestion service (which fetches checkpoints) with a set of
/// pipelines (which process them), all writing into a single store `S`.
pub struct Indexer<S: Store> {
    /// The store that every registered pipeline writes to.
    store: S,

    /// Indexer-wide metrics, shared with each pipeline.
    metrics: Arc<IndexerMetrics>,

    /// Service responsible for fetching checkpoints and distributing them to subscribers.
    ingestion_service: IngestionService,

    /// Optional proposed starting checkpoint, from `IndexerArgs::first_checkpoint`. Only used for
    /// pipelines that do not already have a watermark.
    first_checkpoint: Option<u64>,

    /// Optional inclusive upper bound on ingestion, from `IndexerArgs::last_checkpoint`.
    last_checkpoint: Option<u64>,

    /// The lowest checkpoint any registered pipeline still needs -- where ingestion starts.
    /// Initialised to `u64::MAX` and lowered as pipelines are added.
    next_checkpoint: u64,

    /// Like `next_checkpoint`, but tracking sequential pipelines only. `None` until the first
    /// sequential pipeline is added.
    next_sequential_checkpoint: Option<u64>,

    /// Optional task identity this indexer runs as. Sequential pipelines reject tasks (see
    /// `sequential_pipeline`).
    task: Option<Task>,

    /// Explicit pipeline filter from `IndexerArgs::pipeline`; `None` means run everything that is
    /// added. Names are removed from this set as pipelines are added, so leftovers at `run` time
    /// indicate names the indexer does not know about.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of all pipelines added so far (including skipped ones), used to reject duplicates.
    added_pipelines: BTreeSet<&'static str>,

    /// The service for each registered pipeline, merged into the top-level service by `run`.
    pipelines: Vec<Service>,
}
164
/// A validated pipeline task identity: unlike [`TaskArgs`], both the name and the reader interval
/// are guaranteed to be present.
#[derive(Clone)]
pub(crate) struct Task {
    /// The task's name, combined with a pipeline's name to form its watermark identity (see
    /// `pipeline_task` usage in `add_pipeline`).
    task: String,
    /// Interval derived from the `--reader-interval-ms` flag -- presumably how often the task's
    /// reader watermark is polled/refreshed; confirm against the pipeline internals.
    reader_interval: Duration,
}
175
176impl TaskArgs {
177 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
178 Self {
179 task: Some(task),
180 reader_interval_ms: Some(reader_interval_ms),
181 }
182 }
183
184 fn into_task(self) -> Option<Task> {
185 Some(Task {
186 task: self.task?,
187 reader_interval: Duration::from_millis(self.reader_interval_ms?),
188 })
189 }
190}
191
impl<S: Store> Indexer<S> {
    /// Create a new indexer writing into `store`, with ingestion configured by `client_args` and
    /// `ingestion_config`. Metrics are registered against `registry`, optionally namespaced under
    /// `metrics_prefix`. Pipelines must be added (and then `run` called) before anything happens.
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            // Start at MAX so the first added pipeline's requirement becomes the minimum.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // An empty `--pipeline` list means "no filter": enable everything that is added.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store the indexer writes into.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client used by the ingestion service to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Metrics for the indexer and its pipelines.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Metrics for the ingestion side of the indexer.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// Names of added pipelines that pass the `--pipeline` filter (all of them when no filter was
    /// given).
    // NOTE(review): `add_pipeline` *removes* names from `enabled_pipelines` as pipelines are
    // added, so when an explicit filter is present this iterator will no longer match pipelines
    // that have already been registered -- confirm the intended call order with callers.
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The lowest checkpoint any registered *sequential* pipeline still needs, or `None` if no
    /// sequential pipeline has been added.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Adds a new concurrent pipeline driven by `handler` to this indexer. Does nothing if the
    /// pipeline was excluded by the `--pipeline` filter.
    pub async fn concurrent_pipeline<H>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()>
    where
        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
    {
        // `None` means the pipeline is disabled -- nothing to set up.
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            // Concurrent pipelines only consume checkpoints; the commit-hi feedback half of the
            // subscription is dropped.
            self.ingestion_service.subscribe().0,
            self.metrics.clone(),
        ));

        Ok(())
    }

    /// Start the indexer: fails if the `--pipeline` filter named pipelines that were never added;
    /// otherwise starts ingestion from the lowest checkpoint any pipeline needs (up to and
    /// including `last_checkpoint`, if set) and merges every pipeline's service into the returned
    /// [`Service`] handle.
    pub async fn run(self) -> anyhow::Result<Service> {
        // Names left over in the filter were never added -- surface them as an error rather than
        // silently doing nothing.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                 {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Registers pipeline `P`: errors if it was already added, returns `Ok(None)` if it is
    /// excluded by the `--pipeline` filter, and otherwise initialises its watermark in the store
    /// (proposing `first_checkpoint`, defaulting to 0, when no watermark exists yet) and returns
    /// the next checkpoint the pipeline needs. Also lowers the indexer-wide `next_checkpoint`.
    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Consume the name from the filter; if it was never there, the pipeline is disabled.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let mut conn = self
            .store
            .connect()
            .await
            .context("Failed to establish connection to store")?;

        let pipeline_task =
            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;

        // Proposed watermark if none exists yet: the pipeline has committed everything strictly
        // below `first_checkpoint` (hence `checked_sub(1)`, which is `None` for 0).
        let proposed_next_checkpoint = self.first_checkpoint.unwrap_or(0);
        let InitWatermark {
            checkpoint_hi_inclusive,
            reader_lo,
        } = conn
            .init_watermark(
                &pipeline_task,
                InitWatermark {
                    checkpoint_hi_inclusive: proposed_next_checkpoint.checked_sub(1),
                    reader_lo: proposed_next_checkpoint,
                },
            )
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        // Resume just past the committed high watermark; fall back to the reader low bound when
        // nothing has been committed.
        let next_checkpoint = checkpoint_hi_inclusive.map_or(reader_lo, |c| c + 1);

        self.next_checkpoint = self.next_checkpoint.min(next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
402
impl<T: TransactionalStore> Indexer<T> {
    /// Adds a new sequential pipeline driven by `handler` to this indexer. Does nothing if the
    /// pipeline was excluded by the `--pipeline` filter. Fails if the indexer is running under a
    /// pipeline task, which sequential pipelines do not support.
    pub async fn sequential_pipeline<H>(
        &mut self,
        handler: H,
        config: SequentialConfig,
    ) -> Result<()>
    where
        H: Handler<Store = T> + Send + Sync + 'static,
    {
        if self.task.is_some() {
            bail!(
                "Sequential pipelines do not support pipeline tasks. \
                 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
                 Running the same pipeline under a different task would violate these guarantees."
            );
        }

        // `None` means the pipeline is disabled -- nothing to set up.
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        // Track the minimum next checkpoint across sequential pipelines only, which the
        // ingestion service uses to regulate how far ahead it reads.
        self.next_sequential_checkpoint = Some(
            self.next_sequential_checkpoint
                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
        );

        // Sequential pipelines keep the commit-hi feedback half of the subscription so they can
        // report commit progress back to ingestion.
        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();

        self.pipelines.push(sequential::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            checkpoint_rx,
            commit_hi_tx,
            self.metrics.clone(),
        ));

        Ok(())
    }
}
455
456#[cfg(test)]
457mod tests {
458 use std::sync::Arc;
459
460 use async_trait::async_trait;
461 use clap::Parser;
462 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
463 use sui_synthetic_ingestion::synthetic_ingestion;
464 use tokio::sync::watch;
465
466 use crate::FieldCount;
467 use crate::config::ConcurrencyConfig;
468 use crate::ingestion::ingestion_client::IngestionClientArgs;
469 use crate::mocks::store::MockStore;
470 use crate::pipeline::CommitterConfig;
471 use crate::pipeline::Processor;
472 use crate::pipeline::concurrent::ConcurrentConfig;
473 use crate::store::CommitterWatermark;
474
475 use super::*;
476
    /// Minimal value type produced by the test processors below (wraps the checkpoint sequence
    /// number).
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
480
    /// A handler whose processing can be gated from the test body: a checkpoint is only processed
    /// once its sequence number is at or below the limit published on the watch channel.
    struct ControllableHandler {
        // Receiver side of the limit; `process` waits on this before handling a checkpoint.
        process_below: watch::Receiver<u64>,
    }
486
487 impl ControllableHandler {
488 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
489 let (tx, rx) = watch::channel(limit);
490 (Self { process_below: rx }, tx)
491 }
492 }
493
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Waits until the externally-controlled limit admits this checkpoint, then yields a
        /// single value wrapping its sequence number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Block until `cp_num <= limit`. A closed channel (sender dropped) is ignored via
            // `.ok()` so processing proceeds rather than failing the pipeline.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
515
    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        // Accumulate every value and report the batch ready immediately.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        // Record each value in the mock store under this handler's pipeline name, keyed by the
        // checkpoint number it wraps.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
543
    /// Declares a unit struct `$handler` and implements `Processor` plus both the concurrent and
    /// sequential pipeline `Handler` traits for it against `MockStore`, using `$name` as the
    /// pipeline name. The concurrent commit records values in the mock store; the sequential
    /// commit is a no-op that reports one row written.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
607
    // Shared handlers used by multiple tests below.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
611
    /// Builds an indexer with a single pipeline ("pipeline_name"), registers it as either a
    /// concurrent or sequential pipeline (per `is_concurrent`) with the given `first_checkpoint`
    /// override, and returns the committer and pruner watermarks registration left in the store.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        if is_concurrent {
            indexer
                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
                .await
                .unwrap();
        } else {
            indexer
                .sequential_pipeline::<A>(A, SequentialConfig::default())
                .await
                .unwrap();
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
667
668 #[test]
669 fn test_arg_parsing() {
670 #[derive(Parser)]
671 struct Args {
672 #[clap(flatten)]
673 indexer: IndexerArgs,
674 }
675
676 let args = Args::try_parse_from([
677 "cmd",
678 "--first-checkpoint",
679 "10",
680 "--last-checkpoint",
681 "100",
682 "--pipeline",
683 "a",
684 "--pipeline",
685 "b",
686 "--task",
687 "t",
688 "--reader-interval-ms",
689 "5000",
690 ])
691 .unwrap();
692
693 assert_eq!(args.indexer.first_checkpoint, Some(10));
694 assert_eq!(args.indexer.last_checkpoint, Some(100));
695 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
696 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
697 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
698 }
699
    /// When every pipeline has a committed watermark (A=100, B=10, C=1, D=50), ingestion resumes
    /// from the lowest watermark + 1 (checkpoint 2), and the sequential minimum is
    /// min(C, D) + 1 = 2.
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        conn.init_watermark(A::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 100,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(B::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(C::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(D::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }
809
    /// When some pipelines (A, D) have no watermark and no `first_checkpoint` is given, those
    /// pipelines propose checkpoint 0, dragging both the overall and sequential minimums to 0.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }
885
    /// A default committer watermark (checkpoint 0 committed, for D) makes the overall next
    /// checkpoint 0 + 1 = 1.
    #[tokio::test]
    async fn test_next_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 100,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
            .await
            .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.next_checkpoint, 1);
    }
970
    /// Pipelines without a watermark (A, D) adopt `first_checkpoint` (5), so both the overall and
    /// sequential minimums are 5 even though B (51) and C (11) would resume later.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 5);
        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
    }
1050
    /// `first_checkpoint` (5) is ignored when every pipeline already has a watermark: the minimum
    /// resume point is C's watermark + 1 = 11.
    #[tokio::test]
    async fn test_next_checkpoint_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1120
    /// A `first_checkpoint` (24) larger than an existing watermark only applies to the pipeline
    /// without one (A): the overall minimum is still C's watermark + 1 = 11.
    #[tokio::test]
    async fn test_next_checkpoint_large_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(24),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();

        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();

        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, Some(24));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1198
    /// End-to-end run over 30 synthetic checkpoints with watermarks A=5, B=10, C=15, D=20:
    /// ingestion starts at min+1 = 6 and ingests 24 checkpoints (6..=29); each pipeline skips the
    /// checkpoints below its own watermark, counted as out-of-order watermark updates relative to
    /// A (0, 5, 10, 15).
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let temp_dir = tempfile::tempdir().unwrap();
        // Materialize checkpoints 0..=29 on disk for local ingestion.
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 30,
            checkpoint_size: 1,
        })
        .await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        // Run to completion (bounded by last_checkpoint = 29).
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            5
        );
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            10
        );
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            15
        );
    }
1340
    /// Same scenario as the previous test, but with `first_checkpoint: Some(3)`. Since every
    /// pipeline already has a watermark, the override is ignored and results are unchanged.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let temp_dir = tempfile::tempdir().unwrap();
        // Materialize checkpoints 0..=29 on disk for local ingestion.
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 30,
            checkpoint_size: 1,
        })
        .await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(3),
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            5
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            10
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            15
        );
    }
1482
    /// End-to-end ingestion test: with no `first_checkpoint` argument and one
    /// pipeline (A) that has no committer watermark, the indexer must fall back
    /// to ingesting from checkpoint 0, replaying checkpoints that the other
    /// pipelines have already committed.
    #[tokio::test]
    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Two concurrent and two sequential test pipelines.
        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Seed committer watermarks for B, C and D. A is deliberately left
        // without one, which forces ingestion back to genesis.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Generate 30 synthetic checkpoints (0..=29), one transaction each.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 30,
            checkpoint_size: 1,
        })
        .await;

        // No `first_checkpoint`: the starting point must be derived from the
        // pipelines' watermarks (here, genesis because A has none).
        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        // All 30 checkpoints (0..=29) were ingested, because A starts at genesis.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
        // A had no prior watermark, so nothing arrives out of order for it.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        // B re-observes checkpoints 0..=10 — 11 checkpoints at or below its
        // stored watermark of 10.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            11
        );
        // C re-observes checkpoints 0..=15 — 16 checkpoints.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            16
        );
        // D re-observes checkpoints 0..=20 — 21 checkpoints.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            21
        );
    }
1614
    /// End-to-end ingestion test: an explicit `first_checkpoint` overrides the
    /// watermark-derived starting point, so ingestion begins at checkpoint 10
    /// even though some pipelines have lower (or no) committer watermarks.
    #[tokio::test]
    async fn test_indexer_ingestion_use_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Two concurrent and two sequential test pipelines.
        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Seed committer watermarks for B, C and D; A has none.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Generate 30 synthetic checkpoints (0..=29), one transaction each.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 30,
            checkpoint_size: 1,
        })
        .await;

        // Explicit starting point: ingest only checkpoints 10..=29.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(10),
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        // 20 checkpoints ingested: 10..=29.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
        // A had no prior watermark, so nothing arrives out of order for it.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        // B re-observes only checkpoint 10 (at its stored watermark): 1.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            1
        );
        // C re-observes checkpoints 10..=15: 6.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            6
        );
        // D re-observes checkpoints 10..=20: 11.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            11
        );
    }
1747
1748 #[tokio::test]
1749 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1750 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1751 assert_eq!(committer_watermark, None);
1752 assert_eq!(pruner_watermark, None);
1753 }
1754
1755 #[tokio::test]
1756 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1757 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1758 assert_eq!(committer_watermark, None);
1759 assert_eq!(pruner_watermark, None);
1760 }
1761
1762 #[tokio::test]
1763 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1764 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1765
1766 let committer_watermark = committer_watermark.unwrap();
1767 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1768
1769 let pruner_watermark = pruner_watermark.unwrap();
1770 assert_eq!(pruner_watermark.reader_lo, 1);
1771 assert_eq!(pruner_watermark.pruner_hi, 1);
1772 }
1773
1774 #[tokio::test]
1775 async fn test_init_watermark_sequential() {
1776 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1777
1778 let committer_watermark = committer_watermark.unwrap();
1779 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1780
1781 let pruner_watermark = pruner_watermark.unwrap();
1782 assert_eq!(pruner_watermark.reader_lo, 1);
1783 assert_eq!(pruner_watermark.pruner_hi, 1);
1784 }
1785
1786 #[tokio::test]
1787 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1788 let registry = Registry::new();
1789 let store = MockStore::default();
1790
1791 let mut conn = store.connect().await.unwrap();
1793
1794 conn.set_committer_watermark(
1796 MockHandler::NAME,
1797 CommitterWatermark {
1798 epoch_hi_inclusive: 0,
1799 checkpoint_hi_inclusive: 10,
1800 tx_hi: 20,
1801 timestamp_ms_hi_inclusive: 10000,
1802 },
1803 )
1804 .await
1805 .unwrap();
1806
1807 conn.set_committer_watermark(
1809 SequentialHandler::NAME,
1810 CommitterWatermark {
1811 epoch_hi_inclusive: 0,
1812 checkpoint_hi_inclusive: 5,
1813 tx_hi: 10,
1814 timestamp_ms_hi_inclusive: 5000,
1815 },
1816 )
1817 .await
1818 .unwrap();
1819
1820 let temp_dir = tempfile::tempdir().unwrap();
1822 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1823 ingestion_dir: temp_dir.path().to_owned(),
1824 starting_checkpoint: 0,
1825 num_checkpoints: 20,
1826 checkpoint_size: 2,
1827 })
1828 .await;
1829
1830 let indexer_args = IndexerArgs {
1831 first_checkpoint: None,
1832 last_checkpoint: Some(19),
1833 pipeline: vec![],
1834 ..Default::default()
1835 };
1836
1837 let client_args = ClientArgs {
1838 ingestion: IngestionClientArgs {
1839 local_ingestion_path: Some(temp_dir.path().to_owned()),
1840 ..Default::default()
1841 },
1842 ..Default::default()
1843 };
1844
1845 let ingestion_config = IngestionConfig::default();
1846
1847 let mut indexer = Indexer::new(
1848 store.clone(),
1849 indexer_args,
1850 client_args,
1851 ingestion_config,
1852 None,
1853 ®istry,
1854 )
1855 .await
1856 .unwrap();
1857
1858 indexer
1860 .sequential_pipeline(MockHandler, SequentialConfig::default())
1861 .await
1862 .unwrap();
1863
1864 assert_eq!(
1866 indexer.next_sequential_checkpoint(),
1867 Some(11),
1868 "next_sequential_checkpoint should be 11"
1869 );
1870
1871 indexer
1873 .sequential_pipeline(SequentialHandler, SequentialConfig::default())
1874 .await
1875 .unwrap();
1876
1877 assert_eq!(
1879 indexer.next_sequential_checkpoint(),
1880 Some(6),
1881 "next_sequential_checkpoint should still be 6"
1882 );
1883
1884 indexer.run().await.unwrap().join().await.unwrap();
1886
1887 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1889 let watermark2 = conn
1890 .committer_watermark(SequentialHandler::NAME)
1891 .await
1892 .unwrap();
1893
1894 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1895 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1896 }
1897
    /// A tasked pipeline asked to process a range that starts below the main
    /// pipeline's reader watermark should skip — not commit — the checkpoints
    /// below that reader_lo.
    #[tokio::test]
    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Main pipeline state: committed up to checkpoint 10, readers at 7.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            MockCheckpointSequenceNumberHandler::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
            .await
            .unwrap();

        // Run as task "task" over checkpoints 0..=15 (interval arg presumably
        // the reader poll interval in ms — see `TaskArgs`).
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(15),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 16,
            checkpoint_size: 2,
        })
        .await;

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut tasked_indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        let _ = tasked_indexer
            .concurrent_pipeline(
                MockCheckpointSequenceNumberHandler,
                ConcurrentConfig::default(),
            )
            .await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // All 16 checkpoints are ingested, but the 7 below the main pipeline's
        // reader_lo (0..=6) are skipped by the collector...
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            7
        );
        // ...so only checkpoints 7..=15 (9 of them) are committed to the store.
        let data = store
            .data
            .get(MockCheckpointSequenceNumberHandler::NAME)
            .unwrap();
        assert_eq!(data.len(), 9);
        for i in 0..7 {
            assert!(data.get(&i).is_none());
        }
        for i in 7..16 {
            assert!(data.get(&i).is_some());
        }
    }
1993
    /// A tasked pipeline may run past the main pipeline's committer watermark:
    /// it keeps its own watermarks (under "test@task") and leaves the main
    /// pipeline's watermarks untouched.
    #[tokio::test]
    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Main pipeline "test": committed up to checkpoint 10, readers at 5.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            "test",
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_reader_watermark("test", 5).await.unwrap();

        // Tasked run over checkpoints 9..=25 — starting above the main
        // pipeline's reader_lo and finishing well past its committer hi.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(9),
            last_checkpoint: Some(25),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 26,
            checkpoint_size: 2,
        })
        .await;

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut tasked_indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        let _ = tasked_indexer
            .concurrent_pipeline(
                MockCheckpointSequenceNumberHandler,
                ConcurrentConfig::default(),
            )
            .await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // 17 checkpoints ingested: 9..=25.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&["test"])
                .unwrap()
                .get(),
            0
        );
        // Nothing skipped: the whole range sits at or above the main reader_lo.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            0
        );

        // All 17 checkpoints in the tasked range were committed.
        let data = store.data.get("test").unwrap();
        assert_eq!(data.len(), 17);
        for i in 0..9 {
            assert!(data.get(&i).is_none());
        }
        for i in 9..26 {
            assert!(data.get(&i).is_some());
        }
        // Main pipeline watermarks are unchanged by the tasked run...
        let main_pipeline_watermark = store.watermark("test").unwrap();
        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
        assert_eq!(main_pipeline_watermark.reader_lo, 5);
        // ...while the task tracks its own progress under "test@task".
        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
    }
2096
2097 #[tokio::test]
2100 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2101 let registry = Registry::new();
2102 let store = MockStore::default();
2103 let mut conn = store.connect().await.unwrap();
2104 conn.set_committer_watermark(
2106 ControllableHandler::NAME,
2107 CommitterWatermark {
2108 checkpoint_hi_inclusive: 11,
2109 ..Default::default()
2110 },
2111 )
2112 .await
2113 .unwrap();
2114
2115 let temp_dir = tempfile::tempdir().unwrap();
2117 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2118 ingestion_dir: temp_dir.path().to_owned(),
2119 starting_checkpoint: 0,
2120 num_checkpoints: 501,
2121 checkpoint_size: 2,
2122 })
2123 .await;
2124 let indexer_args = IndexerArgs {
2125 first_checkpoint: Some(0),
2126 last_checkpoint: Some(500),
2127 task: TaskArgs::tasked("task".to_string(), 10 ),
2128 ..Default::default()
2129 };
2130 let client_args = ClientArgs {
2131 ingestion: IngestionClientArgs {
2132 local_ingestion_path: Some(temp_dir.path().to_owned()),
2133 ..Default::default()
2134 },
2135 ..Default::default()
2136 };
2137 let ingestion_config = IngestionConfig::default();
2138 let mut tasked_indexer = Indexer::new(
2139 store.clone(),
2140 indexer_args,
2141 client_args,
2142 ingestion_config,
2143 None,
2144 ®istry,
2145 )
2146 .await
2147 .unwrap();
2148 let mut allow_process = 10;
2149 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2151 let _ = tasked_indexer
2152 .concurrent_pipeline(
2153 controllable_handler,
2154 ConcurrentConfig {
2155 committer: CommitterConfig {
2156 collect_interval_ms: 10,
2157 watermark_interval_ms: 10,
2158 ..Default::default()
2159 },
2160 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
2163 ..Default::default()
2164 },
2165 )
2166 .await;
2167 let metrics = tasked_indexer.indexer_metrics().clone();
2168
2169 let mut s_indexer = tasked_indexer.run().await.unwrap();
2170
2171 store
2175 .wait_for_watermark(
2176 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2177 10,
2178 Duration::from_secs(10),
2179 )
2180 .await;
2181
2182 conn.set_reader_watermark(ControllableHandler::NAME, 250)
2187 .await
2188 .unwrap();
2189
2190 let reader_lo = metrics
2191 .collector_reader_lo
2192 .with_label_values(&[ControllableHandler::NAME]);
2193
2194 let mut interval = tokio::time::interval(Duration::from_millis(10));
2198 while reader_lo.get() != 250 {
2199 interval.tick().await;
2200 allow_process += 1;
2202 assert!(
2203 allow_process <= 500,
2204 "Released all checkpoints but collector never observed new reader_lo"
2205 );
2206 process_below.send(allow_process).ok();
2207 }
2208
2209 process_below.send(500).ok();
2216
2217 s_indexer.join().await.unwrap();
2218
2219 let data = store.data.get(ControllableHandler::NAME).unwrap();
2220
2221 for chkpt in (allow_process + 1)..250 {
2223 assert!(
2224 data.get(&chkpt).is_none(),
2225 "Checkpoint {chkpt} should have been skipped"
2226 );
2227 }
2228
2229 for chkpt in 250..=500 {
2231 assert!(
2232 data.get(&chkpt).is_some(),
2233 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2234 );
2235 }
2236
2237 for chkpt in 0..=10 {
2239 assert!(
2240 data.get(&chkpt).is_some(),
2241 "Checkpoint {chkpt} should have been committed (baseline)"
2242 );
2243 }
2244 }
2245}