1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the checkpoint to start ingestion from. If a pipeline already has a watermark
    /// past this point, its own watermark takes precedence (see `Indexer::add_pipeline`).
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Last checkpoint to ingest (inclusive). When unset, ingestion is unbounded.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Names of pipelines to enable. May be repeated; when none are given, every added
    /// pipeline is enabled.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional pipeline task configuration (task name + watermark reader interval).
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
// NOTE(review): derives `clap::Parser` although it is only ever flattened into
// `IndexerArgs` — `clap::Args` (as used by `IndexerArgs`) would suffice; left as-is
// because removing the `Parser` impl could break external callers.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the pipeline task to run under. Must be supplied together with
    /// `--reader-interval-ms` (enforced via clap `requires`).
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Interval, in milliseconds, at which the watermark reader polls. Must be supplied
    /// together with `--task`.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
/// Orchestrates an ingestion service plus a set of concurrent and/or sequential pipelines
/// writing to `S`. Pipelines are registered before `run` starts everything as one `Service`.
pub struct Indexer<S: Store> {
    // Store that all registered pipelines write to.
    store: S,

    // Metrics shared by all pipelines (cloned into each pipeline on registration).
    metrics: Arc<IndexerMetrics>,

    // Source of checkpoint data that pipelines subscribe to.
    ingestion_service: IngestionService,

    // Optional override for the first checkpoint to ingest (from `IndexerArgs`).
    first_checkpoint: Option<u64>,

    // Optional inclusive upper bound on ingestion (from `IndexerArgs`).
    last_checkpoint: Option<u64>,

    // Latest checkpoint reported by the ingestion source at construction time; used as the
    // baseline when computing a retention-based starting point.
    latest_checkpoint: u64,

    // Minimum resume point across all added pipelines; starts at `u64::MAX` and is lowered
    // as each pipeline is added. Ingestion begins here.
    next_checkpoint: u64,

    // Minimum resume point across sequential pipelines only, `None` if none were added.
    next_sequential_checkpoint: Option<u64>,

    // Optional task configuration (name + reader interval) applied to concurrent pipelines.
    task: Option<Task>,

    // `None` means all pipelines are enabled. Otherwise, names are removed as pipelines are
    // added; any leftovers at `run` time indicate unknown pipeline names and cause an error.
    enabled_pipelines: Option<BTreeSet<String>>,

    // Names of every pipeline added so far, used to reject duplicates.
    added_pipelines: BTreeSet<&'static str>,

    // Services for each registered pipeline, merged into the ingestion service on `run`.
    pipelines: Vec<Service>,
}
165
/// Resolved pipeline task configuration: both fields of `TaskArgs` were present.
#[derive(Clone)]
pub(crate) struct Task {
    // Task name, used to derive per-task watermark keys.
    task: String,
    // How often the watermark reader polls.
    reader_interval: Duration,
}
176
177impl TaskArgs {
178 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
179 Self {
180 task: Some(task),
181 reader_interval_ms: Some(reader_interval_ms),
182 }
183 }
184
185 fn into_task(self) -> Option<Task> {
186 Some(Task {
187 task: self.task?,
188 reader_interval: Duration::from_millis(self.reader_interval_ms?),
189 })
190 }
191}
192
impl<S: Store> Indexer<S> {
    /// Create a new indexer over `store`, setting up metrics and the ingestion service from
    /// the supplied arguments. The ingestion source is queried once for its latest
    /// checkpoint, which later informs retention-based resume points.
    ///
    /// Fails if the ingestion service cannot be constructed or the latest checkpoint number
    /// cannot be fetched.
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> anyhow::Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;

        info!(latest_checkpoint);

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            latest_checkpoint,
            // Lowered towards the true minimum as pipelines are added.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // An empty `--pipeline` list means "enable everything".
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store this indexer writes to.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client used to fetch checkpoint data.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Metrics for pipeline progress (watermarks, out-of-order counts, etc.).
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Metrics for the ingestion side (checkpoints fetched, etc.).
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// Names of pipelines that have been added AND are enabled by the `--pipeline` filter
    /// (all added pipelines when no filter was given).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// Smallest resume point among registered sequential pipelines, or `None` if no
    /// sequential pipeline was added.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingestion and all registered pipelines, returning the merged service handle.
    ///
    /// Fails if any `--pipeline` name was never matched by an added pipeline, or if the
    /// ingestion service fails to start. Ingestion covers `[next_checkpoint, last_checkpoint]`
    /// (unbounded above when no last checkpoint was configured).
    pub async fn run(self) -> anyhow::Result<Service> {
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                 {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run((
                Bound::Included(start),
                end.map_or(Bound::Unbounded, Bound::Included),
            ))
            .await
            .context("Failed to start ingestion service")?;

        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Register pipeline `P` (under `pipeline_task`) and compute its resume point.
    ///
    /// Returns `Ok(None)` when the pipeline is filtered out by `--pipeline`; otherwise
    /// returns the first checkpoint this pipeline should process. Priority for the resume
    /// point: an existing stored watermark wins; else `first_checkpoint` if configured; else
    /// `latest_checkpoint - retention` when a retention is given; else genesis (0).
    /// Also lowers `self.next_checkpoint` to cover this pipeline.
    ///
    /// Fails if the same pipeline is added twice, or on store errors.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
        retention: Option<u64>,
    ) -> anyhow::Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Consume the name from the filter set; leftovers are reported as errors in `run`.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
            first_checkpoint
        } else if let Some(retention) = retention {
            self.latest_checkpoint.saturating_sub(retention)
        } else {
            0
        };
        let mut conn = self.store.connect().await?;
        // `checked_sub(1)`: the proposed *watermark* is one before the first checkpoint to
        // process, and `None` when starting from genesis.
        let init_watermark = conn
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                checkpoint_hi_inclusive + 1
            } else {
                0
            }
        } else {
            proposed_next_checkpoint
        };

        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
375
impl<S: ConcurrentStore> Indexer<S> {
    /// Register a concurrent pipeline driven by `handler`.
    ///
    /// The pipeline's watermark key is derived from the handler name and the optional task
    /// name. If a pruner is configured, its retention informs the resume point when no
    /// watermark exists yet (see `add_pipeline`). Returns `Ok(())` without registering
    /// anything when the pipeline is filtered out by `--pipeline`.
    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> anyhow::Result<()> {
        let pipeline_task =
            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
        let retention = config.pruner.as_ref().map(|p| p.retention);
        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
            return Ok(());
        };

        // Subscribe before `run` so the pipeline receives checkpoints from the start.
        let checkpoint_rx = self
            .ingestion_service
            .subscribe_bounded(config.ingestion.subscriber_channel_size());

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            checkpoint_rx,
            self.metrics.clone(),
        ));

        Ok(())
    }
}
412
impl<T: SequentialStore> Indexer<T> {
    /// Register a sequential pipeline driven by `handler`.
    ///
    /// Sequential pipelines commit checkpoints exactly once and in order, so they are
    /// incompatible with pipeline tasks and this fails if a task is configured. Returns
    /// `Ok(())` without registering anything when the pipeline is filtered out by
    /// `--pipeline`. Also lowers `next_sequential_checkpoint` to cover this pipeline.
    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
        &mut self,
        handler: H,
        config: SequentialConfig,
    ) -> anyhow::Result<()> {
        if self.task.is_some() {
            bail!(
                "Sequential pipelines do not support pipeline tasks. \
                 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
                 Running the same pipeline under a different task would violate these guarantees."
            );
        }

        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
            return Ok(());
        };

        // Track the minimum resume point across all sequential pipelines.
        self.next_sequential_checkpoint = Some(
            self.next_sequential_checkpoint
                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
        );

        let checkpoint_rx = self
            .ingestion_service
            .subscribe_bounded(config.ingestion.subscriber_channel_size());

        self.pipelines.push(sequential::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            checkpoint_rx,
            self.metrics.clone(),
        ));

        Ok(())
    }
}
460
461#[cfg(test)]
462mod tests {
463 use std::sync::Arc;
464
465 use async_trait::async_trait;
466 use clap::Parser;
467 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
468 use sui_synthetic_ingestion::synthetic_ingestion;
469 use tokio::sync::watch;
470
471 use crate::FieldCount;
472 use crate::config::ConcurrencyConfig;
473 use crate::ingestion::ingestion_client::IngestionClientArgs;
474 use crate::ingestion::store_client::ObjectStoreWatermark;
475 use crate::ingestion::store_client::WATERMARK_PATH;
476 use crate::mocks::store::MockStore;
477 use crate::pipeline::CommitterConfig;
478 use crate::pipeline::Processor;
479 use crate::pipeline::concurrent::ConcurrentConfig;
480 use crate::store::CommitterWatermark;
481 use crate::store::ConcurrentConnection as _;
482 use crate::store::Connection as _;
483
484 use super::*;
485
    // Minimal pipeline value for test handlers: wraps the checkpoint sequence number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
489
    // Handler whose `process` blocks for checkpoints above a watch-controlled limit,
    // letting tests release checkpoints one threshold at a time.
    struct ControllableHandler {
        // Checkpoints with sequence number <= the current value may proceed.
        process_below: watch::Receiver<u64>,
    }
495
496 impl ControllableHandler {
497 fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
498 let (tx, rx) = watch::channel(limit);
499 (Self { process_below: rx }, tx)
500 }
501 }
502
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Block until the test raises the limit to at least this checkpoint. Cloning
            // the receiver is needed because `wait_for` requires `&mut self`; `.ok()`
            // ignores a closed channel so processing proceeds if the sender is dropped.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
524
    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        // Batches are always flushed immediately (`Ready`).
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        // Record each value in the mock store under this handler's name.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
552
    // Declare a unit struct `$handler` named `$name` that implements `Processor` plus BOTH
    // the concurrent and sequential `Handler` traits over `MockStore`, so one declaration
    // can be registered as either pipeline kind in tests.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                // Batches stay `Pending` so the committer decides when to flush.
                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // The sequential commit is a no-op that reports one row written.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
616
    // Shared handler types reused across several tests below.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
620
621 fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
622 let dir = tempfile::tempdir().unwrap();
623 if let Some(cp) = latest_checkpoint {
624 let watermark_path = dir.path().join(WATERMARK_PATH);
625 std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
626 let watermark = ObjectStoreWatermark {
627 checkpoint_hi_inclusive: cp,
628 };
629 std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
630 }
631 dir
632 }
633
634 async fn create_test_indexer(
637 store: MockStore,
638 indexer_args: IndexerArgs,
639 registry: &Registry,
640 ingestion_data: Option<(u64, u64)>,
641 ) -> (Indexer<MockStore>, tempfile::TempDir) {
642 let temp_dir = init_ingestion_dir(None);
643 if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
644 synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
645 ingestion_dir: temp_dir.path().to_owned(),
646 starting_checkpoint: 0,
647 num_checkpoints,
648 checkpoint_size,
649 })
650 .await;
651 }
652 let client_args = ClientArgs {
653 ingestion: IngestionClientArgs {
654 local_ingestion_path: Some(temp_dir.path().to_owned()),
655 ..Default::default()
656 },
657 ..Default::default()
658 };
659 let indexer = Indexer::new(
660 store,
661 indexer_args,
662 client_args,
663 IngestionConfig::default(),
664 None,
665 registry,
666 )
667 .await
668 .unwrap();
669 (indexer, temp_dir)
670 }
671
672 async fn set_committer_watermark(
673 conn: &mut <MockStore as Store>::Connection<'_>,
674 name: &str,
675 hi: u64,
676 ) {
677 conn.set_committer_watermark(
678 name,
679 CommitterWatermark {
680 checkpoint_hi_inclusive: hi,
681 ..Default::default()
682 },
683 )
684 .await
685 .unwrap();
686 }
687
    // Register `handler` as a concurrent pipeline with default configuration.
    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .concurrent_pipeline(handler, ConcurrentConfig::default())
            .await
            .unwrap();
    }
697
    // Register `handler` as a sequential pipeline with default configuration.
    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .sequential_pipeline(handler, SequentialConfig::default())
            .await
            .unwrap();
    }
707
    // Assert that `$pipeline`'s out-of-order watermark counter equals `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
720
    // Helper: add one pipeline (concurrent or sequential) with the given `first_checkpoint`
    // override, then return the committer and pruner watermarks that registration produced.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
752
753 const LATEST_CHECKPOINT: u64 = 10;
754
    // Helper: seed an optional committer watermark, build an indexer whose ingestion source
    // reports `LATEST_CHECKPOINT`, register one concurrent pipeline with `concurrent_config`,
    // and return the indexer so callers can inspect the computed `next_checkpoint`.
    async fn test_next_checkpoint(
        watermark: Option<u64>,
        first_checkpoint: Option<u64>,
        concurrent_config: ConcurrentConfig,
    ) -> Indexer<MockStore> {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");

        if let Some(checkpoint_hi_inclusive) = watermark {
            let mut conn = store.connect().await.unwrap();
            conn.set_committer_watermark(
                A::NAME,
                CommitterWatermark {
                    checkpoint_hi_inclusive,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        }

        let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
        let mut indexer = Indexer::new(
            store,
            IndexerArgs {
                first_checkpoint,
                ..Default::default()
            },
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);

        indexer
            .concurrent_pipeline::<A>(A, concurrent_config)
            .await
            .unwrap();

        indexer
    }
812
813 fn pruner_config(retention: u64) -> ConcurrentConfig {
814 ConcurrentConfig {
815 pruner: Some(concurrent::PrunerConfig {
816 retention,
817 ..Default::default()
818 }),
819 ..Default::default()
820 }
821 }
822
    // `IndexerArgs` (and the flattened `TaskArgs`) parse correctly from CLI flags.
    #[test]
    fn test_arg_parsing() {
        #[derive(Parser)]
        struct Args {
            #[clap(flatten)]
            indexer: IndexerArgs,
        }

        let args = Args::try_parse_from([
            "cmd",
            "--first-checkpoint",
            "10",
            "--last-checkpoint",
            "100",
            "--pipeline",
            "a",
            "--pipeline",
            "b",
            "--task",
            "t",
            "--reader-interval-ms",
            "5000",
        ])
        .unwrap();

        assert_eq!(args.indexer.first_checkpoint, Some(10));
        assert_eq!(args.indexer.last_checkpoint, Some(100));
        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
    }
854
    // When every pipeline has a watermark, ingestion resumes from the smallest one + 1
    // (here sequential_c at 1, so next is 2).
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;

        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;

        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, D::NAME, 50).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }
894
    // Pipelines without watermarks (A, D) start from genesis, dragging the global
    // next checkpoint down to 0.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }
924
    // A watermark of 0 resumes at 1 (watermark + 1), which becomes the global minimum.
    #[tokio::test]
    async fn test_next_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;
        set_committer_watermark(&mut conn, D::NAME, 0).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.next_checkpoint, 1);
    }
952
    // `--first-checkpoint 5` applies to pipelines without watermarks (A, D); existing
    // watermarks (B at 50, C at 10) are above it, so the minimum is 5.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 5);
        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
    }
987
    // When every pipeline already has a watermark, `--first-checkpoint` is ignored:
    // the smallest watermark (C at 10) wins, so ingestion resumes at 11.
    #[tokio::test]
    async fn test_next_checkpoint_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1018
    // A `--first-checkpoint` above an existing watermark does not let the watermark-less
    // pipeline win: the minimum is still C's watermark + 1 = 11 (A starts at 24).
    #[tokio::test]
    async fn test_next_checkpoint_large_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(24),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        assert_eq!(indexer.first_checkpoint, Some(24));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1052
    // `Indexer::new` reads the latest checkpoint from the ingestion directory's
    // watermark file.
    #[tokio::test]
    async fn test_latest_checkpoint_from_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();
        let temp_dir = init_ingestion_dir(Some(30));
        let indexer = Indexer::new(
            store,
            IndexerArgs::default(),
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, 30);
    }
1078
    // With no watermark and no override, a pruned pipeline starts at
    // latest_checkpoint - retention.
    #[tokio::test]
    async fn test_next_checkpoint_with_pruner_uses_retention() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
    }
1087
    // Without a pruner, a watermark-less pipeline starts from genesis.
    #[tokio::test]
    async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
        let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
        assert_eq!(indexer.next_checkpoint, 0);
    }
1094
    // An existing watermark (5) beats the retention-derived starting point.
    #[tokio::test]
    async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 6);
    }
1102
    // `--first-checkpoint` beats the retention-derived starting point.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 2);
    }
1111
    // Retention larger than the latest checkpoint saturates to genesis (no underflow).
    #[tokio::test]
    async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
        let retention = LATEST_CHECKPOINT + 1;
        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 0);
    }
1119
    // End-to-end run over 30 synthetic checkpoints: ingestion starts at the smallest
    // watermark + 1 (A at 5 -> 6), so 24 checkpoints are ingested; the out-of-order
    // counters reflect how far each pipeline's watermark was ahead of the start.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(indexer_metrics, A::NAME, 0);
        assert_out_of_order!(indexer_metrics, B::NAME, 5);
        assert_out_of_order!(indexer_metrics, C::NAME, 10);
        assert_out_of_order!(indexer_metrics, D::NAME, 15);
    }
1160
    // Same as the previous test but with `--first-checkpoint 3`: every pipeline already
    // has a watermark, so the override is ignored and results are identical.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(3),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 5);
        assert_out_of_order!(metrics, C::NAME, 10);
        assert_out_of_order!(metrics, D::NAME, 15);
    }
1201
    // Pipeline A has no watermark, so ingestion starts from genesis and all 30 checkpoints
    // are ingested; pipelines with watermarks replay their already-committed range, which
    // shows up as out-of-order watermark counts.
    #[tokio::test]
    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 11);
        assert_out_of_order!(metrics, C::NAME, 16);
        assert_out_of_order!(metrics, D::NAME, 21);
    }
1240
    // `--first-checkpoint 10` supplies the watermark-less pipeline A's starting point, so
    // ingestion covers [10, 29] (20 checkpoints); other pipelines replay from 10 up to
    // their watermarks, reflected in the out-of-order counts.
    #[tokio::test]
    async fn test_indexer_ingestion_use_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(10),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 1);
        assert_out_of_order!(metrics, C::NAME, 6);
        assert_out_of_order!(metrics, D::NAME, 11);
    }
1280
1281 #[tokio::test]
1282 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1283 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1284 assert_eq!(committer_watermark, None);
1285 assert_eq!(pruner_watermark, None);
1286 }
1287
1288 #[tokio::test]
1289 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1290 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1291 assert_eq!(committer_watermark, None);
1292 assert_eq!(pruner_watermark, None);
1293 }
1294
1295 #[tokio::test]
1296 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1297 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1298
1299 let committer_watermark = committer_watermark.unwrap();
1300 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1301
1302 let pruner_watermark = pruner_watermark.unwrap();
1303 assert_eq!(pruner_watermark.reader_lo, 1);
1304 assert_eq!(pruner_watermark.pruner_hi, 1);
1305 }
1306
1307 #[tokio::test]
1308 async fn test_init_watermark_sequential() {
1309 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1310
1311 let committer_watermark = committer_watermark.unwrap();
1312 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1313
1314 let pruner_watermark = pruner_watermark.unwrap();
1315 assert_eq!(pruner_watermark.reader_lo, 1);
1316 assert_eq!(pruner_watermark.pruner_hi, 1);
1317 }
1318
1319 #[tokio::test]
1320 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1321 let registry = Registry::new();
1322 let store = MockStore::default();
1323
1324 let mut conn = store.connect().await.unwrap();
1325 set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1326 set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1327
1328 let indexer_args = IndexerArgs {
1329 first_checkpoint: None,
1330 last_checkpoint: Some(19),
1331 pipeline: vec![],
1332 ..Default::default()
1333 };
1334 let (mut indexer, _temp_dir) =
1335 create_test_indexer(store.clone(), indexer_args, ®istry, Some((20, 2))).await;
1336
1337 add_sequential(&mut indexer, MockHandler).await;
1339
1340 assert_eq!(
1342 indexer.next_sequential_checkpoint(),
1343 Some(11),
1344 "next_sequential_checkpoint should be 11"
1345 );
1346
1347 add_sequential(&mut indexer, SequentialHandler).await;
1349
1350 assert_eq!(
1352 indexer.next_sequential_checkpoint(),
1353 Some(6),
1354 "next_sequential_checkpoint should still be 6"
1355 );
1356
1357 indexer.run().await.unwrap().join().await.unwrap();
1359
1360 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1362 let watermark2 = conn
1363 .committer_watermark(SequentialHandler::NAME)
1364 .await
1365 .unwrap();
1366
1367 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1368 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1369 }
1370
1371 #[tokio::test]
1375 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1376 let registry = Registry::new();
1377 let store = MockStore::default();
1378
1379 let mut conn = store.connect().await.unwrap();
1382 set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1383 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1384 .await
1385 .unwrap();
1386
1387 let indexer_args = IndexerArgs {
1390 first_checkpoint: Some(0),
1391 last_checkpoint: Some(15),
1392 task: TaskArgs::tasked("task".to_string(), 10),
1393 ..Default::default()
1394 };
1395 let (mut tasked_indexer, _temp_dir) =
1396 create_test_indexer(store.clone(), indexer_args, ®istry, Some((16, 2))).await;
1397
1398 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1399
1400 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1401 let metrics = tasked_indexer.indexer_metrics().clone();
1402
1403 tasked_indexer.run().await.unwrap().join().await.unwrap();
1404
1405 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1406 assert_eq!(
1407 metrics
1408 .total_collector_skipped_checkpoints
1409 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1410 .unwrap()
1411 .get(),
1412 7
1413 );
1414 let data = store
1415 .data
1416 .get(MockCheckpointSequenceNumberHandler::NAME)
1417 .unwrap();
1418 assert_eq!(data.len(), 9);
1419 for i in 0..7 {
1420 assert!(data.get(&i).is_none());
1421 }
1422 for i in 7..16 {
1423 assert!(data.get(&i).is_some());
1424 }
1425 }
1426
1427 #[tokio::test]
1429 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1430 let registry = Registry::new();
1431 let store = MockStore::default();
1432
1433 let mut conn = store.connect().await.unwrap();
1434 set_committer_watermark(&mut conn, "test", 10).await;
1435 conn.set_reader_watermark("test", 5).await.unwrap();
1436
1437 let indexer_args = IndexerArgs {
1440 first_checkpoint: Some(9),
1441 last_checkpoint: Some(25),
1442 task: TaskArgs::tasked("task".to_string(), 10),
1443 ..Default::default()
1444 };
1445 let (mut tasked_indexer, _temp_dir) =
1446 create_test_indexer(store.clone(), indexer_args, ®istry, Some((26, 2))).await;
1447
1448 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1449
1450 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1451 let metrics = tasked_indexer.indexer_metrics().clone();
1452
1453 tasked_indexer.run().await.unwrap().join().await.unwrap();
1454
1455 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1456 assert_out_of_order!(metrics, "test", 0);
1457 assert_eq!(
1458 metrics
1459 .total_collector_skipped_checkpoints
1460 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1461 .unwrap()
1462 .get(),
1463 0
1464 );
1465
1466 let data = store.data.get("test").unwrap();
1467 assert_eq!(data.len(), 17);
1468 for i in 0..9 {
1469 assert!(data.get(&i).is_none());
1470 }
1471 for i in 9..26 {
1472 assert!(data.get(&i).is_some());
1473 }
1474 let main_pipeline_watermark = store.watermark("test").unwrap();
1475 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1477 assert_eq!(main_pipeline_watermark.reader_lo, 5);
1478 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1479 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1480 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1481 }
1482
1483 #[tokio::test]
1486 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
1487 let registry = Registry::new();
1488 let store = MockStore::default();
1489 let mut conn = store.connect().await.unwrap();
1490 set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;
1492
1493 let indexer_args = IndexerArgs {
1495 first_checkpoint: Some(0),
1496 last_checkpoint: Some(500),
1497 task: TaskArgs::tasked("task".to_string(), 10 ),
1498 ..Default::default()
1499 };
1500 let (mut tasked_indexer, _temp_dir) =
1501 create_test_indexer(store.clone(), indexer_args, ®istry, Some((501, 2))).await;
1502 let mut allow_process = 10;
1503 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
1505 let _ = tasked_indexer
1506 .concurrent_pipeline(
1507 controllable_handler,
1508 ConcurrentConfig {
1509 committer: CommitterConfig {
1510 collect_interval_ms: 10,
1511 watermark_interval_ms: 10,
1512 ..Default::default()
1513 },
1514 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
1517 ..Default::default()
1518 },
1519 )
1520 .await;
1521 let metrics = tasked_indexer.indexer_metrics().clone();
1522
1523 let mut s_indexer = tasked_indexer.run().await.unwrap();
1524
1525 store
1529 .wait_for_watermark(
1530 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
1531 10,
1532 Duration::from_secs(10),
1533 )
1534 .await;
1535
1536 conn.set_reader_watermark(ControllableHandler::NAME, 250)
1541 .await
1542 .unwrap();
1543
1544 let reader_lo = metrics
1545 .collector_reader_lo
1546 .with_label_values(&[ControllableHandler::NAME]);
1547
1548 let mut interval = tokio::time::interval(Duration::from_millis(10));
1552 while reader_lo.get() != 250 {
1553 interval.tick().await;
1554 allow_process += 1;
1556 assert!(
1557 allow_process <= 500,
1558 "Released all checkpoints but collector never observed new reader_lo"
1559 );
1560 process_below.send(allow_process).ok();
1561 }
1562
1563 process_below.send(500).ok();
1570
1571 s_indexer.join().await.unwrap();
1572
1573 let data = store.data.get(ControllableHandler::NAME).unwrap();
1574
1575 for chkpt in (allow_process + 1)..250 {
1577 assert!(
1578 data.get(&chkpt).is_none(),
1579 "Checkpoint {chkpt} should have been skipped"
1580 );
1581 }
1582
1583 for chkpt in 250..=500 {
1585 assert!(
1586 data.get(&chkpt).is_some(),
1587 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
1588 );
1589 }
1590
1591 for chkpt in 0..=10 {
1593 assert!(
1594 data.get(&chkpt).is_some(),
1595 "Checkpoint {chkpt} should have been committed (baseline)"
1596 );
1597 }
1598 }
1599}