1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
/// Command-line arguments controlling the checkpoint range an indexer
/// processes and which of its registered pipelines are enabled.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override for the first checkpoint to ingest (inclusive). When absent,
    /// each pipeline resumes from its stored watermark (or checkpoint 0 if it
    /// has none) -- see `Indexer::add_pipeline`.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Last checkpoint to ingest (inclusive). When absent, ingestion is
    /// unbounded.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Names of pipelines to enable; the flag may be repeated. When no names
    /// are given, every pipeline added to the indexer is enabled.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional task configuration; see [`TaskArgs`].
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Optional "task" configuration: when both fields are set, the indexer's
/// pipelines run under the named task instead of as the main pipeline. The
/// two flags must be supplied together (each `requires` the other), which is
/// what lets `into_task` treat "both present" as the only valid tasked state.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the task to run pipelines under.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Polling interval, in milliseconds, used by tasked pipelines
    /// (presumably to re-read the main pipeline's reader watermark, based on
    /// how the tests exercise it -- confirm against the pipeline impl).
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
/// Orchestrates checkpoint ingestion and a set of pipelines writing to `S`.
///
/// Pipelines are registered via `concurrent_pipeline` / `sequential_pipeline`
/// and everything is started together by [`Indexer::run`].
pub struct Indexer<S: Store> {
    // The store all pipelines commit their data and watermarks to.
    store: S,

    // Metrics shared by the indexer and its pipelines.
    metrics: Arc<IndexerMetrics>,

    // Fetches checkpoint data and broadcasts it to subscribed pipelines.
    ingestion_service: IngestionService,

    // Optional override for the first checkpoint to ingest (inclusive).
    first_checkpoint: Option<u64>,

    // Optional last checkpoint to ingest (inclusive); `None` = unbounded.
    last_checkpoint: Option<u64>,

    // Smallest checkpoint any registered pipeline still needs; ingestion
    // starts here. Initialized to `u64::MAX` and lowered by `add_pipeline`.
    next_checkpoint: u64,

    // Smallest next checkpoint among sequential pipelines only; `None` when
    // no sequential pipeline has been registered.
    next_sequential_checkpoint: Option<u64>,

    // Optional task the pipelines run under (see `TaskArgs`).
    task: Option<Task>,

    // Pipelines explicitly enabled via `--pipeline`. Names are removed as
    // the matching pipelines register; leftovers cause `run` to fail.
    // `None` means every added pipeline is enabled.
    enabled_pipelines: Option<BTreeSet<String>>,

    // Names of pipelines added so far, used to reject duplicates.
    added_pipelines: BTreeSet<&'static str>,

    // Services for the registered pipelines, merged into the ingestion
    // service's handle by `run`.
    pipelines: Vec<Service>,
}
163
/// A resolved task configuration -- produced by `TaskArgs::into_task` when
/// both CLI fields were supplied.
#[derive(Clone)]
pub(crate) struct Task {
    // The task's name.
    task: String,

    // Polling interval derived from `TaskArgs::reader_interval_ms`.
    reader_interval: Duration,
}
174
175impl TaskArgs {
176 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
177 Self {
178 task: Some(task),
179 reader_interval_ms: Some(reader_interval_ms),
180 }
181 }
182
183 fn into_task(self) -> Option<Task> {
184 Some(Task {
185 task: self.task?,
186 reader_interval: Duration::from_millis(self.reader_interval_ms?),
187 })
188 }
189}
190
impl<S: Store> Indexer<S> {
    /// Create a new indexer writing to `store`, with ingestion configured by
    /// `client_args` / `ingestion_config`. Metrics are registered against
    /// `registry`, optionally namespaced by `metrics_prefix`. No pipelines
    /// are registered yet and nothing runs until [`Self::run`] is called.
    ///
    /// Fails if the ingestion service cannot be constructed.
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            // Lowered by `add_pipeline` as pipelines register; `u64::MAX`
            // means no pipeline has been added yet.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // `None` (everything enabled) when no `--pipeline` flag was
            // passed; otherwise the set of names awaiting registration.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store this indexer writes to.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client used to fetch checkpoint data.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Metrics tracking pipeline progress.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Metrics tracking checkpoint ingestion.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// Names of the pipelines that have been added AND are enabled (all
    /// added pipelines when no explicit `--pipeline` selection was made).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// Smallest next checkpoint across the registered sequential pipelines,
    /// or `None` if none have been added.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingestion and all registered pipelines, returning a merged
    /// [`Service`] handle for the whole indexer.
    ///
    /// # Errors
    ///
    /// Fails if any pipeline named via `--pipeline` was never added to this
    /// indexer, or if the ingestion service fails to start.
    pub async fn run(self) -> anyhow::Result<Service> {
        // Any names left in `enabled_pipelines` were requested on the CLI
        // but never registered -- surface that as a configuration error.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                 {enabled_pipelines:#?}",
            );
        }

        // `next_checkpoint` was lowered to the minimum across all registered
        // pipelines as they were added.
        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    // No last checkpoint means ingest indefinitely.
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Tie every pipeline's lifecycle to the ingestion service's handle.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Shared registration logic for concurrent and sequential pipelines.
    ///
    /// Records `P::NAME` (rejecting duplicates), skips the pipeline when an
    /// explicit `--pipeline` selection excludes it, and determines the
    /// checkpoint the pipeline should resume from: one past its stored
    /// watermark when it has one, otherwise `first_checkpoint` (default 0).
    /// Also lowers the indexer-wide `next_checkpoint` accordingly.
    ///
    /// Returns `Ok(None)` when the pipeline is skipped, `Ok(Some(cp))` with
    /// the pipeline's resume checkpoint otherwise.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
    ) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let proposed_next_checkpoint = self.first_checkpoint.unwrap_or(0);
        // Seed the watermark at `first_checkpoint - 1` (`None` when starting
        // from 0 -- `checked_sub` avoids underflow). The store decides how a
        // pre-existing watermark interacts with this seed; we only interpret
        // the value it hands back below.
        let mut conn = self.store.connect().await?;
        let init_watermark = conn
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                // Resume one past the last committed checkpoint.
                checkpoint_hi_inclusive + 1
            } else {
                // A watermark row with no committed checkpoint yet.
                0
            }
        } else {
            proposed_next_checkpoint
        };

        // Ingestion must start at the smallest checkpoint any pipeline needs.
        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
364
365impl<S: ConcurrentStore> Indexer<S> {
366 pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
373 &mut self,
374 handler: H,
375 config: ConcurrentConfig,
376 ) -> Result<()> {
377 let pipeline_task =
378 pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
379 let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task).await? else {
380 return Ok(());
381 };
382
383 self.pipelines.push(concurrent::pipeline::<H>(
384 handler,
385 next_checkpoint,
386 config,
387 self.store.clone(),
388 self.task.clone(),
389 self.ingestion_service.subscribe().0,
390 self.metrics.clone(),
391 ));
392
393 Ok(())
394 }
395}
396
397impl<T: SequentialStore> Indexer<T> {
398 pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
409 &mut self,
410 handler: H,
411 config: SequentialConfig,
412 ) -> Result<()> {
413 if self.task.is_some() {
414 bail!(
415 "Sequential pipelines do not support pipeline tasks. \
416 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
417 Running the same pipeline under a different task would violate these guarantees."
418 );
419 }
420
421 let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned()).await? else {
422 return Ok(());
423 };
424
425 self.next_sequential_checkpoint = Some(
427 self.next_sequential_checkpoint
428 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
429 );
430
431 let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
432
433 self.pipelines.push(sequential::pipeline::<H>(
434 handler,
435 next_checkpoint,
436 config,
437 self.store.clone(),
438 checkpoint_rx,
439 commit_hi_tx,
440 self.metrics.clone(),
441 ));
442
443 Ok(())
444 }
445}
446
#[cfg(test)]
mod tests {
    //! Tests covering checkpoint-range resolution (`next_checkpoint` /
    //! `next_sequential_checkpoint`), watermark initialization on pipeline
    //! registration, and the behavior of "tasked" pipelines relative to the
    //! main pipeline's committer and reader watermarks.

    use std::sync::Arc;

    use async_trait::async_trait;
    use clap::Parser;
    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tokio::sync::watch;

    use crate::FieldCount;
    use crate::config::ConcurrencyConfig;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::store::CommitterWatermark;
    use crate::store::ConcurrentConnection as _;
    use crate::store::Connection as _;

    use super::*;

    // Trivial value type: the payload is the checkpoint sequence number the
    // value was derived from.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);

    // A handler whose `process` blocks until the test releases the
    // checkpoint by raising the limit on the watch channel.
    struct ControllableHandler {
        // Checkpoints with sequence number <= this limit may be processed.
        process_below: watch::Receiver<u64>,
    }

    impl ControllableHandler {
        // Returns the handler plus the sender used to raise the limit.
        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
            let (tx, rx) = watch::channel(limit);
            (Self { process_below: rx }, tx)
        }
    }

    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Park until the test raises the limit to at least this
            // checkpoint. A closed channel is ignored (`.ok()`) so
            // processing still completes during shutdown.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }

    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        // Records each value in the mock store keyed by its checkpoint.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }

    // Generates a named handler type implementing `Processor` plus both the
    // concurrent and sequential `Handler` traits against `MockStore`, so one
    // type can be registered as either pipeline kind in tests.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // Sequential commit is a no-op here; these tests only
                // observe watermark movement, not committed data.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }

    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");

    // Build an `Indexer<MockStore>` reading from a temp directory. When
    // `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`,
    // synthetic checkpoints (starting at 0) are generated into it first. The
    // `TempDir` is returned so it outlives the indexer.
    async fn create_test_indexer(
        store: MockStore,
        indexer_args: IndexerArgs,
        registry: &Registry,
        ingestion_data: Option<(u64, u64)>,
    ) -> (Indexer<MockStore>, tempfile::TempDir) {
        let temp_dir = tempfile::tempdir().unwrap();
        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
                ingestion_dir: temp_dir.path().to_owned(),
                starting_checkpoint: 0,
                num_checkpoints,
                checkpoint_size,
            })
            .await;
        }
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            IngestionConfig::default(),
            None,
            registry,
        )
        .await
        .unwrap();
        (indexer, temp_dir)
    }

    // Set `name`'s committer watermark to `hi` (other fields defaulted).
    async fn set_committer_watermark(
        conn: &mut <MockStore as Store>::Connection<'_>,
        name: &str,
        hi: u64,
    ) {
        conn.set_committer_watermark(
            name,
            CommitterWatermark {
                checkpoint_hi_inclusive: hi,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    }

    // Register `handler` as a concurrent pipeline with default config.
    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .concurrent_pipeline(handler, ConcurrentConfig::default())
            .await
            .unwrap();
    }

    // Register `handler` as a sequential pipeline with default config.
    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .sequential_pipeline(handler, SequentialConfig::default())
            .await
            .unwrap();
    }

    // Assert the out-of-order watermark counter for `$pipeline` equals
    // `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }

    // Register a single pipeline (concurrent or sequential) on a fresh
    // indexer with the given `first_checkpoint`, and return the committer
    // and pruner watermarks that registration wrote (if any).
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }

    // `IndexerArgs` parses from the CLI, including the repeated `--pipeline`
    // flag and the flattened `TaskArgs` pair.
    #[test]
    fn test_arg_parsing() {
        #[derive(Parser)]
        struct Args {
            #[clap(flatten)]
            indexer: IndexerArgs,
        }

        let args = Args::try_parse_from([
            "cmd",
            "--first-checkpoint",
            "10",
            "--last-checkpoint",
            "100",
            "--pipeline",
            "a",
            "--pipeline",
            "b",
            "--task",
            "t",
            "--reader-interval-ms",
            "5000",
        ])
        .unwrap();

        assert_eq!(args.indexer.first_checkpoint, Some(10));
        assert_eq!(args.indexer.last_checkpoint, Some(100));
        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
    }

    // With watermarks on every pipeline, ingestion starts at the smallest
    // watermark + 1 (here C's 1 + 1 = 2).
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;

        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;

        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, D::NAME, 50).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }

    // Pipelines without a watermark (and no --first-checkpoint) pull the
    // starting point all the way down to 0.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }

    // A watermark of 0 resumes from checkpoint 1 (watermark + 1).
    #[tokio::test]
    async fn test_next_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;
        set_committer_watermark(&mut conn, D::NAME, 0).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.next_checkpoint, 1);
    }

    // --first-checkpoint seeds pipelines that have no watermark of their own
    // (A and D start at 5 here, below B's and C's watermarks).
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 5);
        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
    }

    // When every pipeline already has a watermark above --first-checkpoint,
    // the flag is ignored in favor of watermark + 1 (C's 10 + 1 = 11).
    #[tokio::test]
    async fn test_next_checkpoint_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }

    // Even with a large --first-checkpoint (seeding A at 24), ingestion
    // still starts from the smallest watermark-derived resume point (C's
    // 10 + 1 = 11) so existing pipelines are not left with a gap.
    #[tokio::test]
    async fn test_next_checkpoint_large_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(24),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        assert_eq!(indexer.first_checkpoint, Some(24));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }

    // End-to-end run: ingestion starts at the lowest watermark + 1 (A's
    // 5 + 1 = 6, so 24 of 30 checkpoints are ingested) and each pipeline
    // sees already-committed checkpoints as out-of-order watermarks.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(indexer_metrics, A::NAME, 0);
        assert_out_of_order!(indexer_metrics, B::NAME, 5);
        assert_out_of_order!(indexer_metrics, C::NAME, 10);
        assert_out_of_order!(indexer_metrics, D::NAME, 15);
    }

    // A pipeline with no watermark forces ingestion from 0, so all 30
    // checkpoints are ingested and the others see more out-of-order ones.
    #[tokio::test]
    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 11);
        assert_out_of_order!(metrics, C::NAME, 16);
        assert_out_of_order!(metrics, D::NAME, 21);
    }

    // --first-checkpoint 10 seeds the watermark-less pipeline A, so
    // ingestion covers 10..=29 (20 checkpoints).
    #[tokio::test]
    async fn test_indexer_ingestion_use_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(10),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 1);
        assert_out_of_order!(metrics, C::NAME, 6);
        assert_out_of_order!(metrics, D::NAME, 11);
    }

    // No --first-checkpoint: registration writes no watermarks.
    #[tokio::test]
    async fn test_init_watermark_concurrent_no_first_checkpoint() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
        assert_eq!(committer_watermark, None);
        assert_eq!(pruner_watermark, None);
    }

    // --first-checkpoint 0 has nothing before it to mark as committed, so
    // no watermarks are written either.
    #[tokio::test]
    async fn test_init_watermark_concurrent_first_checkpoint_0() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
        assert_eq!(committer_watermark, None);
        assert_eq!(pruner_watermark, None);
    }

    // --first-checkpoint 1 seeds the committer watermark at 0 and the
    // pruner/reader watermarks at 1.
    #[tokio::test]
    async fn test_init_watermark_concurrent_first_checkpoint_1() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;

        let committer_watermark = committer_watermark.unwrap();
        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);

        let pruner_watermark = pruner_watermark.unwrap();
        assert_eq!(pruner_watermark.reader_lo, 1);
        assert_eq!(pruner_watermark.pruner_hi, 1);
    }

    // Sequential registration seeds watermarks the same way as concurrent.
    #[tokio::test]
    async fn test_init_watermark_sequential() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;

        let committer_watermark = committer_watermark.unwrap();
        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);

        let pruner_watermark = pruner_watermark.unwrap();
        assert_eq!(pruner_watermark.reader_lo, 1);
        assert_eq!(pruner_watermark.pruner_hi, 1);
    }

    // `next_sequential_checkpoint` tracks the minimum across sequential
    // pipelines as each is added, and both pipelines catch up to the last
    // checkpoint after a full run.
    #[tokio::test]
    async fn test_multiple_sequential_pipelines_next_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
        set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: None,
            last_checkpoint: Some(19),
            pipeline: vec![],
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((20, 2))).await;

        add_sequential(&mut indexer, MockHandler).await;

        assert_eq!(
            indexer.next_sequential_checkpoint(),
            Some(11),
            "next_sequential_checkpoint should be 11"
        );

        add_sequential(&mut indexer, SequentialHandler).await;

        assert_eq!(
            indexer.next_sequential_checkpoint(),
            Some(6),
            "next_sequential_checkpoint should still be 6"
        );

        indexer.run().await.unwrap().join().await.unwrap();

        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
        let watermark2 = conn
            .committer_watermark(SequentialHandler::NAME)
            .await
            .unwrap();

        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
    }

    // A tasked pipeline skips checkpoints below the MAIN pipeline's reader
    // watermark (reader_lo = 7 here): they are ingested but not committed.
    #[tokio::test]
    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
            .await
            .unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(15),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((16, 2))).await;

        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // All 16 checkpoints are ingested; 7 (0..=6) are skipped by the
        // collector because they fall below the main reader watermark.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            7
        );
        let data = store
            .data
            .get(MockCheckpointSequenceNumberHandler::NAME)
            .unwrap();
        assert_eq!(data.len(), 9);
        for i in 0..7 {
            assert!(data.get(&i).is_none());
        }
        for i in 7..16 {
            assert!(data.get(&i).is_some());
        }
    }

    // A tasked pipeline may commit past the main pipeline's committer
    // watermark; the main pipeline's watermarks are left untouched while the
    // task tracks its own under the "name@task" key.
    #[tokio::test]
    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
        let registry = Registry::new();
        let store = MockStore::default();

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, "test", 10).await;
        conn.set_reader_watermark("test", 5).await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(9),
            last_checkpoint: Some(25),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((26, 2))).await;

        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // 9..=25 -> 17 checkpoints; nothing skipped since all are at or
        // above the main reader watermark of 5.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
        assert_out_of_order!(metrics, "test", 0);
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            0
        );

        let data = store.data.get("test").unwrap();
        assert_eq!(data.len(), 17);
        for i in 0..9 {
            assert!(data.get(&i).is_none());
        }
        for i in 9..26 {
            assert!(data.get(&i).is_some());
        }
        // Main pipeline watermarks are unchanged by the tasked run.
        let main_pipeline_watermark = store.watermark("test").unwrap();
        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
        assert_eq!(main_pipeline_watermark.reader_lo, 5);
        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
    }

    // While a tasked pipeline is stalled (checkpoints held back by the
    // controllable handler), raising the main pipeline's reader watermark
    // causes the still-unprocessed checkpoints below it to be skipped once
    // the collector observes the new reader_lo.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((501, 2))).await;
        // Initially only checkpoints 0..=10 may be processed.
        let mut allow_process = 10;
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    committer: CommitterConfig {
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // Large enough that every checkpoint can enter `process`
                    // concurrently and park on the watch channel.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait until the tasked watermark reaches 10, i.e. the released
        // baseline checkpoints have been committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                Duration::from_secs(10),
            )
            .await;

        // Raise the MAIN pipeline's reader watermark past the stalled range.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Release checkpoints one at a time until the collector observes the
        // new reader_lo; bail out if we run out of checkpoints to release.
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            allow_process += 1;
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // Release everything else so the run can finish.
        process_below.send(500).ok();

        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints still held back when reader_lo moved must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
}