1use std::{collections::BTreeSet, sync::Arc};
5
6use anyhow::{Context, ensure};
7use futures::future;
8use ingestion::{ClientArgs, IngestionConfig, IngestionService, client::IngestionClient};
9use metrics::IndexerMetrics;
10use pipeline::{
11 Processor,
12 concurrent::{self, ConcurrentConfig},
13 sequential::{self, Handler, SequentialConfig},
14};
15use prometheus::Registry;
16use sui_indexer_alt_framework_store_traits::{
17 CommitterWatermark, Connection, Store, TransactionalStore,
18};
19use tokio::task::JoinHandle;
20use tokio_util::sync::CancellationToken;
21use tracing::{info, warn};
22
23pub use anyhow::Result;
24pub use sui_field_count::FieldCount;
25pub use sui_indexer_alt_framework_store_traits as store;
27pub use sui_types as types;
28
29#[cfg(feature = "cluster")]
30pub mod cluster;
31pub mod ingestion;
32pub mod metrics;
33pub mod pipeline;
34#[cfg(feature = "postgres")]
35pub mod postgres;
36pub mod task;
37
38#[cfg(test)]
39pub mod mocks;
40
41#[derive(clap::Args, Default, Debug, Clone)]
43pub struct IndexerArgs {
44 #[arg(long)]
62 pub first_checkpoint: Option<u64>,
63
64 #[arg(long)]
67 pub last_checkpoint: Option<u64>,
68
69 #[arg(long, action = clap::ArgAction::Append)]
72 pub pipeline: Vec<String>,
73
74 #[arg(long)]
76 pub skip_watermark: bool,
77}
78
79pub struct Indexer<S: Store> {
80 store: S,
84
85 metrics: Arc<IndexerMetrics>,
87
88 ingestion_service: IngestionService,
90
91 first_checkpoint: Option<u64>,
93
94 last_checkpoint: Option<u64>,
96
97 skip_watermark: bool,
99
100 enabled_pipelines: Option<BTreeSet<String>>,
104
105 added_pipelines: BTreeSet<&'static str>,
108
109 cancel: CancellationToken,
111
112 first_checkpoint_from_watermark: u64,
116
117 next_sequential_checkpoint: Option<u64>,
120
121 handles: Vec<JoinHandle<()>>,
123}
124
125impl<S: Store> Indexer<S> {
126 pub async fn new(
138 store: S,
139 indexer_args: IndexerArgs,
140 client_args: ClientArgs,
141 ingestion_config: IngestionConfig,
142 metrics_prefix: Option<&str>,
143 registry: &Registry,
144 cancel: CancellationToken,
145 ) -> Result<Self> {
146 let IndexerArgs {
147 first_checkpoint,
148 last_checkpoint,
149 pipeline,
150 skip_watermark,
151 } = indexer_args;
152
153 let metrics = IndexerMetrics::new(metrics_prefix, registry);
154
155 let ingestion_service = IngestionService::new(
156 client_args,
157 ingestion_config,
158 metrics.clone(),
159 cancel.clone(),
160 )?;
161
162 Ok(Self {
163 store,
164 metrics,
165 ingestion_service,
166 first_checkpoint,
167 last_checkpoint,
168 skip_watermark,
169 enabled_pipelines: if pipeline.is_empty() {
170 None
171 } else {
172 Some(pipeline.into_iter().collect())
173 },
174 added_pipelines: BTreeSet::new(),
175 cancel,
176 first_checkpoint_from_watermark: u64::MAX,
177 next_sequential_checkpoint: None,
178 handles: vec![],
179 })
180 }
181
182 pub fn store(&self) -> &S {
184 &self.store
185 }
186
187 pub fn ingestion_client(&self) -> &IngestionClient {
189 self.ingestion_service.ingestion_client()
190 }
191
192 pub fn metrics(&self) -> &Arc<IndexerMetrics> {
194 &self.metrics
195 }
196
197 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
199 self.added_pipelines.iter().copied().filter(|p| {
200 self.enabled_pipelines
201 .as_ref()
202 .is_none_or(|e| e.contains(*p))
203 })
204 }
205
206 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
210 self.next_sequential_checkpoint
211 }
212
213 pub async fn concurrent_pipeline<H>(
220 &mut self,
221 handler: H,
222 config: ConcurrentConfig,
223 ) -> Result<()>
224 where
225 H: concurrent::Handler<Store = S> + Send + Sync + 'static,
226 {
227 let Some(watermark) = self.add_pipeline::<H>().await? else {
228 return Ok(());
229 };
230
231 let next_checkpoint = match (watermark, self.first_checkpoint) {
234 (Some(watermark), Some(first_checkpoint)) => {
235 if !self.skip_watermark {
239 ensure!(
240 first_checkpoint <= watermark.checkpoint_hi_inclusive + 1,
241 "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. \
242 This could create gaps in the data.",
243 H::NAME,
244 first_checkpoint,
245 watermark.checkpoint_hi_inclusive,
246 );
247 }
248 first_checkpoint
249 }
250 (Some(watermark), _) => watermark.checkpoint_hi_inclusive + 1,
251 (_, Some(first_checkpoint)) => first_checkpoint,
252 (None, None) => 0,
253 };
254
255 self.handles.push(concurrent::pipeline::<H>(
256 handler,
257 next_checkpoint,
258 config,
259 self.skip_watermark,
260 self.store.clone(),
261 self.ingestion_service.subscribe().0,
262 self.metrics.clone(),
263 self.cancel.clone(),
264 ));
265
266 Ok(())
267 }
268
269 pub async fn run(mut self) -> Result<JoinHandle<()>> {
277 if let Some(enabled_pipelines) = self.enabled_pipelines {
278 ensure!(
279 enabled_pipelines.is_empty(),
280 "Tried to enable pipelines that this indexer does not know about: \
281 {enabled_pipelines:#?}",
282 );
283 }
284
285 let first_checkpoint = self
288 .first_checkpoint
289 .unwrap_or(self.first_checkpoint_from_watermark);
290
291 let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
292
293 info!(first_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
294
295 let broadcaster_handle = self
296 .ingestion_service
297 .run(
298 first_checkpoint..=last_checkpoint,
299 self.next_sequential_checkpoint,
300 )
301 .await
302 .context("Failed to start ingestion service")?;
303
304 self.handles.push(broadcaster_handle);
305
306 Ok(tokio::spawn(async move {
307 future::join_all(self.handles).await;
312 info!("Indexing pipeline gracefully shut down");
313 }))
314 }
315
316 async fn add_pipeline<P: Processor + 'static>(
321 &mut self,
322 ) -> Result<Option<Option<CommitterWatermark>>> {
323 ensure!(
324 self.added_pipelines.insert(P::NAME),
325 "Pipeline {:?} already added",
326 P::NAME,
327 );
328
329 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
330 && !enabled_pipelines.remove(P::NAME)
331 {
332 info!(pipeline = P::NAME, "Skipping");
333 return Ok(None);
334 }
335
336 let mut conn = self
337 .store
338 .connect()
339 .await
340 .context("Failed to establish connection to store")?;
341
342 let watermark = conn
343 .committer_watermark(P::NAME)
344 .await
345 .with_context(|| format!("Failed to get watermark for {}", P::NAME))?;
346
347 let expected_first_checkpoint = watermark
348 .as_ref()
349 .map(|w| w.checkpoint_hi_inclusive + 1)
350 .unwrap_or_default();
351
352 self.first_checkpoint_from_watermark =
353 expected_first_checkpoint.min(self.first_checkpoint_from_watermark);
354
355 Ok(Some(watermark))
356 }
357}
358
359impl<T: TransactionalStore> Indexer<T> {
360 pub async fn sequential_pipeline<H>(
371 &mut self,
372 handler: H,
373 config: SequentialConfig,
374 ) -> Result<()>
375 where
376 H: Handler<Store = T> + Send + Sync + 'static,
377 {
378 let Some(watermark) = self.add_pipeline::<H>().await? else {
379 return Ok(());
380 };
381
382 if self.skip_watermark {
383 warn!(
384 pipeline = H::NAME,
385 "--skip-watermarks enabled and ignored for sequential pipeline"
386 );
387 }
388
389 let next_checkpoint = match (watermark, self.first_checkpoint) {
392 (Some(watermark), Some(first_checkpoint)) => {
393 ensure!(
396 first_checkpoint <= watermark.checkpoint_hi_inclusive + 1,
397 "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. \
398 This could create gaps in the data.",
399 H::NAME,
400 first_checkpoint,
401 watermark.checkpoint_hi_inclusive,
402 );
403 warn!(
406 pipeline = H::NAME,
407 first_checkpoint,
408 committer_hi = watermark.checkpoint_hi_inclusive,
409 "Ignoring --first-checkpoint and will resume from committer_hi",
410 );
411 watermark.checkpoint_hi_inclusive + 1
412 }
413 (Some(watermark), _) => watermark.checkpoint_hi_inclusive + 1,
416 (_, Some(first_checkpoint)) => first_checkpoint,
418 (None, None) => 0,
419 };
420
421 self.next_sequential_checkpoint = Some(
423 self.next_sequential_checkpoint
424 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
425 );
426
427 let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
428
429 self.handles.push(sequential::pipeline::<H>(
430 handler,
431 next_checkpoint,
432 config,
433 self.store.clone(),
434 checkpoint_rx,
435 watermark_tx,
436 self.metrics.clone(),
437 self.cancel.clone(),
438 ));
439
440 Ok(())
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FieldCount;
    use crate::mocks::store::MockStore;
    use crate::pipeline::{Processor, concurrent::ConcurrentConfig};
    use crate::store::CommitterWatermark;
    use async_trait::async_trait;
    use std::path::Path;
    use std::sync::Arc;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tokio_util::sync::CancellationToken;

    /// Value produced by the mock processors; its contents are never inspected.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);

    /// Handler registered as `"test_processor"`, usable as a concurrent or sequential pipeline.
    struct MockHandler;

    /// Second handler with a distinct name, for tests that need two sequential pipelines.
    struct SequentialHandler;

    #[async_trait]
    impl Processor for MockHandler {
        const NAME: &'static str = "test_processor";
        type Value = MockValue;
        async fn process(
            &self,
            _checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![MockValue(1)])
        }
    }

    #[async_trait]
    impl crate::pipeline::concurrent::Handler for MockHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> crate::pipeline::concurrent::BatchStatus {
            batch.extend(values);
            crate::pipeline::concurrent::BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(1)
        }
    }

    #[async_trait]
    impl crate::pipeline::sequential::Handler for MockHandler {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(1)
        }
    }

    #[async_trait]
    impl Processor for SequentialHandler {
        const NAME: &'static str = "sequential_handler";
        type Value = MockValue;
        async fn process(
            &self,
            _checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![MockValue(1)])
        }
    }

    #[async_trait]
    impl crate::pipeline::sequential::Handler for SequentialHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(1)
        }
    }

    /// Watermark used by most tests; only the checkpoint bound matters to them.
    fn watermark_at(checkpoint_hi_inclusive: u64) -> CommitterWatermark {
        CommitterWatermark {
            epoch_hi_inclusive: 1,
            checkpoint_hi_inclusive,
            tx_hi: 1000,
            timestamp_ms_hi_inclusive: 1000000,
        }
    }

    /// A fresh store whose `MockHandler` pipeline has committed up to `checkpoint_hi_inclusive`.
    async fn store_with_watermark(checkpoint_hi_inclusive: u64) -> MockStore {
        let store = MockStore::default();
        {
            let mut conn = store.connect().await.unwrap();
            conn.set_committer_watermark(MockHandler::NAME, watermark_at(checkpoint_hi_inclusive))
                .await
                .unwrap();
        }
        store
    }

    /// Builds an indexer over `store` that ingests checkpoints from the local directory
    /// `ingestion_dir`, with default ingestion config and a fresh cancellation token.
    async fn new_indexer(
        store: MockStore,
        indexer_args: IndexerArgs,
        ingestion_dir: &Path,
        registry: &Registry,
    ) -> Indexer<MockStore> {
        let client_args = ClientArgs {
            local_ingestion_path: Some(ingestion_dir.to_owned()),
            ..Default::default()
        };

        Indexer::new(
            store,
            indexer_args,
            client_args,
            IngestionConfig::default(),
            None,
            registry,
            CancellationToken::new(),
        )
        .await
        .unwrap()
    }

    /// Writes `num_checkpoints` synthetic checkpoints, starting at `starting_checkpoint`, into
    /// `ingestion_dir`.
    async fn generate_checkpoints(
        ingestion_dir: &Path,
        starting_checkpoint: u64,
        num_checkpoints: u64,
    ) {
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: ingestion_dir.to_owned(),
            starting_checkpoint,
            num_checkpoints,
            checkpoint_size: 2,
        })
        .await;
    }

    /// The watermark-derived start is tracked even when a first checkpoint override is given.
    #[tokio::test]
    async fn test_first_checkpoint_from_watermark() {
        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(100).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(50),
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        indexer
            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 101);
    }

    /// A concurrent pipeline rejects an override that would leave a gap after its watermark.
    #[tokio::test]
    async fn test_indexer_concurrent_pipeline_disallow_inconsistent_first_checkpoint() {
        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(100).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(1001),
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        let result = indexer
            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
            .await;

        assert!(result.is_err());
    }

    /// With `skip_watermark`, a concurrent pipeline accepts an override ahead of its watermark.
    #[tokio::test]
    async fn test_indexer_concurrent_pipeline_allow_inconsistent_first_checkpoint_with_skip_watermark()
     {
        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(100).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(1001),
            skip_watermark: true,
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        let result = indexer
            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
            .await;

        assert!(result.is_ok());
    }

    /// A sequential pipeline rejects an override that would leave a gap after its watermark.
    #[tokio::test]
    async fn test_indexer_sequential_pipeline_disallow_inconsistent_first_checkpoint() {
        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(100).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(1001),
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        let result = indexer
            .sequential_pipeline::<MockHandler>(MockHandler, SequentialConfig::default())
            .await;

        assert!(result.is_err());
    }

    /// `skip_watermark` does not relax the gap check for sequential pipelines.
    #[tokio::test]
    async fn test_indexer_sequential_pipeline_disallow_inconsistent_first_checkpoint_with_skip_watermark()
     {
        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(100).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(1001),
            skip_watermark: true,
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        let result = indexer
            .sequential_pipeline::<MockHandler>(MockHandler, SequentialConfig::default())
            .await;

        assert!(result.is_err());
    }

    /// A sequential pipeline resumes from its watermark, so checkpoints ingested from the
    /// earlier override up to the watermark are counted as out of order.
    #[tokio::test]
    async fn test_indexer_sequential_pipeline_always_resume_from_watermark() {
        let pipeline_checkpoint_hi = 10;
        let indexer_first_checkpoint = 5;
        let num_ingested_checkpoints = 10;

        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(pipeline_checkpoint_hi).await;
        generate_checkpoints(
            temp_dir.path(),
            indexer_first_checkpoint,
            num_ingested_checkpoints,
        )
        .await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(indexer_first_checkpoint),
            last_checkpoint: Some(indexer_first_checkpoint + num_ingested_checkpoints - 1),
            skip_watermark: true,
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        // Unwrap so that a registration failure is reported here, not as a confusing metric
        // mismatch further down.
        indexer
            .sequential_pipeline::<MockHandler>(MockHandler, SequentialConfig::default())
            .await
            .unwrap();

        let metrics = indexer.metrics().clone();

        indexer.run().await.unwrap().await.unwrap();

        assert_eq!(
            metrics.total_ingested_checkpoints.get(),
            num_ingested_checkpoints
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&["test_processor"])
                .unwrap()
                .get(),
            pipeline_checkpoint_hi - indexer_first_checkpoint + 1
        );
    }

    /// A concurrent pipeline starts from the override, so nothing is counted as out of order.
    #[tokio::test]
    async fn test_indexer_concurrent_pipeline_always_resume_from_first_checkpoint() {
        let pipeline_checkpoint_hi = 10;
        let indexer_first_checkpoint = 5;
        let num_ingested_checkpoints = 10;

        let registry = Registry::new();
        let temp_dir = tempfile::tempdir().unwrap();
        let store = store_with_watermark(pipeline_checkpoint_hi).await;
        generate_checkpoints(
            temp_dir.path(),
            indexer_first_checkpoint,
            num_ingested_checkpoints,
        )
        .await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(indexer_first_checkpoint),
            last_checkpoint: Some(indexer_first_checkpoint + num_ingested_checkpoints - 1),
            skip_watermark: true,
            ..Default::default()
        };
        let mut indexer = new_indexer(store, indexer_args, temp_dir.path(), &registry).await;

        // Unwrap so that a registration failure is reported here, not as a confusing metric
        // mismatch further down.
        indexer
            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
            .await
            .unwrap();

        let metrics = indexer.metrics().clone();

        indexer.run().await.unwrap().await.unwrap();

        assert_eq!(
            metrics.total_ingested_checkpoints.get(),
            num_ingested_checkpoints
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&["test_processor"])
                .unwrap()
                .get(),
            0
        );
    }

    /// With several sequential pipelines, the indexer tracks the lowest next checkpoint and
    /// every pipeline catches up to the end of the ingested range.
    #[tokio::test]
    async fn test_multiple_sequential_pipelines_next_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        let mut conn = store.connect().await.unwrap();

        // First pipeline has committed up to checkpoint 10.
        conn.set_committer_watermark(
            MockHandler::NAME,
            CommitterWatermark {
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive: 10,
                tx_hi: 20,
                timestamp_ms_hi_inclusive: 10000,
            },
        )
        .await
        .unwrap();

        // Second pipeline is further behind, at checkpoint 5.
        conn.set_committer_watermark(
            SequentialHandler::NAME,
            CommitterWatermark {
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive: 5,
                tx_hi: 10,
                timestamp_ms_hi_inclusive: 5000,
            },
        )
        .await
        .unwrap();

        let temp_dir = tempfile::tempdir().unwrap();
        generate_checkpoints(temp_dir.path(), 0, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(19),
            ..Default::default()
        };
        let mut indexer =
            new_indexer(store.clone(), indexer_args, temp_dir.path(), &registry).await;

        indexer
            .sequential_pipeline(MockHandler, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(
            indexer.next_sequential_checkpoint(),
            Some(11),
            "next_sequential_checkpoint should be 11"
        );

        indexer
            .sequential_pipeline(SequentialHandler, SequentialConfig::default())
            .await
            .unwrap();

        // The second pipeline is further behind, so the minimum drops from 11 to 6.
        assert_eq!(
            indexer.next_sequential_checkpoint(),
            Some(6),
            "next_sequential_checkpoint should now be 6"
        );

        indexer.run().await.unwrap().await.unwrap();

        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
        let watermark2 = conn
            .committer_watermark(SequentialHandler::NAME)
            .await
            .unwrap();

        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
    }
}