1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
51#[derive(clap::Args, Default, Debug, Clone)]
53pub struct IndexerArgs {
54 #[arg(long)]
61 pub first_checkpoint: Option<u64>,
62
63 #[arg(long)]
66 pub last_checkpoint: Option<u64>,
67
68 #[arg(long, action = clap::ArgAction::Append)]
71 pub pipeline: Vec<String>,
72
73 #[clap(flatten)]
75 pub task: TaskArgs,
76}
77
78#[derive(clap::Parser, Default, Debug, Clone)]
80pub struct TaskArgs {
81 #[arg(long, requires = "reader_interval_ms")]
93 task: Option<String>,
94
95 #[arg(long, requires = "task")]
106 reader_interval_ms: Option<u64>,
107}
108
109pub struct Indexer<S: Store> {
110 store: S,
114
115 metrics: Arc<IndexerMetrics>,
117
118 ingestion_service: IngestionService,
120
121 first_checkpoint: Option<u64>,
124
125 last_checkpoint: Option<u64>,
128
129 latest_checkpoint: u64,
131
132 next_checkpoint: u64,
135
136 next_sequential_checkpoint: Option<u64>,
139
140 task: Option<Task>,
152
153 enabled_pipelines: Option<BTreeSet<String>>,
157
158 added_pipelines: BTreeSet<&'static str>,
161
162 pipelines: Vec<Service>,
164}
165
166#[derive(Clone)]
168pub(crate) struct Task {
169 task: String,
172 reader_interval: Duration,
175}
176
177impl TaskArgs {
178 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
179 Self {
180 task: Some(task),
181 reader_interval_ms: Some(reader_interval_ms),
182 }
183 }
184
185 fn into_task(self) -> Option<Task> {
186 Some(Task {
187 task: self.task?,
188 reader_interval: Duration::from_millis(self.reader_interval_ms?),
189 })
190 }
191}
192
193impl<S: Store> Indexer<S> {
194 pub async fn new(
206 store: S,
207 indexer_args: IndexerArgs,
208 client_args: ClientArgs,
209 ingestion_config: IngestionConfig,
210 metrics_prefix: Option<&str>,
211 registry: &Registry,
212 ) -> anyhow::Result<Self> {
213 let IndexerArgs {
214 first_checkpoint,
215 last_checkpoint,
216 pipeline,
217 task,
218 } = indexer_args;
219
220 let metrics = IndexerMetrics::new(metrics_prefix, registry);
221
222 let ingestion_service =
223 IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
224
225 let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;
226
227 info!(latest_checkpoint);
228
229 Ok(Self {
230 store,
231 metrics,
232 ingestion_service,
233 first_checkpoint,
234 last_checkpoint,
235 latest_checkpoint,
236 next_checkpoint: u64::MAX,
237 next_sequential_checkpoint: None,
238 task: task.into_task(),
239 enabled_pipelines: if pipeline.is_empty() {
240 None
241 } else {
242 Some(pipeline.into_iter().collect())
243 },
244 added_pipelines: BTreeSet::new(),
245 pipelines: vec![],
246 })
247 }
248
249 pub fn store(&self) -> &S {
251 &self.store
252 }
253
254 pub fn ingestion_client(&self) -> &IngestionClient {
256 self.ingestion_service.ingestion_client()
257 }
258
259 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
261 &self.metrics
262 }
263
264 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
266 self.ingestion_service.metrics()
267 }
268
269 pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
271 self.added_pipelines.iter().copied().filter(|p| {
272 self.enabled_pipelines
273 .as_ref()
274 .is_none_or(|e| e.contains(*p))
275 })
276 }
277
278 pub fn next_sequential_checkpoint(&self) -> Option<u64> {
282 self.next_sequential_checkpoint
283 }
284
285 pub async fn run(self) -> anyhow::Result<Service> {
291 if let Some(enabled_pipelines) = self.enabled_pipelines {
292 ensure!(
293 enabled_pipelines.is_empty(),
294 "Tried to enable pipelines that this indexer does not know about: \
295 {enabled_pipelines:#?}",
296 );
297 }
298
299 let start = self.next_checkpoint;
300 let end = self.last_checkpoint;
301 info!(start, end, "Ingestion range");
302
303 let mut service = self
304 .ingestion_service
305 .run(
306 (
307 Bound::Included(start),
308 end.map_or(Bound::Unbounded, Bound::Included),
309 ),
310 self.next_sequential_checkpoint,
311 )
312 .await
313 .context("Failed to start ingestion service")?;
314
315 for pipeline in self.pipelines {
316 service = service.merge(pipeline);
317 }
318
319 Ok(service)
320 }
321
322 async fn add_pipeline<P: Processor + 'static>(
331 &mut self,
332 pipeline_task: String,
333 retention: Option<u64>,
334 ) -> anyhow::Result<Option<u64>> {
335 ensure!(
336 self.added_pipelines.insert(P::NAME),
337 "Pipeline {:?} already added",
338 P::NAME,
339 );
340
341 if let Some(enabled_pipelines) = &mut self.enabled_pipelines
342 && !enabled_pipelines.remove(P::NAME)
343 {
344 info!(pipeline = P::NAME, "Skipping");
345 return Ok(None);
346 }
347
348 let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
351 first_checkpoint
352 } else if let Some(retention) = retention {
353 self.latest_checkpoint.saturating_sub(retention)
354 } else {
355 0
356 };
357 let mut conn = self.store.connect().await?;
358 let init_watermark = conn
359 .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
360 .await
361 .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
362
363 let next_checkpoint = if let Some(init_watermark) = init_watermark {
364 if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
365 checkpoint_hi_inclusive + 1
366 } else {
367 0
368 }
369 } else {
370 proposed_next_checkpoint
371 };
372
373 self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);
374
375 Ok(Some(next_checkpoint))
376 }
377}
378
379impl<S: ConcurrentStore> Indexer<S> {
380 pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
387 &mut self,
388 handler: H,
389 config: ConcurrentConfig,
390 ) -> anyhow::Result<()> {
391 let pipeline_task =
392 pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
393 let retention = config.pruner.as_ref().map(|p| p.retention);
394 let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
395 return Ok(());
396 };
397
398 self.pipelines.push(concurrent::pipeline::<H>(
399 handler,
400 next_checkpoint,
401 config,
402 self.store.clone(),
403 self.task.clone(),
404 self.ingestion_service.subscribe().0,
405 self.metrics.clone(),
406 ));
407
408 Ok(())
409 }
410}
411
412impl<T: SequentialStore> Indexer<T> {
413 pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
424 &mut self,
425 handler: H,
426 config: SequentialConfig,
427 ) -> anyhow::Result<()> {
428 if self.task.is_some() {
429 bail!(
430 "Sequential pipelines do not support pipeline tasks. \
431 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
432 Running the same pipeline under a different task would violate these guarantees."
433 );
434 }
435
436 let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
437 return Ok(());
438 };
439
440 self.next_sequential_checkpoint = Some(
442 self.next_sequential_checkpoint
443 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
444 );
445
446 let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
447
448 self.pipelines.push(sequential::pipeline::<H>(
449 handler,
450 next_checkpoint,
451 config,
452 self.store.clone(),
453 checkpoint_rx,
454 commit_hi_tx,
455 self.metrics.clone(),
456 ));
457
458 Ok(())
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use std::sync::Arc;
465
466 use async_trait::async_trait;
467 use clap::Parser;
468 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
469 use sui_synthetic_ingestion::synthetic_ingestion;
470 use tokio::sync::watch;
471
472 use crate::FieldCount;
473 use crate::config::ConcurrencyConfig;
474 use crate::ingestion::ingestion_client::IngestionClientArgs;
475 use crate::ingestion::store_client::ObjectStoreWatermark;
476 use crate::ingestion::store_client::WATERMARK_PATH;
477 use crate::mocks::store::MockStore;
478 use crate::pipeline::CommitterConfig;
479 use crate::pipeline::Processor;
480 use crate::pipeline::concurrent::ConcurrentConfig;
481 use crate::store::CommitterWatermark;
482 use crate::store::ConcurrentConnection as _;
483 use crate::store::Connection as _;
484
485 use super::*;
486
487 #[allow(dead_code)]
488 #[derive(Clone, FieldCount)]
489 struct MockValue(u64);
490
491 struct ControllableHandler {
493 process_below: watch::Receiver<u64>,
495 }
496
497 impl ControllableHandler {
498 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
499 let (tx, rx) = watch::channel(limit);
500 (Self { process_below: rx }, tx)
501 }
502 }
503
504 #[async_trait]
505 impl Processor for ControllableHandler {
506 const NAME: &'static str = "controllable";
507 type Value = MockValue;
508
509 async fn process(
510 &self,
511 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
512 ) -> anyhow::Result<Vec<Self::Value>> {
513 let cp_num = checkpoint.summary.sequence_number;
514
515 self.process_below
517 .clone()
518 .wait_for(|&limit| cp_num <= limit)
519 .await
520 .ok();
521
522 Ok(vec![MockValue(cp_num)])
523 }
524 }
525
526 #[async_trait]
527 impl concurrent::Handler for ControllableHandler {
528 type Store = MockStore;
529 type Batch = Vec<MockValue>;
530
531 fn batch(
532 &self,
533 batch: &mut Self::Batch,
534 values: &mut std::vec::IntoIter<Self::Value>,
535 ) -> concurrent::BatchStatus {
536 batch.extend(values);
537 concurrent::BatchStatus::Ready
538 }
539
540 async fn commit<'a>(
541 &self,
542 batch: &Self::Batch,
543 conn: &mut <Self::Store as Store>::Connection<'a>,
544 ) -> anyhow::Result<usize> {
545 for value in batch {
546 conn.0
547 .commit_data(Self::NAME, value.0, vec![value.0])
548 .await?;
549 }
550 Ok(batch.len())
551 }
552 }
553
554 macro_rules! test_pipeline {
555 ($handler:ident, $name:literal) => {
556 struct $handler;
557
558 #[async_trait]
559 impl Processor for $handler {
560 const NAME: &'static str = $name;
561 type Value = MockValue;
562 async fn process(
563 &self,
564 checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
565 ) -> anyhow::Result<Vec<Self::Value>> {
566 Ok(vec![MockValue(checkpoint.summary.sequence_number)])
567 }
568 }
569
570 #[async_trait]
571 impl crate::pipeline::concurrent::Handler for $handler {
572 type Store = MockStore;
573 type Batch = Vec<Self::Value>;
574
575 fn batch(
576 &self,
577 batch: &mut Self::Batch,
578 values: &mut std::vec::IntoIter<Self::Value>,
579 ) -> crate::pipeline::concurrent::BatchStatus {
580 batch.extend(values);
581 crate::pipeline::concurrent::BatchStatus::Pending
582 }
583
584 async fn commit<'a>(
585 &self,
586 batch: &Self::Batch,
587 conn: &mut <Self::Store as Store>::Connection<'a>,
588 ) -> anyhow::Result<usize> {
589 for value in batch {
590 conn.0
591 .commit_data(Self::NAME, value.0, vec![value.0])
592 .await?;
593 }
594 Ok(batch.len())
595 }
596 }
597
598 #[async_trait]
599 impl crate::pipeline::sequential::Handler for $handler {
600 type Store = MockStore;
601 type Batch = Vec<Self::Value>;
602
603 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
604 batch.extend(values);
605 }
606
607 async fn commit<'a>(
608 &self,
609 _batch: &Self::Batch,
610 _conn: &mut <Self::Store as Store>::Connection<'a>,
611 ) -> anyhow::Result<usize> {
612 Ok(1)
613 }
614 }
615 };
616 }
617
618 test_pipeline!(MockHandler, "test_processor");
619 test_pipeline!(SequentialHandler, "sequential_handler");
620 test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
621
622 fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
623 let dir = tempfile::tempdir().unwrap();
624 if let Some(cp) = latest_checkpoint {
625 let watermark_path = dir.path().join(WATERMARK_PATH);
626 std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
627 let watermark = ObjectStoreWatermark {
628 checkpoint_hi_inclusive: cp,
629 };
630 std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
631 }
632 dir
633 }
634
635 async fn create_test_indexer(
638 store: MockStore,
639 indexer_args: IndexerArgs,
640 registry: &Registry,
641 ingestion_data: Option<(u64, u64)>,
642 ) -> (Indexer<MockStore>, tempfile::TempDir) {
643 let temp_dir = init_ingestion_dir(None);
644 if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
645 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
646 ingestion_dir: temp_dir.path().to_owned(),
647 starting_checkpoint: 0,
648 num_checkpoints,
649 checkpoint_size,
650 })
651 .await;
652 }
653 let client_args = ClientArgs {
654 ingestion: IngestionClientArgs {
655 local_ingestion_path: Some(temp_dir.path().to_owned()),
656 ..Default::default()
657 },
658 ..Default::default()
659 };
660 let indexer = Indexer::new(
661 store,
662 indexer_args,
663 client_args,
664 IngestionConfig::default(),
665 None,
666 registry,
667 )
668 .await
669 .unwrap();
670 (indexer, temp_dir)
671 }
672
673 async fn set_committer_watermark(
674 conn: &mut <MockStore as Store>::Connection<'_>,
675 name: &str,
676 hi: u64,
677 ) {
678 conn.set_committer_watermark(
679 name,
680 CommitterWatermark {
681 checkpoint_hi_inclusive: hi,
682 ..Default::default()
683 },
684 )
685 .await
686 .unwrap();
687 }
688
689 async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
690 indexer: &mut Indexer<MockStore>,
691 handler: H,
692 ) {
693 indexer
694 .concurrent_pipeline(handler, ConcurrentConfig::default())
695 .await
696 .unwrap();
697 }
698
699 async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
700 indexer: &mut Indexer<MockStore>,
701 handler: H,
702 ) {
703 indexer
704 .sequential_pipeline(handler, SequentialConfig::default())
705 .await
706 .unwrap();
707 }
708
709 macro_rules! assert_out_of_order {
710 ($metrics:expr, $pipeline:expr, $expected:expr) => {
711 assert_eq!(
712 $metrics
713 .total_watermarks_out_of_order
714 .get_metric_with_label_values(&[$pipeline])
715 .unwrap()
716 .get(),
717 $expected,
718 );
719 };
720 }
721
722 async fn test_init_watermark(
723 first_checkpoint: Option<u64>,
724 is_concurrent: bool,
725 ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
726 let registry = Registry::new();
727 let store = MockStore::default();
728
729 test_pipeline!(A, "pipeline_name");
730
731 let mut conn = store.connect().await.unwrap();
732
733 let indexer_args = IndexerArgs {
734 first_checkpoint,
735 ..IndexerArgs::default()
736 };
737 let (mut indexer, _temp_dir) =
738 create_test_indexer(store.clone(), indexer_args, ®istry, None).await;
739
740 if is_concurrent {
741 add_concurrent(&mut indexer, A).await;
742 } else {
743 add_sequential(&mut indexer, A).await;
744 }
745
746 (
747 conn.committer_watermark(A::NAME).await.unwrap(),
748 conn.pruner_watermark(A::NAME, Duration::ZERO)
749 .await
750 .unwrap(),
751 )
752 }
753
754 const LATEST_CHECKPOINT: u64 = 10;
755
756 async fn test_next_checkpoint(
761 watermark: Option<u64>,
762 first_checkpoint: Option<u64>,
763 concurrent_config: ConcurrentConfig,
764 ) -> Indexer<MockStore> {
765 let registry = Registry::new();
766 let store = MockStore::default();
767
768 test_pipeline!(A, "concurrent_a");
769
770 if let Some(checkpoint_hi_inclusive) = watermark {
771 let mut conn = store.connect().await.unwrap();
772 conn.set_committer_watermark(
773 A::NAME,
774 CommitterWatermark {
775 checkpoint_hi_inclusive,
776 ..Default::default()
777 },
778 )
779 .await
780 .unwrap();
781 }
782
783 let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
784 let mut indexer = Indexer::new(
785 store,
786 IndexerArgs {
787 first_checkpoint,
788 ..Default::default()
789 },
790 ClientArgs {
791 ingestion: IngestionClientArgs {
792 local_ingestion_path: Some(temp_dir.path().to_owned()),
793 ..Default::default()
794 },
795 ..Default::default()
796 },
797 IngestionConfig::default(),
798 None,
799 ®istry,
800 )
801 .await
802 .unwrap();
803
804 assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);
805
806 indexer
807 .concurrent_pipeline::<A>(A, concurrent_config)
808 .await
809 .unwrap();
810
811 indexer
812 }
813
814 fn pruner_config(retention: u64) -> ConcurrentConfig {
815 ConcurrentConfig {
816 pruner: Some(concurrent::PrunerConfig {
817 retention,
818 ..Default::default()
819 }),
820 ..Default::default()
821 }
822 }
823
824 #[test]
825 fn test_arg_parsing() {
826 #[derive(Parser)]
827 struct Args {
828 #[clap(flatten)]
829 indexer: IndexerArgs,
830 }
831
832 let args = Args::try_parse_from([
833 "cmd",
834 "--first-checkpoint",
835 "10",
836 "--last-checkpoint",
837 "100",
838 "--pipeline",
839 "a",
840 "--pipeline",
841 "b",
842 "--task",
843 "t",
844 "--reader-interval-ms",
845 "5000",
846 ])
847 .unwrap();
848
849 assert_eq!(args.indexer.first_checkpoint, Some(10));
850 assert_eq!(args.indexer.last_checkpoint, Some(100));
851 assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
852 assert_eq!(args.indexer.task.task, Some("t".to_owned()));
853 assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
854 }
855
856 #[tokio::test]
858 async fn test_next_checkpoint_all_pipelines_have_watermarks() {
859 let registry = Registry::new();
860 let store = MockStore::default();
861
862 test_pipeline!(A, "concurrent_a");
863 test_pipeline!(B, "concurrent_b");
864 test_pipeline!(C, "sequential_c");
865 test_pipeline!(D, "sequential_d");
866
867 let mut conn = store.connect().await.unwrap();
868
869 conn.init_watermark(A::NAME, Some(0)).await.unwrap();
870 set_committer_watermark(&mut conn, A::NAME, 100).await;
871
872 conn.init_watermark(B::NAME, Some(0)).await.unwrap();
873 set_committer_watermark(&mut conn, B::NAME, 10).await;
874
875 conn.init_watermark(C::NAME, Some(0)).await.unwrap();
876 set_committer_watermark(&mut conn, C::NAME, 1).await;
877
878 conn.init_watermark(D::NAME, Some(0)).await.unwrap();
879 set_committer_watermark(&mut conn, D::NAME, 50).await;
880
881 let (mut indexer, _temp_dir) =
882 create_test_indexer(store, IndexerArgs::default(), ®istry, None).await;
883
884 add_concurrent(&mut indexer, A).await;
885 add_concurrent(&mut indexer, B).await;
886 add_sequential(&mut indexer, C).await;
887 add_sequential(&mut indexer, D).await;
888
889 assert_eq!(indexer.first_checkpoint, None);
890 assert_eq!(indexer.last_checkpoint, None);
891 assert_eq!(indexer.latest_checkpoint, 0);
892 assert_eq!(indexer.next_checkpoint, 2);
893 assert_eq!(indexer.next_sequential_checkpoint, Some(2));
894 }
895
896 #[tokio::test]
898 async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
899 let registry = Registry::new();
900 let store = MockStore::default();
901
902 test_pipeline!(A, "concurrent_a");
903 test_pipeline!(B, "concurrent_b");
904 test_pipeline!(C, "sequential_c");
905 test_pipeline!(D, "sequential_d");
906
907 let mut conn = store.connect().await.unwrap();
908 set_committer_watermark(&mut conn, B::NAME, 10).await;
909 set_committer_watermark(&mut conn, C::NAME, 1).await;
910
911 let (mut indexer, _temp_dir) =
912 create_test_indexer(store, IndexerArgs::default(), ®istry, None).await;
913
914 add_concurrent(&mut indexer, A).await;
915 add_concurrent(&mut indexer, B).await;
916 add_sequential(&mut indexer, C).await;
917 add_sequential(&mut indexer, D).await;
918
919 assert_eq!(indexer.first_checkpoint, None);
920 assert_eq!(indexer.last_checkpoint, None);
921 assert_eq!(indexer.latest_checkpoint, 0);
922 assert_eq!(indexer.next_checkpoint, 0);
923 assert_eq!(indexer.next_sequential_checkpoint, Some(0));
924 }
925
926 #[tokio::test]
928 async fn test_next_checkpoint_smallest_is_0() {
929 let registry = Registry::new();
930 let store = MockStore::default();
931
932 test_pipeline!(A, "concurrent_a");
933 test_pipeline!(B, "concurrent_b");
934 test_pipeline!(C, "sequential_c");
935 test_pipeline!(D, "sequential_d");
936
937 let mut conn = store.connect().await.unwrap();
938 set_committer_watermark(&mut conn, A::NAME, 100).await;
939 set_committer_watermark(&mut conn, B::NAME, 10).await;
940 set_committer_watermark(&mut conn, C::NAME, 1).await;
941 set_committer_watermark(&mut conn, D::NAME, 0).await;
942
943 let (mut indexer, _temp_dir) =
944 create_test_indexer(store, IndexerArgs::default(), ®istry, None).await;
945
946 add_concurrent(&mut indexer, A).await;
947 add_concurrent(&mut indexer, B).await;
948 add_sequential(&mut indexer, C).await;
949 add_sequential(&mut indexer, D).await;
950
951 assert_eq!(indexer.next_checkpoint, 1);
952 }
953
954 #[tokio::test]
957 async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
958 let registry = Registry::new();
959 let store = MockStore::default();
960
961 test_pipeline!(A, "concurrent_a");
962 test_pipeline!(B, "concurrent_b");
963 test_pipeline!(C, "sequential_c");
964 test_pipeline!(D, "sequential_d");
965
966 let mut conn = store.connect().await.unwrap();
967 set_committer_watermark(&mut conn, B::NAME, 50).await;
968 set_committer_watermark(&mut conn, C::NAME, 10).await;
969
970 let indexer_args = IndexerArgs {
971 first_checkpoint: Some(5),
972 ..Default::default()
973 };
974 let (mut indexer, _temp_dir) =
975 create_test_indexer(store, indexer_args, ®istry, None).await;
976
977 add_concurrent(&mut indexer, A).await;
978 add_concurrent(&mut indexer, B).await;
979 add_sequential(&mut indexer, C).await;
980 add_sequential(&mut indexer, D).await;
981
982 assert_eq!(indexer.first_checkpoint, Some(5));
983 assert_eq!(indexer.last_checkpoint, None);
984 assert_eq!(indexer.latest_checkpoint, 0);
985 assert_eq!(indexer.next_checkpoint, 5);
986 assert_eq!(indexer.next_sequential_checkpoint, Some(5));
987 }
988
989 #[tokio::test]
992 async fn test_next_checkpoint_ignore_first_checkpoint() {
993 let registry = Registry::new();
994 let store = MockStore::default();
995
996 test_pipeline!(B, "concurrent_b");
997 test_pipeline!(C, "sequential_c");
998
999 let mut conn = store.connect().await.unwrap();
1000 set_committer_watermark(&mut conn, B::NAME, 50).await;
1001 set_committer_watermark(&mut conn, C::NAME, 10).await;
1002
1003 let indexer_args = IndexerArgs {
1004 first_checkpoint: Some(5),
1005 ..Default::default()
1006 };
1007 let (mut indexer, _temp_dir) =
1008 create_test_indexer(store, indexer_args, ®istry, None).await;
1009
1010 add_concurrent(&mut indexer, B).await;
1011 add_sequential(&mut indexer, C).await;
1012
1013 assert_eq!(indexer.first_checkpoint, Some(5));
1014 assert_eq!(indexer.last_checkpoint, None);
1015 assert_eq!(indexer.latest_checkpoint, 0);
1016 assert_eq!(indexer.next_checkpoint, 11);
1017 assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1018 }
1019
1020 #[tokio::test]
1024 async fn test_next_checkpoint_large_first_checkpoint() {
1025 let registry = Registry::new();
1026 let store = MockStore::default();
1027
1028 test_pipeline!(A, "concurrent_a");
1029 test_pipeline!(B, "concurrent_b");
1030 test_pipeline!(C, "sequential_c");
1031
1032 let mut conn = store.connect().await.unwrap();
1033 set_committer_watermark(&mut conn, B::NAME, 50).await;
1034 set_committer_watermark(&mut conn, C::NAME, 10).await;
1035
1036 let indexer_args = IndexerArgs {
1037 first_checkpoint: Some(24),
1038 ..Default::default()
1039 };
1040 let (mut indexer, _temp_dir) =
1041 create_test_indexer(store, indexer_args, ®istry, None).await;
1042
1043 add_concurrent(&mut indexer, A).await;
1044 add_concurrent(&mut indexer, B).await;
1045 add_sequential(&mut indexer, C).await;
1046
1047 assert_eq!(indexer.first_checkpoint, Some(24));
1048 assert_eq!(indexer.last_checkpoint, None);
1049 assert_eq!(indexer.latest_checkpoint, 0);
1050 assert_eq!(indexer.next_checkpoint, 11);
1051 assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1052 }
1053
1054 #[tokio::test]
1056 async fn test_latest_checkpoint_from_watermark() {
1057 let registry = Registry::new();
1058 let store = MockStore::default();
1059 let temp_dir = init_ingestion_dir(Some(30));
1060 let indexer = Indexer::new(
1061 store,
1062 IndexerArgs::default(),
1063 ClientArgs {
1064 ingestion: IngestionClientArgs {
1065 local_ingestion_path: Some(temp_dir.path().to_owned()),
1066 ..Default::default()
1067 },
1068 ..Default::default()
1069 },
1070 IngestionConfig::default(),
1071 None,
1072 ®istry,
1073 )
1074 .await
1075 .unwrap();
1076
1077 assert_eq!(indexer.latest_checkpoint, 30);
1078 }
1079
1080 #[tokio::test]
1083 async fn test_next_checkpoint_with_pruner_uses_retention() {
1084 let retention = LATEST_CHECKPOINT - 1;
1085 let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1086 assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
1087 }
1088
1089 #[tokio::test]
1091 async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
1092 let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
1093 assert_eq!(indexer.next_checkpoint, 0);
1094 }
1095
1096 #[tokio::test]
1098 async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
1099 let retention = LATEST_CHECKPOINT - 1;
1100 let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
1101 assert_eq!(indexer.next_checkpoint, 6);
1102 }
1103
1104 #[tokio::test]
1107 async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
1108 let retention = LATEST_CHECKPOINT - 1;
1109 let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
1110 assert_eq!(indexer.next_checkpoint, 2);
1111 }
1112
1113 #[tokio::test]
1115 async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
1116 let retention = LATEST_CHECKPOINT + 1;
1117 let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1118 assert_eq!(indexer.next_checkpoint, 0);
1119 }
1120
1121 #[tokio::test]
1123 async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1124 let registry = Registry::new();
1125 let store = MockStore::default();
1126
1127 test_pipeline!(A, "concurrent_a");
1128 test_pipeline!(B, "concurrent_b");
1129 test_pipeline!(C, "sequential_c");
1130 test_pipeline!(D, "sequential_d");
1131
1132 let mut conn = store.connect().await.unwrap();
1133 set_committer_watermark(&mut conn, A::NAME, 5).await;
1134 set_committer_watermark(&mut conn, B::NAME, 10).await;
1135 set_committer_watermark(&mut conn, C::NAME, 15).await;
1136 set_committer_watermark(&mut conn, D::NAME, 20).await;
1137
1138 let indexer_args = IndexerArgs {
1139 last_checkpoint: Some(29),
1140 ..Default::default()
1141 };
1142 let (mut indexer, _temp_dir) =
1143 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1144
1145 add_concurrent(&mut indexer, A).await;
1146 add_concurrent(&mut indexer, B).await;
1147 add_sequential(&mut indexer, C).await;
1148 add_sequential(&mut indexer, D).await;
1149
1150 let ingestion_metrics = indexer.ingestion_metrics().clone();
1151 let indexer_metrics = indexer.indexer_metrics().clone();
1152
1153 indexer.run().await.unwrap().join().await.unwrap();
1154
1155 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1156 assert_out_of_order!(indexer_metrics, A::NAME, 0);
1157 assert_out_of_order!(indexer_metrics, B::NAME, 5);
1158 assert_out_of_order!(indexer_metrics, C::NAME, 10);
1159 assert_out_of_order!(indexer_metrics, D::NAME, 15);
1160 }
1161
1162 #[tokio::test]
1164 async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1165 let registry = Registry::new();
1166 let store = MockStore::default();
1167
1168 test_pipeline!(A, "concurrent_a");
1169 test_pipeline!(B, "concurrent_b");
1170 test_pipeline!(C, "sequential_c");
1171 test_pipeline!(D, "sequential_d");
1172
1173 let mut conn = store.connect().await.unwrap();
1174 set_committer_watermark(&mut conn, A::NAME, 5).await;
1175 set_committer_watermark(&mut conn, B::NAME, 10).await;
1176 set_committer_watermark(&mut conn, C::NAME, 15).await;
1177 set_committer_watermark(&mut conn, D::NAME, 20).await;
1178
1179 let indexer_args = IndexerArgs {
1180 first_checkpoint: Some(3),
1181 last_checkpoint: Some(29),
1182 ..Default::default()
1183 };
1184 let (mut indexer, _temp_dir) =
1185 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1186
1187 add_concurrent(&mut indexer, A).await;
1188 add_concurrent(&mut indexer, B).await;
1189 add_sequential(&mut indexer, C).await;
1190 add_sequential(&mut indexer, D).await;
1191
1192 let ingestion_metrics = indexer.ingestion_metrics().clone();
1193 let metrics = indexer.indexer_metrics().clone();
1194 indexer.run().await.unwrap().join().await.unwrap();
1195
1196 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1197 assert_out_of_order!(metrics, A::NAME, 0);
1198 assert_out_of_order!(metrics, B::NAME, 5);
1199 assert_out_of_order!(metrics, C::NAME, 10);
1200 assert_out_of_order!(metrics, D::NAME, 15);
1201 }
1202
1203 #[tokio::test]
1205 async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1206 let registry = Registry::new();
1207 let store = MockStore::default();
1208
1209 test_pipeline!(A, "concurrent_a");
1210 test_pipeline!(B, "concurrent_b");
1211 test_pipeline!(C, "sequential_c");
1212 test_pipeline!(D, "sequential_d");
1213
1214 let mut conn = store.connect().await.unwrap();
1215 set_committer_watermark(&mut conn, B::NAME, 10).await;
1216 set_committer_watermark(&mut conn, C::NAME, 15).await;
1217 set_committer_watermark(&mut conn, D::NAME, 20).await;
1218
1219 let indexer_args = IndexerArgs {
1220 last_checkpoint: Some(29),
1221 ..Default::default()
1222 };
1223 let (mut indexer, _temp_dir) =
1224 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1225
1226 add_concurrent(&mut indexer, A).await;
1227 add_concurrent(&mut indexer, B).await;
1228 add_sequential(&mut indexer, C).await;
1229 add_sequential(&mut indexer, D).await;
1230
1231 let ingestion_metrics = indexer.ingestion_metrics().clone();
1232 let metrics = indexer.indexer_metrics().clone();
1233 indexer.run().await.unwrap().join().await.unwrap();
1234
1235 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1236 assert_out_of_order!(metrics, A::NAME, 0);
1237 assert_out_of_order!(metrics, B::NAME, 11);
1238 assert_out_of_order!(metrics, C::NAME, 16);
1239 assert_out_of_order!(metrics, D::NAME, 21);
1240 }
1241
1242 #[tokio::test]
1244 async fn test_indexer_ingestion_use_first_checkpoint() {
1245 let registry = Registry::new();
1246 let store = MockStore::default();
1247
1248 test_pipeline!(A, "concurrent_a");
1249 test_pipeline!(B, "concurrent_b");
1250 test_pipeline!(C, "sequential_c");
1251 test_pipeline!(D, "sequential_d");
1252
1253 let mut conn = store.connect().await.unwrap();
1254 set_committer_watermark(&mut conn, B::NAME, 10).await;
1255 set_committer_watermark(&mut conn, C::NAME, 15).await;
1256 set_committer_watermark(&mut conn, D::NAME, 20).await;
1257
1258 let indexer_args = IndexerArgs {
1259 first_checkpoint: Some(10),
1260 last_checkpoint: Some(29),
1261 ..Default::default()
1262 };
1263 let (mut indexer, _temp_dir) =
1264 create_test_indexer(store.clone(), indexer_args, ®istry, Some((30, 1))).await;
1265
1266 add_concurrent(&mut indexer, A).await;
1267 add_concurrent(&mut indexer, B).await;
1268 add_sequential(&mut indexer, C).await;
1269 add_sequential(&mut indexer, D).await;
1270
1271 let ingestion_metrics = indexer.ingestion_metrics().clone();
1272 let metrics = indexer.indexer_metrics().clone();
1273 indexer.run().await.unwrap().join().await.unwrap();
1274
1275 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1276 assert_out_of_order!(metrics, A::NAME, 0);
1277 assert_out_of_order!(metrics, B::NAME, 1);
1278 assert_out_of_order!(metrics, C::NAME, 6);
1279 assert_out_of_order!(metrics, D::NAME, 11);
1280 }
1281
1282 #[tokio::test]
1283 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1284 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1285 assert_eq!(committer_watermark, None);
1286 assert_eq!(pruner_watermark, None);
1287 }
1288
1289 #[tokio::test]
1290 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1291 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1292 assert_eq!(committer_watermark, None);
1293 assert_eq!(pruner_watermark, None);
1294 }
1295
1296 #[tokio::test]
1297 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1298 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1299
1300 let committer_watermark = committer_watermark.unwrap();
1301 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1302
1303 let pruner_watermark = pruner_watermark.unwrap();
1304 assert_eq!(pruner_watermark.reader_lo, 1);
1305 assert_eq!(pruner_watermark.pruner_hi, 1);
1306 }
1307
1308 #[tokio::test]
1309 async fn test_init_watermark_sequential() {
1310 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1311
1312 let committer_watermark = committer_watermark.unwrap();
1313 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1314
1315 let pruner_watermark = pruner_watermark.unwrap();
1316 assert_eq!(pruner_watermark.reader_lo, 1);
1317 assert_eq!(pruner_watermark.pruner_hi, 1);
1318 }
1319
1320 #[tokio::test]
1321 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1322 let registry = Registry::new();
1323 let store = MockStore::default();
1324
1325 let mut conn = store.connect().await.unwrap();
1326 set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1327 set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1328
1329 let indexer_args = IndexerArgs {
1330 first_checkpoint: None,
1331 last_checkpoint: Some(19),
1332 pipeline: vec![],
1333 ..Default::default()
1334 };
1335 let (mut indexer, _temp_dir) =
1336 create_test_indexer(store.clone(), indexer_args, ®istry, Some((20, 2))).await;
1337
1338 add_sequential(&mut indexer, MockHandler).await;
1340
1341 assert_eq!(
1343 indexer.next_sequential_checkpoint(),
1344 Some(11),
1345 "next_sequential_checkpoint should be 11"
1346 );
1347
1348 add_sequential(&mut indexer, SequentialHandler).await;
1350
1351 assert_eq!(
1353 indexer.next_sequential_checkpoint(),
1354 Some(6),
1355 "next_sequential_checkpoint should still be 6"
1356 );
1357
1358 indexer.run().await.unwrap().join().await.unwrap();
1360
1361 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1363 let watermark2 = conn
1364 .committer_watermark(SequentialHandler::NAME)
1365 .await
1366 .unwrap();
1367
1368 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1369 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1370 }
1371
1372 #[tokio::test]
1376 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1377 let registry = Registry::new();
1378 let store = MockStore::default();
1379
1380 let mut conn = store.connect().await.unwrap();
1383 set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1384 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1385 .await
1386 .unwrap();
1387
1388 let indexer_args = IndexerArgs {
1391 first_checkpoint: Some(0),
1392 last_checkpoint: Some(15),
1393 task: TaskArgs::tasked("task".to_string(), 10),
1394 ..Default::default()
1395 };
1396 let (mut tasked_indexer, _temp_dir) =
1397 create_test_indexer(store.clone(), indexer_args, ®istry, Some((16, 2))).await;
1398
1399 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1400
1401 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1402 let metrics = tasked_indexer.indexer_metrics().clone();
1403
1404 tasked_indexer.run().await.unwrap().join().await.unwrap();
1405
1406 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1407 assert_eq!(
1408 metrics
1409 .total_collector_skipped_checkpoints
1410 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1411 .unwrap()
1412 .get(),
1413 7
1414 );
1415 let data = store
1416 .data
1417 .get(MockCheckpointSequenceNumberHandler::NAME)
1418 .unwrap();
1419 assert_eq!(data.len(), 9);
1420 for i in 0..7 {
1421 assert!(data.get(&i).is_none());
1422 }
1423 for i in 7..16 {
1424 assert!(data.get(&i).is_some());
1425 }
1426 }
1427
1428 #[tokio::test]
1430 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1431 let registry = Registry::new();
1432 let store = MockStore::default();
1433
1434 let mut conn = store.connect().await.unwrap();
1435 set_committer_watermark(&mut conn, "test", 10).await;
1436 conn.set_reader_watermark("test", 5).await.unwrap();
1437
1438 let indexer_args = IndexerArgs {
1441 first_checkpoint: Some(9),
1442 last_checkpoint: Some(25),
1443 task: TaskArgs::tasked("task".to_string(), 10),
1444 ..Default::default()
1445 };
1446 let (mut tasked_indexer, _temp_dir) =
1447 create_test_indexer(store.clone(), indexer_args, ®istry, Some((26, 2))).await;
1448
1449 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1450
1451 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1452 let metrics = tasked_indexer.indexer_metrics().clone();
1453
1454 tasked_indexer.run().await.unwrap().join().await.unwrap();
1455
1456 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1457 assert_out_of_order!(metrics, "test", 0);
1458 assert_eq!(
1459 metrics
1460 .total_collector_skipped_checkpoints
1461 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1462 .unwrap()
1463 .get(),
1464 0
1465 );
1466
1467 let data = store.data.get("test").unwrap();
1468 assert_eq!(data.len(), 17);
1469 for i in 0..9 {
1470 assert!(data.get(&i).is_none());
1471 }
1472 for i in 9..26 {
1473 assert!(data.get(&i).is_some());
1474 }
1475 let main_pipeline_watermark = store.watermark("test").unwrap();
1476 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1478 assert_eq!(main_pipeline_watermark.reader_lo, 5);
1479 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1480 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1481 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1482 }
1483
1484 #[tokio::test]
1487 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
1488 let registry = Registry::new();
1489 let store = MockStore::default();
1490 let mut conn = store.connect().await.unwrap();
1491 set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;
1493
1494 let indexer_args = IndexerArgs {
1496 first_checkpoint: Some(0),
1497 last_checkpoint: Some(500),
1498 task: TaskArgs::tasked("task".to_string(), 10 ),
1499 ..Default::default()
1500 };
1501 let (mut tasked_indexer, _temp_dir) =
1502 create_test_indexer(store.clone(), indexer_args, ®istry, Some((501, 2))).await;
1503 let mut allow_process = 10;
1504 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
1506 let _ = tasked_indexer
1507 .concurrent_pipeline(
1508 controllable_handler,
1509 ConcurrentConfig {
1510 committer: CommitterConfig {
1511 collect_interval_ms: 10,
1512 watermark_interval_ms: 10,
1513 ..Default::default()
1514 },
1515 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
1518 ..Default::default()
1519 },
1520 )
1521 .await;
1522 let metrics = tasked_indexer.indexer_metrics().clone();
1523
1524 let mut s_indexer = tasked_indexer.run().await.unwrap();
1525
1526 store
1530 .wait_for_watermark(
1531 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
1532 10,
1533 Duration::from_secs(10),
1534 )
1535 .await;
1536
1537 conn.set_reader_watermark(ControllableHandler::NAME, 250)
1542 .await
1543 .unwrap();
1544
1545 let reader_lo = metrics
1546 .collector_reader_lo
1547 .with_label_values(&[ControllableHandler::NAME]);
1548
1549 let mut interval = tokio::time::interval(Duration::from_millis(10));
1553 while reader_lo.get() != 250 {
1554 interval.tick().await;
1555 allow_process += 1;
1557 assert!(
1558 allow_process <= 500,
1559 "Released all checkpoints but collector never observed new reader_lo"
1560 );
1561 process_below.send(allow_process).ok();
1562 }
1563
1564 process_below.send(500).ok();
1571
1572 s_indexer.join().await.unwrap();
1573
1574 let data = store.data.get(ControllableHandler::NAME).unwrap();
1575
1576 for chkpt in (allow_process + 1)..250 {
1578 assert!(
1579 data.get(&chkpt).is_none(),
1580 "Checkpoint {chkpt} should have been skipped"
1581 );
1582 }
1583
1584 for chkpt in 250..=500 {
1586 assert!(
1587 data.get(&chkpt).is_some(),
1588 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
1589 );
1590 }
1591
1592 for chkpt in 0..=10 {
1594 assert!(
1595 data.get(&chkpt).is_some(),
1596 "Checkpoint {chkpt} should have been committed (baseline)"
1597 );
1598 }
1599 }
1600}