1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
// Command-line arguments that select the checkpoint range and the set of pipelines an
// `Indexer` works on.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` comments into help text,
// which would change the program's `--help` output.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    // Proposed starting checkpoint for a pipeline. `Indexer::add_pipeline` passes it (minus one)
    // to the store's `init_watermark`; if the store answers with a watermark of its own, that
    // watermark decides where the pipeline resumes instead.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    // Inclusive upper bound of the ingestion range used by `Indexer::run`. When absent the
    // range is unbounded.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    // Names of the pipelines to enable; the flag may be repeated. An empty list means no filter
    // is applied and every added pipeline is enabled.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    // Optional task configuration, flattened into this argument group.
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
// Command-line arguments describing an optional named task for the indexer.
//
// The two flags are declared as mutually required: supplying one without the other is
// rejected by clap. Plain `//` comments are used on purpose so that clap's derive does not
// pick them up as help text.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    // Name of the task. Combined with a pipeline's name (via `pipeline_task`) when a
    // concurrent pipeline is registered.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    // Reader interval in milliseconds; converted into a `Duration` by `into_task`.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
/// Coordinates an ingestion service and the pipelines that consume its checkpoints, writing
/// to a store of type `S`.
pub struct Indexer<S: Store> {
    /// Store used to open connections when initializing watermarks; cloned into every pipeline.
    store: S,

    /// Indexer metrics, shared with every pipeline.
    metrics: Arc<IndexerMetrics>,

    /// Service that supplies checkpoints; each pipeline subscribes to it.
    ingestion_service: IngestionService,

    /// Optional starting checkpoint proposed for pipelines (see `add_pipeline`).
    first_checkpoint: Option<u64>,

    /// Optional inclusive last checkpoint of the ingestion range used by `run`.
    last_checkpoint: Option<u64>,

    /// Latest checkpoint number reported by the ingestion service at construction time. Used
    /// together with a pipeline's retention to propose a starting checkpoint.
    latest_checkpoint: u64,

    /// Smallest next checkpoint over all pipelines registered so far; starts at `u64::MAX`
    /// and becomes the inclusive lower bound of the ingestion range in `run`.
    next_checkpoint: u64,

    /// Smallest next checkpoint over the sequential pipelines registered so far, or `None`
    /// when no sequential pipeline has been registered.
    next_sequential_checkpoint: Option<u64>,

    /// Optional task configuration. Sequential pipelines refuse to be added when it is set.
    task: Option<Task>,

    /// Names requested through `--pipeline` that have not been matched by an added pipeline
    /// yet (`add_pipeline` removes a name once the pipeline is added). `None` means no filter.
    /// Anything left over when `run` is called is reported as an error.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of all pipelines passed to `add_pipeline`, including ones that were skipped by
    /// the filter. Used to reject duplicates.
    added_pipelines: BTreeSet<&'static str>,

    /// Services of the pipelines that will run; merged with the ingestion service in `run`.
    pipelines: Vec<Service>,
}
165
/// Task configuration handed to concurrent pipelines, built from `TaskArgs` once both of its
/// options are present.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the task.
    task: String,
    /// Reader interval, derived from `TaskArgs::reader_interval_ms`.
    reader_interval: Duration,
}
176
177impl TaskArgs {
178 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
179 Self {
180 task: Some(task),
181 reader_interval_ms: Some(reader_interval_ms),
182 }
183 }
184
185 fn into_task(self) -> Option<Task> {
186 Some(Task {
187 task: self.task?,
188 reader_interval: Duration::from_millis(self.reader_interval_ms?),
189 })
190 }
191}
192
193impl<S: Store> Indexer<S> {
194 pub async fn new(
206 store: S,
207 indexer_args: IndexerArgs,
208 client_args: ClientArgs,
209 ingestion_config: IngestionConfig,
210 metrics_prefix: Option<&str>,
211 registry: &Registry,
212 ) -> anyhow::Result<Self> {
213 let ingestion_service =
214 IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
215 Self::with_ingestion_service(
216 store,
217 indexer_args,
218 ingestion_service,
219 metrics_prefix,
220 registry,
221 )
222 .await
223 }
224
225 pub async fn with_ingestion_service(
233 store: S,
234 indexer_args: IndexerArgs,
235 mut ingestion_service: IngestionService,
236 metrics_prefix: Option<&str>,
237 registry: &Registry,
238 ) -> anyhow::Result<Self> {
239 let IndexerArgs {
240 first_checkpoint,
241 last_checkpoint,
242 pipeline,
243 task,
244 } = indexer_args;
245
246 let metrics = IndexerMetrics::new(metrics_prefix, registry);
247
248 let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;
249
250 info!(latest_checkpoint);
251
252 Ok(Self {
253 store,
254 metrics,
255 ingestion_service,
256 first_checkpoint,
257 last_checkpoint,
258 latest_checkpoint,
259 next_checkpoint: u64::MAX,
260 next_sequential_checkpoint: None,
261 task: task.into_task(),
262 enabled_pipelines: if pipeline.is_empty() {
263 None
264 } else {
265 Some(pipeline.into_iter().collect())
266 },
267 added_pipelines: BTreeSet::new(),
268 pipelines: vec![],
269 })
270 }
271
272 pub fn store(&self) -> &S {
274 &self.store
275 }
276
277 pub fn ingestion_client(&self) -> &IngestionClient {
279 self.ingestion_service.ingestion_client()
280 }
281
282 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
284 &self.metrics
285 }
286
287 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
289 self.ingestion_service.metrics()
290 }
291
292 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
294 self.added_pipelines.iter().copied().filter(|p| {
295 self.enabled_pipelines
296 .as_ref()
297 .is_none_or(|e| e.contains(*p))
298 })
299 }
300
301 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
305 self.next_sequential_checkpoint
306 }
307
308 pub async fn run(self) -> anyhow::Result<Service> {
314 if let Some(enabled_pipelines) = self.enabled_pipelines {
315 ensure!(
316 enabled_pipelines.is_empty(),
317 "Tried to enable pipelines that this indexer does not know about: \
318 {enabled_pipelines:#?}",
319 );
320 }
321
322 let start = self.next_checkpoint;
323 let end = self.last_checkpoint;
324 info!(start, end, "Ingestion range");
325
326 let mut service = self
327 .ingestion_service
328 .run((
329 Bound::Included(start),
330 end.map_or(Bound::Unbounded, Bound::Included),
331 ))
332 .await
333 .context("Failed to start ingestion service")?;
334
335 for pipeline in self.pipelines {
336 service = service.merge(pipeline);
337 }
338
339 Ok(service)
340 }
341
342 async fn add_pipeline<P: Processor + 'static>(
351 &mut self,
352 pipeline_task: String,
353 retention: Option<u64>,
354 ) -> anyhow::Result<Option<u64>> {
355 ensure!(
356 self.added_pipelines.insert(P::NAME),
357 "Pipeline {:?} already added",
358 P::NAME,
359 );
360
361 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
362 && !enabled_pipelines.remove(P::NAME)
363 {
364 info!(pipeline = P::NAME, "Skipping");
365 return Ok(None);
366 }
367
368 let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
371 first_checkpoint
372 } else if let Some(retention) = retention {
373 self.latest_checkpoint.saturating_sub(retention)
374 } else {
375 0
376 };
377 let mut conn = self.store.connect().await?;
378 let init_watermark = conn
379 .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
380 .await
381 .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
382
383 let next_checkpoint = if let Some(init_watermark) = init_watermark {
384 if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
385 checkpoint_hi_inclusive + 1
386 } else {
387 0
388 }
389 } else {
390 proposed_next_checkpoint
391 };
392
393 self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);
394
395 Ok(Some(next_checkpoint))
396 }
397}
398
399impl<S: ConcurrentStore> Indexer<S> {
400 pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
407 &mut self,
408 handler: H,
409 config: ConcurrentConfig,
410 ) -> anyhow::Result<()> {
411 let pipeline_task =
412 pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
413 let retention = config.pruner.as_ref().map(|p| p.retention);
414 let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
415 return Ok(());
416 };
417
418 let checkpoint_rx = self
419 .ingestion_service
420 .subscribe_bounded(config.ingestion.subscriber_channel_size());
421
422 self.pipelines.push(concurrent::pipeline::<H>(
423 handler,
424 next_checkpoint,
425 config,
426 self.store.clone(),
427 self.task.clone(),
428 checkpoint_rx,
429 self.metrics.clone(),
430 ));
431
432 Ok(())
433 }
434}
435
436impl<T: SequentialStore> Indexer<T> {
437 pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
445 &mut self,
446 handler: H,
447 config: SequentialConfig,
448 ) -> anyhow::Result<()> {
449 if self.task.is_some() {
450 bail!(
451 "Sequential pipelines do not support pipeline tasks. \
452 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
453 Running the same pipeline under a different task would violate these guarantees."
454 );
455 }
456
457 let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
458 return Ok(());
459 };
460
461 self.next_sequential_checkpoint = Some(
463 self.next_sequential_checkpoint
464 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
465 );
466
467 let checkpoint_rx = self
468 .ingestion_service
469 .subscribe_bounded(config.ingestion.subscriber_channel_size());
470
471 self.pipelines.push(sequential::pipeline::<H>(
472 handler,
473 next_checkpoint,
474 config,
475 self.store.clone(),
476 checkpoint_rx,
477 self.metrics.clone(),
478 ));
479
480 Ok(())
481 }
482}
483
484#[cfg(test)]
485mod tests {
486 use std::sync::Arc;
487
488 use async_trait::async_trait;
489 use clap::Parser;
490 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
491 use sui_synthetic_ingestion::synthetic_ingestion;
492 use tokio::sync::watch;
493
494 use crate::FieldCount;
495 use crate::config::ConcurrencyConfig;
496 use crate::ingestion::ingestion_client::IngestionClientArgs;
497 use crate::ingestion::store_client::ObjectStoreWatermark;
498 use crate::ingestion::store_client::WATERMARK_PATH;
499 use crate::mocks::store::FallibleMockStore;
500 use crate::pipeline::CommitterConfig;
501 use crate::pipeline::Processor;
502 use crate::pipeline::concurrent::ConcurrentConfig;
503 use crate::store::CommitterWatermark;
504 use crate::store::ConcurrentConnection as _;
505 use crate::store::Connection as _;
506
507 use super::*;
508
    /// Value produced by the test processors; the processors in this module wrap the
    /// checkpoint's sequence number in it.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
512
    /// Test handler whose processing can be held back from the outside through a watch
    /// channel.
    struct ControllableHandler {
        /// Receiver for the current limit: a checkpoint is processed only once its sequence
        /// number is less than or equal to the limit.
        process_below: watch::Receiver<u64>,
    }
518
519 impl ControllableHandler {
520 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
521 let (tx, rx) = watch::channel(limit);
522 (Self { process_below: rx }, tx)
523 }
524 }
525
526 #[async_trait]
527 impl Processor for ControllableHandler {
528 const NAME: &'static str = "controllable";
529 type Value = MockValue;
530
531 async fn process(
532 &self,
533 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
534 ) -> anyhow::Result<Vec<Self::Value>> {
535 let cp_num = checkpoint.summary.sequence_number;
536
537 self.process_below
539 .clone()
540 .wait_for(|&limit| cp_num <= limit)
541 .await
542 .ok();
543
544 Ok(vec![MockValue(cp_num)])
545 }
546 }
547
548 #[async_trait]
549 impl concurrent::Handler for ControllableHandler {
550 type Store = FallibleMockStore;
551 type Batch = Vec<MockValue>;
552
553 fn batch(
554 &self,
555 batch: &mut Self::Batch,
556 values: &mut std::vec::IntoIter<Self::Value>,
557 ) -> concurrent::BatchStatus {
558 batch.extend(values);
559 concurrent::BatchStatus::Ready
560 }
561
562 async fn commit<'a>(
563 &self,
564 batch: &Self::Batch,
565 conn: &mut <Self::Store as Store>::Connection<'a>,
566 ) -> anyhow::Result<usize> {
567 for value in batch {
568 conn.0
569 .commit_data(Self::NAME, value.0, vec![value.0])
570 .await?;
571 }
572 Ok(batch.len())
573 }
574 }
575
576 macro_rules! test_pipeline {
577 ($handler:ident, $name:literal) => {
578 struct $handler;
579
580 #[async_trait]
581 impl Processor for $handler {
582 const NAME: &'static str = $name;
583 type Value = MockValue;
584 async fn process(
585 &self,
586 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
587 ) -> anyhow::Result<Vec<Self::Value>> {
588 Ok(vec![MockValue(checkpoint.summary.sequence_number)])
589 }
590 }
591
592 #[async_trait]
593 impl crate::pipeline::concurrent::Handler for $handler {
594 type Store = FallibleMockStore;
595 type Batch = Vec<Self::Value>;
596
597 fn batch(
598 &self,
599 batch: &mut Self::Batch,
600 values: &mut std::vec::IntoIter<Self::Value>,
601 ) -> crate::pipeline::concurrent::BatchStatus {
602 batch.extend(values);
603 crate::pipeline::concurrent::BatchStatus::Pending
604 }
605
606 async fn commit<'a>(
607 &self,
608 batch: &Self::Batch,
609 conn: &mut <Self::Store as Store>::Connection<'a>,
610 ) -> anyhow::Result<usize> {
611 for value in batch {
612 conn.0
613 .commit_data(Self::NAME, value.0, vec![value.0])
614 .await?;
615 }
616 Ok(batch.len())
617 }
618 }
619
620 #[async_trait]
621 impl crate::pipeline::sequential::Handler for $handler {
622 type Store = FallibleMockStore;
623 type Batch = Vec<Self::Value>;
624
625 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
626 batch.extend(values);
627 }
628
629 async fn commit<'a>(
630 &self,
631 _batch: &Self::Batch,
632 _conn: &mut <Self::Store as Store>::Connection<'a>,
633 ) -> anyhow::Result<usize> {
634 Ok(1)
635 }
636 }
637 };
638 }
639
    // Module-level handlers generated by `test_pipeline!`; each one implements `Processor`
    // and both `Handler` traits under the given pipeline name.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
643
644 fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
645 let dir = tempfile::tempdir().unwrap();
646 if let Some(cp) = latest_checkpoint {
647 let watermark_path = dir.path().join(WATERMARK_PATH);
648 std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
649 let watermark = ObjectStoreWatermark {
650 checkpoint_hi_inclusive: cp,
651 };
652 std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
653 }
654 dir
655 }
656
657 async fn create_test_indexer(
660 store: FallibleMockStore,
661 indexer_args: IndexerArgs,
662 registry: &Registry,
663 ingestion_data: Option<(u64, u64)>,
664 ) -> (Indexer<FallibleMockStore>, tempfile::TempDir) {
665 let temp_dir = init_ingestion_dir(None);
666 if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
667 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
668 ingestion_dir: temp_dir.path().to_owned(),
669 starting_checkpoint: 0,
670 num_checkpoints,
671 checkpoint_size,
672 })
673 .await;
674 }
675 let client_args = ClientArgs {
676 ingestion: IngestionClientArgs {
677 local_ingestion_path: Some(temp_dir.path().to_owned()),
678 ..Default::default()
679 },
680 ..Default::default()
681 };
682 let indexer = Indexer::new(
683 store,
684 indexer_args,
685 client_args,
686 IngestionConfig::default(),
687 None,
688 registry,
689 )
690 .await
691 .unwrap();
692 (indexer, temp_dir)
693 }
694
695 async fn set_committer_watermark(
696 conn: &mut <FallibleMockStore as Store>::Connection<'_>,
697 name: &str,
698 hi: u64,
699 ) {
700 conn.set_committer_watermark(
701 name,
702 CommitterWatermark {
703 checkpoint_hi_inclusive: hi,
704 ..Default::default()
705 },
706 )
707 .await
708 .unwrap();
709 }
710
711 async fn add_concurrent<H: concurrent::Handler<Store = FallibleMockStore>>(
712 indexer: &mut Indexer<FallibleMockStore>,
713 handler: H,
714 ) {
715 indexer
716 .concurrent_pipeline(handler, ConcurrentConfig::default())
717 .await
718 .unwrap();
719 }
720
721 async fn add_sequential<H: sequential::Handler<Store = FallibleMockStore>>(
722 indexer: &mut Indexer<FallibleMockStore>,
723 handler: H,
724 ) {
725 indexer
726 .sequential_pipeline(handler, SequentialConfig::default())
727 .await
728 .unwrap();
729 }
730
731 macro_rules! assert_out_of_order {
732 ($metrics:expr, $pipeline:expr, $expected:expr) => {
733 assert_eq!(
734 $metrics
735 .total_watermarks_out_of_order
736 .get_metric_with_label_values(&[$pipeline])
737 .unwrap()
738 .get(),
739 $expected,
740 );
741 };
742 }
743
744 async fn test_init_watermark(
745 first_checkpoint: Option<u64>,
746 is_concurrent: bool,
747 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
748 let registry = Registry::new();
749 let store = FallibleMockStore::default();
750
751 test_pipeline!(A, "pipeline_name");
752
753 let mut conn = store.connect().await.unwrap();
754
755 let indexer_args = IndexerArgs {
756 first_checkpoint,
757 ..IndexerArgs::default()
758 };
759 let (mut indexer, _temp_dir) =
760 create_test_indexer(store.clone(), indexer_args, ®istry, None).await;
761
762 if is_concurrent {
763 add_concurrent(&mut indexer, A).await;
764 } else {
765 add_sequential(&mut indexer, A).await;
766 }
767
768 (
769 conn.committer_watermark(A::NAME).await.unwrap(),
770 conn.pruner_watermark(A::NAME, Duration::ZERO)
771 .await
772 .unwrap(),
773 )
774 }
775
    /// Latest checkpoint written to the ingestion watermark file by `test_next_checkpoint`.
    const LATEST_CHECKPOINT: u64 = 10;
777
778 async fn test_next_checkpoint(
783 watermark: Option<u64>,
784 first_checkpoint: Option<u64>,
785 concurrent_config: ConcurrentConfig,
786 ) -> Indexer<FallibleMockStore> {
787 let registry = Registry::new();
788 let store = FallibleMockStore::default();
789
790 test_pipeline!(A, "concurrent_a");
791
792 if let Some(checkpoint_hi_inclusive) = watermark {
793 let mut conn = store.connect().await.unwrap();
794 conn.set_committer_watermark(
795 A::NAME,
796 CommitterWatermark {
797 checkpoint_hi_inclusive,
798 ..Default::default()
799 },
800 )
801 .await
802 .unwrap();
803 }
804
805 let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
806 let mut indexer = Indexer::new(
807 store,
808 IndexerArgs {
809 first_checkpoint,
810 ..Default::default()
811 },
812 ClientArgs {
813 ingestion: IngestionClientArgs {
814 local_ingestion_path: Some(temp_dir.path().to_owned()),
815 ..Default::default()
816 },
817 ..Default::default()
818 },
819 IngestionConfig::default(),
820 None,
821 ®istry,
822 )
823 .await
824 .unwrap();
825
826 assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);
827
828 indexer
829 .concurrent_pipeline::<A>(A, concurrent_config)
830 .await
831 .unwrap();
832
833 indexer
834 }
835
836 fn pruner_config(retention: u64) -> ConcurrentConfig {
837 ConcurrentConfig {
838 pruner: Some(concurrent::PrunerConfig {
839 retention,
840 ..Default::default()
841 }),
842 ..Default::default()
843 }
844 }
845
846 #[test]
847 fn test_arg_parsing() {
848 #[derive(Parser)]
849 struct Args {
850 #[clap(flatten)]
851 indexer: IndexerArgs,
852 }
853
854 let args = Args::try_parse_from([
855 "cmd",
856 "--first-checkpoint",
857 "10",
858 "--last-checkpoint",
859 "100",
860 "--pipeline",
861 "a",
862 "--pipeline",
863 "b",
864 "--task",
865 "t",
866 "--reader-interval-ms",
867 "5000",
868 ])
869 .unwrap();
870
871 assert_eq!(args.indexer.first_checkpoint, Some(10));
872 assert_eq!(args.indexer.last_checkpoint, Some(100));
873 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
874 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
875 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
876 }
877
878 #[tokio::test]
880 async fn test_next_checkpoint_all_pipelines_have_watermarks() {
881 let registry = Registry::new();
882 let store = FallibleMockStore::default();
883
884 test_pipeline!(A, "concurrent_a");
885 test_pipeline!(B, "concurrent_b");
886 test_pipeline!(C, "sequential_c");
887 test_pipeline!(D, "sequential_d");
888
889 let mut conn = store.connect().await.unwrap();
890
891 conn.init_watermark(A::NAME, Some(0)).await.unwrap();
892 set_committer_watermark(&mut conn, A::NAME, 100).await;
893
894 conn.init_watermark(B::NAME, Some(0)).await.unwrap();
895 set_committer_watermark(&mut conn, B::NAME, 10).await;
896
897 conn.init_watermark(C::NAME, Some(0)).await.unwrap();
898 set_committer_watermark(&mut conn, C::NAME, 1).await;
899
900 conn.init_watermark(D::NAME, Some(0)).await.unwrap();
901 set_committer_watermark(&mut conn, D::NAME, 50).await;
902
903 let (mut indexer, _temp_dir) =
904 create_test_indexer(store, IndexerArgs::default(), ®istry, None).await;
905
906 add_concurrent(&mut indexer, A).await;
907 add_concurrent(&mut indexer, B).await;
908 add_sequential(&mut indexer, C).await;
909 add_sequential(&mut indexer, D).await;
910
911 assert_eq!(indexer.first_checkpoint, None);
912 assert_eq!(indexer.last_checkpoint, None);
913 assert_eq!(indexer.latest_checkpoint, 0);
914 assert_eq!(indexer.next_checkpoint, 2);
915 assert_eq!(indexer.next_sequential_checkpoint, Some(2));
916 }
917
918 #[tokio::test]
920 async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
921 let registry = Registry::new();
922 let store = FallibleMockStore::default();
923
924 test_pipeline!(A, "concurrent_a");
925 test_pipeline!(B, "concurrent_b");
926 test_pipeline!(C, "sequential_c");
927 test_pipeline!(D, "sequential_d");
928
929 let mut conn = store.connect().await.unwrap();
930 set_committer_watermark(&mut conn, B::NAME, 10).await;
931 set_committer_watermark(&mut conn, C::NAME, 1).await;
932
933 let (mut indexer, _temp_dir) =
934 create_test_indexer(store, IndexerArgs::default(), ®istry, None).await;
935
936 add_concurrent(&mut indexer, A).await;
937 add_concurrent(&mut indexer, B).await;
938 add_sequential(&mut indexer, C).await;
939 add_sequential(&mut indexer, D).await;
940
941 assert_eq!(indexer.first_checkpoint, None);
942 assert_eq!(indexer.last_checkpoint, None);
943 assert_eq!(indexer.latest_checkpoint, 0);
944 assert_eq!(indexer.next_checkpoint, 0);
945 assert_eq!(indexer.next_sequential_checkpoint, Some(0));
946 }
947
948 #[tokio::test]
950 async fn test_next_checkpoint_smallest_is_0() {
951 let registry = Registry::new();
952 let store = FallibleMockStore::default();
953
954 test_pipeline!(A, "concurrent_a");
955 test_pipeline!(B, "concurrent_b");
956 test_pipeline!(C, "sequential_c");
957 test_pipeline!(D, "sequential_d");
958
959 let mut conn = store.connect().await.unwrap();
960 set_committer_watermark(&mut conn, A::NAME, 100).await;
961 set_committer_watermark(&mut conn, B::NAME, 10).await;
962 set_committer_watermark(&mut conn, C::NAME, 1).await;
963 set_committer_watermark(&mut conn, D::NAME, 0).await;
964
965 let (mut indexer, _temp_dir) =
966 create_test_indexer(store, IndexerArgs::default(), ®istry, None).await;
967
968 add_concurrent(&mut indexer, A).await;
969 add_concurrent(&mut indexer, B).await;
970 add_sequential(&mut indexer, C).await;
971 add_sequential(&mut indexer, D).await;
972
973 assert_eq!(indexer.next_checkpoint, 1);
974 }
975
976 #[tokio::test]
979 async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
980 let registry = Registry::new();
981 let store = FallibleMockStore::default();
982
983 test_pipeline!(A, "concurrent_a");
984 test_pipeline!(B, "concurrent_b");
985 test_pipeline!(C, "sequential_c");
986 test_pipeline!(D, "sequential_d");
987
988 let mut conn = store.connect().await.unwrap();
989 set_committer_watermark(&mut conn, B::NAME, 50).await;
990 set_committer_watermark(&mut conn, C::NAME, 10).await;
991
992 let indexer_args = IndexerArgs {
993 first_checkpoint: Some(5),
994 ..Default::default()
995 };
996 let (mut indexer, _temp_dir) =
997 create_test_indexer(store, indexer_args, ®istry, None).await;
998
999 add_concurrent(&mut indexer, A).await;
1000 add_concurrent(&mut indexer, B).await;
1001 add_sequential(&mut indexer, C).await;
1002 add_sequential(&mut indexer, D).await;
1003
1004 assert_eq!(indexer.first_checkpoint, Some(5));
1005 assert_eq!(indexer.last_checkpoint, None);
1006 assert_eq!(indexer.latest_checkpoint, 0);
1007 assert_eq!(indexer.next_checkpoint, 5);
1008 assert_eq!(indexer.next_sequential_checkpoint, Some(5));
1009 }
1010
1011 #[tokio::test]
1014 async fn test_next_checkpoint_ignore_first_checkpoint() {
1015 let registry = Registry::new();
1016 let store = FallibleMockStore::default();
1017
1018 test_pipeline!(B, "concurrent_b");
1019 test_pipeline!(C, "sequential_c");
1020
1021 let mut conn = store.connect().await.unwrap();
1022 set_committer_watermark(&mut conn, B::NAME, 50).await;
1023 set_committer_watermark(&mut conn, C::NAME, 10).await;
1024
1025 let indexer_args = IndexerArgs {
1026 first_checkpoint: Some(5),
1027 ..Default::default()
1028 };
1029 let (mut indexer, _temp_dir) =
1030 create_test_indexer(store, indexer_args, ®istry, None).await;
1031
1032 add_concurrent(&mut indexer, B).await;
1033 add_sequential(&mut indexer, C).await;
1034
1035 assert_eq!(indexer.first_checkpoint, Some(5));
1036 assert_eq!(indexer.last_checkpoint, None);
1037 assert_eq!(indexer.latest_checkpoint, 0);
1038 assert_eq!(indexer.next_checkpoint, 11);
1039 assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1040 }
1041
1042 #[tokio::test]
1046 async fn test_next_checkpoint_large_first_checkpoint() {
1047 let registry = Registry::new();
1048 let store = FallibleMockStore::default();
1049
1050 test_pipeline!(A, "concurrent_a");
1051 test_pipeline!(B, "concurrent_b");
1052 test_pipeline!(C, "sequential_c");
1053
1054 let mut conn = store.connect().await.unwrap();
1055 set_committer_watermark(&mut conn, B::NAME, 50).await;
1056 set_committer_watermark(&mut conn, C::NAME, 10).await;
1057
1058 let indexer_args = IndexerArgs {
1059 first_checkpoint: Some(24),
1060 ..Default::default()
1061 };
1062 let (mut indexer, _temp_dir) =
1063 create_test_indexer(store, indexer_args, ®istry, None).await;
1064
1065 add_concurrent(&mut indexer, A).await;
1066 add_concurrent(&mut indexer, B).await;
1067 add_sequential(&mut indexer, C).await;
1068
1069 assert_eq!(indexer.first_checkpoint, Some(24));
1070 assert_eq!(indexer.last_checkpoint, None);
1071 assert_eq!(indexer.latest_checkpoint, 0);
1072 assert_eq!(indexer.next_checkpoint, 11);
1073 assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1074 }
1075
1076 #[tokio::test]
1078 async fn test_latest_checkpoint_from_watermark() {
1079 let registry = Registry::new();
1080 let store = FallibleMockStore::default();
1081 let temp_dir = init_ingestion_dir(Some(30));
1082 let indexer = Indexer::new(
1083 store,
1084 IndexerArgs::default(),
1085 ClientArgs {
1086 ingestion: IngestionClientArgs {
1087 local_ingestion_path: Some(temp_dir.path().to_owned()),
1088 ..Default::default()
1089 },
1090 ..Default::default()
1091 },
1092 IngestionConfig::default(),
1093 None,
1094 ®istry,
1095 )
1096 .await
1097 .unwrap();
1098
1099 assert_eq!(indexer.latest_checkpoint, 30);
1100 }
1101
1102 #[tokio::test]
1105 async fn test_next_checkpoint_with_pruner_uses_retention() {
1106 let retention = LATEST_CHECKPOINT - 1;
1107 let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1108 assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
1109 }
1110
1111 #[tokio::test]
1113 async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
1114 let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
1115 assert_eq!(indexer.next_checkpoint, 0);
1116 }
1117
1118 #[tokio::test]
1120 async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
1121 let retention = LATEST_CHECKPOINT - 1;
1122 let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
1123 assert_eq!(indexer.next_checkpoint, 6);
1124 }
1125
1126 #[tokio::test]
1129 async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
1130 let retention = LATEST_CHECKPOINT - 1;
1131 let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
1132 assert_eq!(indexer.next_checkpoint, 2);
1133 }
1134
1135 #[tokio::test]
1137 async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
1138 let retention = LATEST_CHECKPOINT + 1;
1139 let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1140 assert_eq!(indexer.next_checkpoint, 0);
1141 }
1142
1143 #[tokio::test]
1145 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1146 let registry = Registry::new();
1147 let store = FallibleMockStore::default();
1148
1149 test_pipeline!(A, "concurrent_a");
1150 test_pipeline!(B, "concurrent_b");
1151 test_pipeline!(C, "sequential_c");
1152 test_pipeline!(D, "sequential_d");
1153
1154 let mut conn = store.connect().await.unwrap();
1155 set_committer_watermark(&mut conn, A::NAME, 5).await;
1156 set_committer_watermark(&mut conn, B::NAME, 10).await;
1157 set_committer_watermark(&mut conn, C::NAME, 15).await;
1158 set_committer_watermark(&mut conn, D::NAME, 20).await;
1159
1160 let indexer_args = IndexerArgs {
1161 last_checkpoint: Some(29),
1162 ..Default::default()
1163 };
1164 let (mut indexer, _temp_dir) =
1165 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1166
1167 add_concurrent(&mut indexer, A).await;
1168 add_concurrent(&mut indexer, B).await;
1169 add_sequential(&mut indexer, C).await;
1170 add_sequential(&mut indexer, D).await;
1171
1172 let ingestion_metrics = indexer.ingestion_metrics().clone();
1173 let indexer_metrics = indexer.indexer_metrics().clone();
1174
1175 indexer.run().await.unwrap().join().await.unwrap();
1176
1177 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1178 assert_out_of_order!(indexer_metrics, A::NAME, 0);
1179 assert_out_of_order!(indexer_metrics, B::NAME, 5);
1180 assert_out_of_order!(indexer_metrics, C::NAME, 10);
1181 assert_out_of_order!(indexer_metrics, D::NAME, 15);
1182 }
1183
1184 #[tokio::test]
1186 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1187 let registry = Registry::new();
1188 let store = FallibleMockStore::default();
1189
1190 test_pipeline!(A, "concurrent_a");
1191 test_pipeline!(B, "concurrent_b");
1192 test_pipeline!(C, "sequential_c");
1193 test_pipeline!(D, "sequential_d");
1194
1195 let mut conn = store.connect().await.unwrap();
1196 set_committer_watermark(&mut conn, A::NAME, 5).await;
1197 set_committer_watermark(&mut conn, B::NAME, 10).await;
1198 set_committer_watermark(&mut conn, C::NAME, 15).await;
1199 set_committer_watermark(&mut conn, D::NAME, 20).await;
1200
1201 let indexer_args = IndexerArgs {
1202 first_checkpoint: Some(3),
1203 last_checkpoint: Some(29),
1204 ..Default::default()
1205 };
1206 let (mut indexer, _temp_dir) =
1207 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1208
1209 add_concurrent(&mut indexer, A).await;
1210 add_concurrent(&mut indexer, B).await;
1211 add_sequential(&mut indexer, C).await;
1212 add_sequential(&mut indexer, D).await;
1213
1214 let ingestion_metrics = indexer.ingestion_metrics().clone();
1215 let metrics = indexer.indexer_metrics().clone();
1216 indexer.run().await.unwrap().join().await.unwrap();
1217
1218 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1219 assert_out_of_order!(metrics, A::NAME, 0);
1220 assert_out_of_order!(metrics, B::NAME, 5);
1221 assert_out_of_order!(metrics, C::NAME, 10);
1222 assert_out_of_order!(metrics, D::NAME, 15);
1223 }
1224
1225 #[tokio::test]
1227 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1228 let registry = Registry::new();
1229 let store = FallibleMockStore::default();
1230
1231 test_pipeline!(A, "concurrent_a");
1232 test_pipeline!(B, "concurrent_b");
1233 test_pipeline!(C, "sequential_c");
1234 test_pipeline!(D, "sequential_d");
1235
1236 let mut conn = store.connect().await.unwrap();
1237 set_committer_watermark(&mut conn, B::NAME, 10).await;
1238 set_committer_watermark(&mut conn, C::NAME, 15).await;
1239 set_committer_watermark(&mut conn, D::NAME, 20).await;
1240
1241 let indexer_args = IndexerArgs {
1242 last_checkpoint: Some(29),
1243 ..Default::default()
1244 };
1245 let (mut indexer, _temp_dir) =
1246 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1247
1248 add_concurrent(&mut indexer, A).await;
1249 add_concurrent(&mut indexer, B).await;
1250 add_sequential(&mut indexer, C).await;
1251 add_sequential(&mut indexer, D).await;
1252
1253 let ingestion_metrics = indexer.ingestion_metrics().clone();
1254 let metrics = indexer.indexer_metrics().clone();
1255 indexer.run().await.unwrap().join().await.unwrap();
1256
1257 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1258 assert_out_of_order!(metrics, A::NAME, 0);
1259 assert_out_of_order!(metrics, B::NAME, 11);
1260 assert_out_of_order!(metrics, C::NAME, 16);
1261 assert_out_of_order!(metrics, D::NAME, 21);
1262 }
1263
1264 #[tokio::test]
1266 async fn test_indexer_ingestion_use_first_checkpoint() {
1267 let registry = Registry::new();
1268 let store = FallibleMockStore::default();
1269
1270 test_pipeline!(A, "concurrent_a");
1271 test_pipeline!(B, "concurrent_b");
1272 test_pipeline!(C, "sequential_c");
1273 test_pipeline!(D, "sequential_d");
1274
1275 let mut conn = store.connect().await.unwrap();
1276 set_committer_watermark(&mut conn, B::NAME, 10).await;
1277 set_committer_watermark(&mut conn, C::NAME, 15).await;
1278 set_committer_watermark(&mut conn, D::NAME, 20).await;
1279
1280 let indexer_args = IndexerArgs {
1281 first_checkpoint: Some(10),
1282 last_checkpoint: Some(29),
1283 ..Default::default()
1284 };
1285 let (mut indexer, _temp_dir) =
1286 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1287
1288 add_concurrent(&mut indexer, A).await;
1289 add_concurrent(&mut indexer, B).await;
1290 add_sequential(&mut indexer, C).await;
1291 add_sequential(&mut indexer, D).await;
1292
1293 let ingestion_metrics = indexer.ingestion_metrics().clone();
1294 let metrics = indexer.indexer_metrics().clone();
1295 indexer.run().await.unwrap().join().await.unwrap();
1296
1297 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1298 assert_out_of_order!(metrics, A::NAME, 0);
1299 assert_out_of_order!(metrics, B::NAME, 1);
1300 assert_out_of_order!(metrics, C::NAME, 6);
1301 assert_out_of_order!(metrics, D::NAME, 11);
1302 }
1303
1304 #[tokio::test]
1305 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1306 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1307 assert_eq!(committer_watermark, None);
1308 assert_eq!(pruner_watermark, None);
1309 }
1310
1311 #[tokio::test]
1312 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1313 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1314 assert_eq!(committer_watermark, None);
1315 assert_eq!(pruner_watermark, None);
1316 }
1317
1318 #[tokio::test]
1319 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1320 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1321
1322 let committer_watermark = committer_watermark.unwrap();
1323 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1324
1325 let pruner_watermark = pruner_watermark.unwrap();
1326 assert_eq!(pruner_watermark.reader_lo, 1);
1327 assert_eq!(pruner_watermark.pruner_hi, 1);
1328 }
1329
1330 #[tokio::test]
1331 async fn test_init_watermark_sequential() {
1332 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1333
1334 let committer_watermark = committer_watermark.unwrap();
1335 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1336
1337 let pruner_watermark = pruner_watermark.unwrap();
1338 assert_eq!(pruner_watermark.reader_lo, 1);
1339 assert_eq!(pruner_watermark.pruner_hi, 1);
1340 }
1341
1342 #[tokio::test]
1343 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1344 let registry = Registry::new();
1345 let store = FallibleMockStore::default();
1346
1347 let mut conn = store.connect().await.unwrap();
1348 set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1349 set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1350
1351 let indexer_args = IndexerArgs {
1352 first_checkpoint: None,
1353 last_checkpoint: Some(19),
1354 pipeline: vec![],
1355 ..Default::default()
1356 };
1357 let (mut indexer, _temp_dir) =
1358 create_test_indexer(store.clone(), indexer_args, ®istry, Some((20, 2))).await;
1359
1360 add_sequential(&mut indexer, MockHandler).await;
1362
1363 assert_eq!(
1365 indexer.next_sequential_checkpoint(),
1366 Some(11),
1367 "next_sequential_checkpoint should be 11"
1368 );
1369
1370 add_sequential(&mut indexer, SequentialHandler).await;
1372
1373 assert_eq!(
1375 indexer.next_sequential_checkpoint(),
1376 Some(6),
1377 "next_sequential_checkpoint should still be 6"
1378 );
1379
1380 indexer.run().await.unwrap().join().await.unwrap();
1382
1383 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1385 let watermark2 = conn
1386 .committer_watermark(SequentialHandler::NAME)
1387 .await
1388 .unwrap();
1389
1390 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1391 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1392 }
1393
1394 #[tokio::test]
1398 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1399 let registry = Registry::new();
1400 let store = FallibleMockStore::default();
1401
1402 let mut conn = store.connect().await.unwrap();
1405 set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1406 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1407 .await
1408 .unwrap();
1409
1410 let indexer_args = IndexerArgs {
1413 first_checkpoint: Some(0),
1414 last_checkpoint: Some(15),
1415 task: TaskArgs::tasked("task".to_string(), 10),
1416 ..Default::default()
1417 };
1418 let (mut tasked_indexer, _temp_dir) =
1419 create_test_indexer(store.clone(), indexer_args, ®istry, Some((16, 2))).await;
1420
1421 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1422
1423 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1424 let metrics = tasked_indexer.indexer_metrics().clone();
1425
1426 tasked_indexer.run().await.unwrap().join().await.unwrap();
1427
1428 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1429 assert_eq!(
1430 metrics
1431 .total_collector_skipped_checkpoints
1432 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1433 .unwrap()
1434 .get(),
1435 7
1436 );
1437 let data = store
1438 .data
1439 .get(MockCheckpointSequenceNumberHandler::NAME)
1440 .unwrap();
1441 assert_eq!(data.len(), 9);
1442 for i in 0..7 {
1443 assert!(data.get(&i).is_none());
1444 }
1445 for i in 7..16 {
1446 assert!(data.get(&i).is_some());
1447 }
1448 }
1449
1450 #[tokio::test]
1452 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1453 let registry = Registry::new();
1454 let store = FallibleMockStore::default();
1455
1456 let mut conn = store.connect().await.unwrap();
1457 set_committer_watermark(&mut conn, "test", 10).await;
1458 conn.set_reader_watermark("test", 5).await.unwrap();
1459
1460 let indexer_args = IndexerArgs {
1463 first_checkpoint: Some(9),
1464 last_checkpoint: Some(25),
1465 task: TaskArgs::tasked("task".to_string(), 10),
1466 ..Default::default()
1467 };
1468 let (mut tasked_indexer, _temp_dir) =
1469 create_test_indexer(store.clone(), indexer_args, ®istry, Some((26, 2))).await;
1470
1471 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1472
1473 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1474 let metrics = tasked_indexer.indexer_metrics().clone();
1475
1476 tasked_indexer.run().await.unwrap().join().await.unwrap();
1477
1478 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1479 assert_out_of_order!(metrics, "test", 0);
1480 assert_eq!(
1481 metrics
1482 .total_collector_skipped_checkpoints
1483 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1484 .unwrap()
1485 .get(),
1486 0
1487 );
1488
1489 let data = store.data.get("test").unwrap();
1490 assert_eq!(data.len(), 17);
1491 for i in 0..9 {
1492 assert!(data.get(&i).is_none());
1493 }
1494 for i in 9..26 {
1495 assert!(data.get(&i).is_some());
1496 }
1497 let main_pipeline_watermark = store.watermark("test").unwrap();
1498 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1500 assert_eq!(main_pipeline_watermark.reader_lo, 5);
1501 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1502 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1503 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1504 }
1505
1506 #[tokio::test]
1509 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
1510 let registry = Registry::new();
1511 let store = FallibleMockStore::default();
1512 let mut conn = store.connect().await.unwrap();
1513 set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;
1515
1516 let indexer_args = IndexerArgs {
1518 first_checkpoint: Some(0),
1519 last_checkpoint: Some(500),
1520 task: TaskArgs::tasked("task".to_string(), 10 ),
1521 ..Default::default()
1522 };
1523 let (mut tasked_indexer, _temp_dir) =
1524 create_test_indexer(store.clone(), indexer_args, ®istry, Some((501, 2))).await;
1525 let mut allow_process = 10;
1526 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
1528 let _ = tasked_indexer
1529 .concurrent_pipeline(
1530 controllable_handler,
1531 ConcurrentConfig {
1532 committer: CommitterConfig {
1533 collect_interval_ms: 10,
1534 watermark_interval_ms: 10,
1535 ..Default::default()
1536 },
1537 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
1540 ..Default::default()
1541 },
1542 )
1543 .await;
1544 let metrics = tasked_indexer.indexer_metrics().clone();
1545
1546 let mut s_indexer = tasked_indexer.run().await.unwrap();
1547
1548 store
1552 .wait_for_watermark(
1553 &pipeline_task::<FallibleMockStore>(ControllableHandler::NAME, Some("task"))
1554 .unwrap(),
1555 10,
1556 Duration::from_secs(10),
1557 )
1558 .await;
1559
1560 conn.set_reader_watermark(ControllableHandler::NAME, 250)
1565 .await
1566 .unwrap();
1567
1568 let reader_lo = metrics
1569 .collector_reader_lo
1570 .with_label_values(&[ControllableHandler::NAME]);
1571
1572 let mut interval = tokio::time::interval(Duration::from_millis(10));
1576 while reader_lo.get() != 250 {
1577 interval.tick().await;
1578 allow_process += 1;
1580 assert!(
1581 allow_process <= 500,
1582 "Released all checkpoints but collector never observed new reader_lo"
1583 );
1584 process_below.send(allow_process).ok();
1585 }
1586
1587 process_below.send(500).ok();
1594
1595 s_indexer.join().await.unwrap();
1596
1597 let data = store.data.get(ControllableHandler::NAME).unwrap();
1598
1599 for chkpt in (allow_process + 1)..250 {
1601 assert!(
1602 data.get(&chkpt).is_none(),
1603 "Checkpoint {chkpt} should have been skipped"
1604 );
1605 }
1606
1607 for chkpt in 250..=500 {
1609 assert!(
1610 data.get(&chkpt).is_some(),
1611 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
1612 );
1613 }
1614
1615 for chkpt in 0..=10 {
1617 assert!(
1618 data.get(&chkpt).is_some(),
1619 "Checkpoint {chkpt} should have been committed (baseline)"
1620 );
1621 }
1622 }
1623}