1use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
/// Command-line arguments that control the range of checkpoints an indexer processes and
/// which of its registered pipelines are enabled.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override for the checkpoint to start ingestion from. When unset, each pipeline
    /// resumes from its stored watermark (falling back to a retention-derived start or
    /// genesis -- see `Indexer::add_pipeline`).
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Last checkpoint (inclusive) to ingest. When unset, ingestion is unbounded.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Names of pipelines to enable. May be repeated. When empty, every pipeline added to
    /// the indexer runs.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Optional pipeline-task configuration (task name plus reader polling interval).
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Arguments naming an optional pipeline task and its reader polling interval. The two
/// flags require each other: either both are supplied or neither is.
// NOTE(review): this derives `clap::Parser` but is only used via `#[clap(flatten)]` in
// `IndexerArgs`; `clap::Args` would be the conventional derive -- confirm nothing parses
// this struct standalone before changing it.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// Name of the task to run pipelines under.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// Polling interval for the task's reader, in milliseconds.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
/// Orchestrates an ingestion service together with a set of pipelines that consume the
/// checkpoints it fetches.
pub struct Indexer<S: Store> {
    /// Store shared by all pipelines, used for watermarks and committed data.
    store: S,

    /// Metrics for the indexer and its pipelines.
    metrics: Arc<IndexerMetrics>,

    /// Service that fetches checkpoints and broadcasts them to subscribed pipelines.
    ingestion_service: IngestionService,

    /// Optional override for the first checkpoint to process (from `IndexerArgs`).
    first_checkpoint: Option<u64>,

    /// Optional last checkpoint (inclusive) to process (from `IndexerArgs`).
    last_checkpoint: Option<u64>,

    /// Latest checkpoint reported by the ingestion source when the indexer was created.
    latest_checkpoint: u64,

    /// Minimum next checkpoint across all added pipelines; ingestion starts here. Starts
    /// at `u64::MAX` and is lowered as pipelines are added.
    next_checkpoint: u64,

    /// Minimum next checkpoint across sequential pipelines only, if any were added.
    next_sequential_checkpoint: Option<u64>,

    /// Optional task that concurrent pipelines' watermarks are namespaced under.
    task: Option<Task>,

    /// Pipelines explicitly enabled on the command line; `None` means all pipelines run.
    /// Names are removed as the corresponding pipelines are added, so any leftovers at
    /// `run` time are unknown names.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Names of pipelines added so far, used to reject duplicates.
    added_pipelines: BTreeSet<&'static str>,

    /// Handles for pipeline services, merged into the ingestion service on `run`.
    pipelines: Vec<Service>,
}
166
/// A resolved pipeline task: its name and how often its reader should poll.
#[derive(Clone)]
pub(crate) struct Task {
    /// Task name, combined with a pipeline's name via `pipeline_task` to key watermarks.
    task: String,
    /// Interval at which the task's reader polls.
    reader_interval: Duration,
}
177
178impl TaskArgs {
179 pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
180 Self {
181 task: Some(task),
182 reader_interval_ms: Some(reader_interval_ms),
183 }
184 }
185
186 fn into_task(self) -> Option<Task> {
187 Some(Task {
188 task: self.task?,
189 reader_interval: Duration::from_millis(self.reader_interval_ms?),
190 })
191 }
192}
193
impl<S: Store> Indexer<S> {
    /// Create a new indexer over `store`, wiring up metrics against `registry` (optionally
    /// prefixed by `metrics_prefix`) and an ingestion service built from `client_args` and
    /// `ingestion_config`. Fails if the ingestion service cannot be constructed or its
    /// latest checkpoint cannot be read.
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        // Snapshot the ingestion source's high watermark; used later to derive a
        // retention-based starting checkpoint for pipelines with a pruner.
        let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;

        info!(latest_checkpoint, "Ingestion store state");

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            latest_checkpoint,
            // Lowered to the minimum across pipelines as they are added.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // Empty `--pipeline` means "run everything" (None), otherwise an allow-list.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store this indexer writes to.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The client the ingestion service uses to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// Metrics for the indexer and its pipelines.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// Metrics for the ingestion service.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// Names of the added pipelines that are actually enabled (all of them, unless an
    /// explicit `--pipeline` allow-list was given).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// Lowest next checkpoint across sequential pipelines, if any were added.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingestion and all added pipelines, returning a handle to the combined
    /// service. Fails if the `--pipeline` allow-list names a pipeline that was never
    /// added, or if the ingestion service fails to start.
    pub async fn run(self) -> Result<Service> {
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            // Names are removed from this set as pipelines are added, so anything left
            // over was never registered with this indexer.
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    // No last checkpoint means ingest indefinitely.
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Tie each pipeline's lifecycle to the ingestion service's.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Register a pipeline, returning the checkpoint it should start processing from, or
    /// `None` if the pipeline is not enabled.
    ///
    /// The start is resolved in priority order: an existing watermark in the store, then
    /// the `--first-checkpoint` override, then a retention-derived start (for pipelines
    /// with a pruner), then genesis (0). Also lowers the indexer-wide `next_checkpoint`.
    ///
    /// Fails if the same pipeline is added twice, or the watermark cannot be initialized.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
        retention: Option<u64>,
    ) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // An explicit allow-list is in effect; skip pipelines not named in it. Removing
        // the name also marks it as "seen" for the check in `run`.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        // Proposed start, used when the store has no watermark for this pipeline yet.
        let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
            first_checkpoint
        } else if let Some(retention) = retention {
            // A pruned pipeline only needs data inside its retention window.
            self.latest_checkpoint.saturating_sub(retention)
        } else {
            0
        };

        let mut conn = self.store.connect().await?;
        let init_watermark = conn
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        // A watermark already in the store takes priority over the proposed start.
        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                checkpoint_hi_inclusive + 1
            } else {
                0
            }
        } else {
            proposed_next_checkpoint
        };

        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
379
380impl<S: ConcurrentStore> Indexer<S> {
381 pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
388 &mut self,
389 handler: H,
390 config: ConcurrentConfig,
391 ) -> Result<()> {
392 let pipeline_task =
393 pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
394 let retention = config.pruner.as_ref().map(|p| p.retention);
395 let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
396 return Ok(());
397 };
398
399 self.pipelines.push(concurrent::pipeline::<H>(
400 handler,
401 next_checkpoint,
402 config,
403 self.store.clone(),
404 self.task.clone(),
405 self.ingestion_service.subscribe().0,
406 self.metrics.clone(),
407 ));
408
409 Ok(())
410 }
411}
412
413impl<T: SequentialStore> Indexer<T> {
414 pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
425 &mut self,
426 handler: H,
427 config: SequentialConfig,
428 ) -> Result<()> {
429 if self.task.is_some() {
430 bail!(
431 "Sequential pipelines do not support pipeline tasks. \
432 These pipelines guarantee that each checkpoint is committed exactly once and in order. \
433 Running the same pipeline under a different task would violate these guarantees."
434 );
435 }
436
437 let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
438 return Ok(());
439 };
440
441 self.next_sequential_checkpoint = Some(
443 self.next_sequential_checkpoint
444 .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
445 );
446
447 let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
448
449 self.pipelines.push(sequential::pipeline::<H>(
450 handler,
451 next_checkpoint,
452 config,
453 self.store.clone(),
454 checkpoint_rx,
455 commit_hi_tx,
456 self.metrics.clone(),
457 ));
458
459 Ok(())
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use std::sync::Arc;
466
467 use async_trait::async_trait;
468 use clap::Parser;
469 use sui_indexer_alt_framework_store_traits::PrunerWatermark;
470 use sui_synthetic_ingestion::synthetic_ingestion;
471 use tokio::sync::watch;
472
473 use crate::FieldCount;
474 use crate::config::ConcurrencyConfig;
475 use crate::ingestion::ingestion_client::IngestionClientArgs;
476 use crate::ingestion::store_client::ObjectStoreWatermark;
477 use crate::ingestion::store_client::WATERMARK_PATH;
478 use crate::mocks::store::MockStore;
479 use crate::pipeline::CommitterConfig;
480 use crate::pipeline::Processor;
481 use crate::pipeline::concurrent::ConcurrentConfig;
482 use crate::store::CommitterWatermark;
483 use crate::store::ConcurrentConnection as _;
484 use crate::store::Connection as _;
485
486 use super::*;
487
    /// Minimal value type produced by the test processors; wraps a checkpoint number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);

    /// Handler whose processing can be gated from the test: a checkpoint is only processed
    /// once the watched limit reaches its sequence number.
    struct ControllableHandler {
        // Receives the highest checkpoint number currently allowed to proceed.
        process_below: watch::Receiver<u64>,
    }

    impl ControllableHandler {
        /// Create a handler gated at `limit`, plus the sender used to raise the limit.
        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
            let (tx, rx) = watch::channel(limit);
            (Self { process_below: rx }, tx)
        }
    }
504
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Wait until the watched limit admits this checkpoint, then emit its number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Block until the test raises the limit to at least this checkpoint; an error
            // (sender dropped) is ignored so processing still completes on shutdown.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }

    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        /// Drain all values into the batch and mark it ready for commit immediately.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        /// Record each value in the mock store under this handler's name.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
554
    /// Declares a unit struct `$handler` named `$name` that implements `Processor` plus
    /// both the concurrent and sequential `Handler` traits against `MockStore`, so the
    /// same type can be registered as either kind of pipeline in tests.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                // Emit the checkpoint's sequence number as the single processed value.
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                // Accumulate values, leaving the batch open (Pending) so the framework
                // decides when to commit.
                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                // Record each value in the mock store under this handler's name.
                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // Sequential commits are a no-op in these tests; report one row written.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> Result<usize> {
                    Ok(1)
                }
            }
        };
    }

    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
622
623 fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
624 let dir = tempfile::tempdir().unwrap();
625 if let Some(cp) = latest_checkpoint {
626 let watermark_path = dir.path().join(WATERMARK_PATH);
627 std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
628 let watermark = ObjectStoreWatermark {
629 checkpoint_hi_inclusive: cp,
630 };
631 std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
632 }
633 dir
634 }
635
    /// Build an `Indexer` over `MockStore`, backed by a temporary local ingestion
    /// directory. When `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`,
    /// that many synthetic checkpoints are generated into the directory first. Returns
    /// the temp dir alongside the indexer so it outlives the test.
    async fn create_test_indexer(
        store: MockStore,
        indexer_args: IndexerArgs,
        registry: &Registry,
        ingestion_data: Option<(u64, u64)>,
    ) -> (Indexer<MockStore>, tempfile::TempDir) {
        let temp_dir = init_ingestion_dir(None);
        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
                ingestion_dir: temp_dir.path().to_owned(),
                starting_checkpoint: 0,
                num_checkpoints,
                checkpoint_size,
            })
            .await;
        }
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            IngestionConfig::default(),
            None,
            registry,
        )
        .await
        .unwrap();
        (indexer, temp_dir)
    }
673
    /// Set `name`'s committer watermark in the store to `hi` (other fields defaulted).
    async fn set_committer_watermark(
        conn: &mut <MockStore as Store>::Connection<'_>,
        name: &str,
        hi: u64,
    ) {
        conn.set_committer_watermark(
            name,
            CommitterWatermark {
                checkpoint_hi_inclusive: hi,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    }

    /// Register `handler` as a concurrent pipeline with default configuration.
    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .concurrent_pipeline(handler, ConcurrentConfig::default())
            .await
            .unwrap();
    }

    /// Register `handler` as a sequential pipeline with default configuration.
    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .sequential_pipeline(handler, SequentialConfig::default())
            .await
            .unwrap();
    }
709
    /// Assert that `$pipeline`'s out-of-order watermark counter equals `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
722
    /// Build an indexer with one pipeline (concurrent or sequential, per `is_concurrent`)
    /// and the given `--first-checkpoint` override, then return the committer and pruner
    /// watermarks that adding the pipeline left in the store.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
754
    /// Highest checkpoint the seeded ingestion watermark reports in the tests below.
    const LATEST_CHECKPOINT: u64 = 10;

    /// Build an indexer whose ingestion source reports `LATEST_CHECKPOINT`, optionally
    /// seeding a committer watermark for the test pipeline, then add that pipeline as a
    /// concurrent pipeline with `concurrent_config` and return the indexer for
    /// inspection.
    async fn test_next_checkpoint(
        watermark: Option<u64>,
        first_checkpoint: Option<u64>,
        concurrent_config: ConcurrentConfig,
    ) -> Indexer<MockStore> {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");

        if let Some(checkpoint_hi_inclusive) = watermark {
            let mut conn = store.connect().await.unwrap();
            conn.set_committer_watermark(
                A::NAME,
                CommitterWatermark {
                    checkpoint_hi_inclusive,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        }

        let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
        let mut indexer = Indexer::new(
            store,
            IndexerArgs {
                first_checkpoint,
                ..Default::default()
            },
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);

        indexer
            .concurrent_pipeline::<A>(A, concurrent_config)
            .await
            .unwrap();

        indexer
    }

    /// Concurrent pipeline config with a pruner retaining `retention` checkpoints.
    fn pruner_config(retention: u64) -> ConcurrentConfig {
        ConcurrentConfig {
            pruner: Some(concurrent::PrunerConfig {
                retention,
                ..Default::default()
            }),
            ..Default::default()
        }
    }
824
    /// `IndexerArgs` parses all of its flags, including repeated `--pipeline` and the
    /// flattened task arguments.
    #[test]
    fn test_arg_parsing() {
        #[derive(Parser)]
        struct Args {
            #[clap(flatten)]
            indexer: IndexerArgs,
        }

        let args = Args::try_parse_from([
            "cmd",
            "--first-checkpoint",
            "10",
            "--last-checkpoint",
            "100",
            "--pipeline",
            "a",
            "--pipeline",
            "b",
            "--task",
            "t",
            "--reader-interval-ms",
            "5000",
        ])
        .unwrap();

        assert_eq!(args.indexer.first_checkpoint, Some(10));
        assert_eq!(args.indexer.last_checkpoint, Some(100));
        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
    }
856
    /// With a watermark for every pipeline, ingestion resumes from the smallest one + 1.
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;

        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;

        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, D::NAME, 50).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        // The smallest watermark is C's (1), so ingestion must start at 2.
        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }

    /// Pipelines without a watermark pull the starting point back to genesis (0).
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Only B and C have watermarks; A and D start from scratch.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }

    /// A watermark of 0 yields a next checkpoint of 1, which wins as the minimum.
    #[tokio::test]
    async fn test_next_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;
        set_committer_watermark(&mut conn, D::NAME, 0).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.next_checkpoint, 1);
    }
954
    /// With no stored watermark, `--first-checkpoint` decides where a pipeline starts.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        // A and D have no watermark, so they start at the override (5), which is the
        // minimum across all pipelines.
        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 5);
        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
    }

    /// Existing watermarks take priority over a smaller `--first-checkpoint` override.
    #[tokio::test]
    async fn test_next_checkpoint_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        // C resumes from its watermark (10) + 1, not from the smaller override.
        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }

    /// A `--first-checkpoint` larger than existing watermarks does not advance them: the
    /// overall start is still the smallest watermark-derived checkpoint.
    #[tokio::test]
    async fn test_next_checkpoint_large_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(24),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        // Minimum of A's override start (24), B's 51, and C's 11.
        assert_eq!(indexer.first_checkpoint, Some(24));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1054
    /// The indexer reads its latest checkpoint from the ingestion store's watermark file.
    #[tokio::test]
    async fn test_latest_checkpoint_from_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();
        let temp_dir = init_ingestion_dir(Some(30));
        let indexer = Indexer::new(
            store,
            IndexerArgs::default(),
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, 30);
    }

    /// A pruned pipeline with no watermark starts at `latest - retention`.
    #[tokio::test]
    async fn test_next_checkpoint_with_pruner_uses_retention() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
    }

    /// Without a pruner (or watermark, or override), the pipeline starts at genesis.
    #[tokio::test]
    async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
        let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
        assert_eq!(indexer.next_checkpoint, 0);
    }

    /// An existing watermark beats the retention-derived start.
    #[tokio::test]
    async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 6);
    }

    /// `--first-checkpoint` beats the retention-derived start.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 2);
    }

    /// Retention larger than the available range saturates the start to genesis.
    #[tokio::test]
    async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
        let retention = LATEST_CHECKPOINT + 1;
        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 0);
    }
1121
    /// End-to-end run with watermarks for every pipeline and no override: ingestion spans
    /// the smallest watermark + 1 through `last_checkpoint`.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        // Generate 30 synthetic checkpoints (0..=29) of one transaction each.
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        indexer.run().await.unwrap().join().await.unwrap();

        // Ingestion starts at A's watermark + 1 (6), so 24 checkpoints (6..=29) are
        // ingested; pipelines whose watermarks are further ahead record correspondingly
        // more out-of-order watermark updates.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(indexer_metrics, A::NAME, 0);
        assert_out_of_order!(indexer_metrics, B::NAME, 5);
        assert_out_of_order!(indexer_metrics, C::NAME, 10);
        assert_out_of_order!(indexer_metrics, D::NAME, 15);
    }

    /// A `--first-checkpoint` below every stored watermark is ignored: the run behaves
    /// exactly as it does without the override.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(3),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        // Same outcome as the no-override run: start at 6, ingest 24 checkpoints.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 5);
        assert_out_of_order!(metrics, C::NAME, 10);
        assert_out_of_order!(metrics, D::NAME, 15);
    }

    /// A pipeline with no watermark and no override pulls ingestion back to genesis, so
    /// all 30 checkpoints are ingested.
    #[tokio::test]
    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // A has no watermark.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 11);
        assert_out_of_order!(metrics, C::NAME, 16);
        assert_out_of_order!(metrics, D::NAME, 21);
    }

    /// With a watermark-free pipeline and `--first-checkpoint 10`, ingestion starts at 10
    /// and covers 20 checkpoints (10..=29).
    #[tokio::test]
    async fn test_indexer_ingestion_use_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // A has no watermark, so the override determines its start.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(10),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 1);
        assert_out_of_order!(metrics, C::NAME, 6);
        assert_out_of_order!(metrics, D::NAME, 11);
    }
1282
1283 #[tokio::test]
1284 async fn test_init_watermark_concurrent_no_first_checkpoint() {
1285 let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1286 assert_eq!(committer_watermark, None);
1287 assert_eq!(pruner_watermark, None);
1288 }
1289
1290 #[tokio::test]
1291 async fn test_init_watermark_concurrent_first_checkpoint_0() {
1292 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1293 assert_eq!(committer_watermark, None);
1294 assert_eq!(pruner_watermark, None);
1295 }
1296
1297 #[tokio::test]
1298 async fn test_init_watermark_concurrent_first_checkpoint_1() {
1299 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1300
1301 let committer_watermark = committer_watermark.unwrap();
1302 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1303
1304 let pruner_watermark = pruner_watermark.unwrap();
1305 assert_eq!(pruner_watermark.reader_lo, 1);
1306 assert_eq!(pruner_watermark.pruner_hi, 1);
1307 }
1308
1309 #[tokio::test]
1310 async fn test_init_watermark_sequential() {
1311 let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1312
1313 let committer_watermark = committer_watermark.unwrap();
1314 assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1315
1316 let pruner_watermark = pruner_watermark.unwrap();
1317 assert_eq!(pruner_watermark.reader_lo, 1);
1318 assert_eq!(pruner_watermark.pruner_hi, 1);
1319 }
1320
1321 #[tokio::test]
1322 async fn test_multiple_sequential_pipelines_next_checkpoint() {
1323 let registry = Registry::new();
1324 let store = MockStore::default();
1325
1326 let mut conn = store.connect().await.unwrap();
1327 set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1328 set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1329
1330 let indexer_args = IndexerArgs {
1331 first_checkpoint: None,
1332 last_checkpoint: Some(19),
1333 pipeline: vec![],
1334 ..Default::default()
1335 };
1336 let (mut indexer, _temp_dir) =
1337 create_test_indexer(store.clone(), indexer_args, ®istry, Some((20, 2))).await;
1338
1339 add_sequential(&mut indexer, MockHandler).await;
1341
1342 assert_eq!(
1344 indexer.next_sequential_checkpoint(),
1345 Some(11),
1346 "next_sequential_checkpoint should be 11"
1347 );
1348
1349 add_sequential(&mut indexer, SequentialHandler).await;
1351
1352 assert_eq!(
1354 indexer.next_sequential_checkpoint(),
1355 Some(6),
1356 "next_sequential_checkpoint should still be 6"
1357 );
1358
1359 indexer.run().await.unwrap().join().await.unwrap();
1361
1362 let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1364 let watermark2 = conn
1365 .committer_watermark(SequentialHandler::NAME)
1366 .await
1367 .unwrap();
1368
1369 assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1370 assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1371 }
1372
1373 #[tokio::test]
1377 async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1378 let registry = Registry::new();
1379 let store = MockStore::default();
1380
1381 let mut conn = store.connect().await.unwrap();
1384 set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1385 conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1386 .await
1387 .unwrap();
1388
1389 let indexer_args = IndexerArgs {
1392 first_checkpoint: Some(0),
1393 last_checkpoint: Some(15),
1394 task: TaskArgs::tasked("task".to_string(), 10),
1395 ..Default::default()
1396 };
1397 let (mut tasked_indexer, _temp_dir) =
1398 create_test_indexer(store.clone(), indexer_args, ®istry, Some((16, 2))).await;
1399
1400 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1401
1402 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1403 let metrics = tasked_indexer.indexer_metrics().clone();
1404
1405 tasked_indexer.run().await.unwrap().join().await.unwrap();
1406
1407 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1408 assert_eq!(
1409 metrics
1410 .total_collector_skipped_checkpoints
1411 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1412 .unwrap()
1413 .get(),
1414 7
1415 );
1416 let data = store
1417 .data
1418 .get(MockCheckpointSequenceNumberHandler::NAME)
1419 .unwrap();
1420 assert_eq!(data.len(), 9);
1421 for i in 0..7 {
1422 assert!(data.get(&i).is_none());
1423 }
1424 for i in 7..16 {
1425 assert!(data.get(&i).is_some());
1426 }
1427 }
1428
1429 #[tokio::test]
1431 async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1432 let registry = Registry::new();
1433 let store = MockStore::default();
1434
1435 let mut conn = store.connect().await.unwrap();
1436 set_committer_watermark(&mut conn, "test", 10).await;
1437 conn.set_reader_watermark("test", 5).await.unwrap();
1438
1439 let indexer_args = IndexerArgs {
1442 first_checkpoint: Some(9),
1443 last_checkpoint: Some(25),
1444 task: TaskArgs::tasked("task".to_string(), 10),
1445 ..Default::default()
1446 };
1447 let (mut tasked_indexer, _temp_dir) =
1448 create_test_indexer(store.clone(), indexer_args, ®istry, Some((26, 2))).await;
1449
1450 add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1451
1452 let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1453 let metrics = tasked_indexer.indexer_metrics().clone();
1454
1455 tasked_indexer.run().await.unwrap().join().await.unwrap();
1456
1457 assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1458 assert_out_of_order!(metrics, "test", 0);
1459 assert_eq!(
1460 metrics
1461 .total_collector_skipped_checkpoints
1462 .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1463 .unwrap()
1464 .get(),
1465 0
1466 );
1467
1468 let data = store.data.get("test").unwrap();
1469 assert_eq!(data.len(), 17);
1470 for i in 0..9 {
1471 assert!(data.get(&i).is_none());
1472 }
1473 for i in 9..26 {
1474 assert!(data.get(&i).is_some());
1475 }
1476 let main_pipeline_watermark = store.watermark("test").unwrap();
1477 assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1479 assert_eq!(main_pipeline_watermark.reader_lo, 5);
1480 let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1481 assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1482 assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1483 }
1484
1485 #[tokio::test]
1488 async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
1489 let registry = Registry::new();
1490 let store = MockStore::default();
1491 let mut conn = store.connect().await.unwrap();
1492 set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;
1494
1495 let indexer_args = IndexerArgs {
1497 first_checkpoint: Some(0),
1498 last_checkpoint: Some(500),
1499 task: TaskArgs::tasked("task".to_string(), 10 ),
1500 ..Default::default()
1501 };
1502 let (mut tasked_indexer, _temp_dir) =
1503 create_test_indexer(store.clone(), indexer_args, ®istry, Some((501, 2))).await;
1504 let mut allow_process = 10;
1505 let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
1507 let _ = tasked_indexer
1508 .concurrent_pipeline(
1509 controllable_handler,
1510 ConcurrentConfig {
1511 committer: CommitterConfig {
1512 collect_interval_ms: 10,
1513 watermark_interval_ms: 10,
1514 ..Default::default()
1515 },
1516 fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
1519 ..Default::default()
1520 },
1521 )
1522 .await;
1523 let metrics = tasked_indexer.indexer_metrics().clone();
1524
1525 let mut s_indexer = tasked_indexer.run().await.unwrap();
1526
1527 store
1531 .wait_for_watermark(
1532 &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
1533 10,
1534 Duration::from_secs(10),
1535 )
1536 .await;
1537
1538 conn.set_reader_watermark(ControllableHandler::NAME, 250)
1543 .await
1544 .unwrap();
1545
1546 let reader_lo = metrics
1547 .collector_reader_lo
1548 .with_label_values(&[ControllableHandler::NAME]);
1549
1550 let mut interval = tokio::time::interval(Duration::from_millis(10));
1554 while reader_lo.get() != 250 {
1555 interval.tick().await;
1556 allow_process += 1;
1558 assert!(
1559 allow_process <= 500,
1560 "Released all checkpoints but collector never observed new reader_lo"
1561 );
1562 process_below.send(allow_process).ok();
1563 }
1564
1565 process_below.send(500).ok();
1572
1573 s_indexer.join().await.unwrap();
1574
1575 let data = store.data.get(ControllableHandler::NAME).unwrap();
1576
1577 for chkpt in (allow_process + 1)..250 {
1579 assert!(
1580 data.get(&chkpt).is_none(),
1581 "Checkpoint {chkpt} should have been skipped"
1582 );
1583 }
1584
1585 for chkpt in 250..=500 {
1587 assert!(
1588 data.get(&chkpt).is_some(),
1589 "Checkpoint {chkpt} should have been committed (>= reader_lo)"
1590 );
1591 }
1592
1593 for chkpt in 0..=10 {
1595 assert!(
1596 data.get(&chkpt).is_some(),
1597 "Checkpoint {chkpt} should have been committed (baseline)"
1598 );
1599 }
1600 }
1601}