sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36/// External users access the store trait through framework::store
37pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
/// Command-line arguments for the indexer
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run. Can be passed multiple times, once per pipeline name.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Command-line arguments for configuring a tasked indexer.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from. Initialized to `u64::MAX` and lowered as pipelines are added.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will cause
    /// [Self::run] to fail with an error.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
163
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
174
175impl TaskArgs {
176    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
177        Self {
178            task: Some(task),
179            reader_interval_ms: Some(reader_interval_ms),
180        }
181    }
182
183    fn into_task(self) -> Option<Task> {
184        Some(Task {
185            task: self.task?,
186            reader_interval: Duration::from_millis(self.reader_interval_ms?),
187        })
188    }
189}
190
191impl<S: Store> Indexer<S> {
192    /// Create a new instance of the indexer framework from a store that implements the `Store`
193    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
194    /// arguments configure the following:
195    ///
196    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
197    ///   watermarks) and where to serve metrics from,
198    /// - Where to download checkpoints from,
199    /// - Concurrency and buffering parameters for downloading checkpoints.
200    ///
201    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
202    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
203    pub async fn new(
204        store: S,
205        indexer_args: IndexerArgs,
206        client_args: ClientArgs,
207        ingestion_config: IngestionConfig,
208        metrics_prefix: Option<&str>,
209        registry: &Registry,
210    ) -> Result<Self> {
211        let IndexerArgs {
212            first_checkpoint,
213            last_checkpoint,
214            pipeline,
215            task,
216        } = indexer_args;
217
218        let metrics = IndexerMetrics::new(metrics_prefix, registry);
219
220        let ingestion_service =
221            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
222
223        Ok(Self {
224            store,
225            metrics,
226            ingestion_service,
227            first_checkpoint,
228            last_checkpoint,
229            next_checkpoint: u64::MAX,
230            next_sequential_checkpoint: None,
231            task: task.into_task(),
232            enabled_pipelines: if pipeline.is_empty() {
233                None
234            } else {
235                Some(pipeline.into_iter().collect())
236            },
237            added_pipelines: BTreeSet::new(),
238            pipelines: vec![],
239        })
240    }
241
242    /// The store used by the indexer.
243    pub fn store(&self) -> &S {
244        &self.store
245    }
246
247    /// The ingestion client used by the indexer to fetch checkpoints.
248    pub fn ingestion_client(&self) -> &IngestionClient {
249        self.ingestion_service.ingestion_client()
250    }
251
252    /// The indexer's metrics.
253    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
254        &self.metrics
255    }
256
257    /// The ingestion service's metrics.
258    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
259        self.ingestion_service.metrics()
260    }
261
262    /// The pipelines that this indexer will run.
263    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
264        self.added_pipelines.iter().copied().filter(|p| {
265            self.enabled_pipelines
266                .as_ref()
267                .is_none_or(|e| e.contains(*p))
268        })
269    }
270
271    /// The minimum next checkpoint across all sequential pipelines. This value is used to
272    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
273    /// too far ahead of sequential pipelines.
274    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
275        self.next_sequential_checkpoint
276    }
277
278    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
279    /// will start processing and committing once the ingestion service has caught up to their
280    /// respective watermarks.
281    ///
282    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
283    pub async fn run(self) -> anyhow::Result<Service> {
284        if let Some(enabled_pipelines) = self.enabled_pipelines {
285            ensure!(
286                enabled_pipelines.is_empty(),
287                "Tried to enable pipelines that this indexer does not know about: \
288                {enabled_pipelines:#?}",
289            );
290        }
291
292        let start = self.next_checkpoint;
293        let end = self.last_checkpoint;
294        info!(start, end, "Ingestion range");
295
296        let mut service = self
297            .ingestion_service
298            .run(
299                (
300                    Bound::Included(start),
301                    end.map_or(Bound::Unbounded, Bound::Included),
302                ),
303                self.next_sequential_checkpoint,
304            )
305            .await
306            .context("Failed to start ingestion service")?;
307
308        for pipeline in self.pipelines {
309            service = service.merge(pipeline);
310        }
311
312        Ok(service)
313    }
314
315    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
316    /// checkpoint after its watermark, or if that doesn't exist, then the provided
317    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
318    ///
319    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
320    /// calculated above.
321    ///
322    /// Returns `Ok(None)` if the pipeline is disabled.
323    async fn add_pipeline<P: Processor + 'static>(
324        &mut self,
325        pipeline_task: String,
326    ) -> Result<Option<u64>> {
327        ensure!(
328            self.added_pipelines.insert(P::NAME),
329            "Pipeline {:?} already added",
330            P::NAME,
331        );
332
333        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
334            && !enabled_pipelines.remove(P::NAME)
335        {
336            info!(pipeline = P::NAME, "Skipping");
337            return Ok(None);
338        }
339
340        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
341        // Otherwise, use the existing record and disregard the proposed value.
342        let proposed_next_checkpoint = self.first_checkpoint.unwrap_or(0);
343        let mut conn = self.store.connect().await?;
344        let init_watermark = conn
345            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
346            .await
347            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
348
349        let next_checkpoint = if let Some(init_watermark) = init_watermark {
350            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
351                checkpoint_hi_inclusive + 1
352            } else {
353                0
354            }
355        } else {
356            proposed_next_checkpoint
357        };
358
359        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);
360
361        Ok(Some(next_checkpoint))
362    }
363}
364
365impl<S: ConcurrentStore> Indexer<S> {
366    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
367    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
368    ///
369    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
370    /// keep the watermark table up-to-date with the highest point they can guarantee all data
371    /// exists for, for their pipeline.
372    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
373        &mut self,
374        handler: H,
375        config: ConcurrentConfig,
376    ) -> Result<()> {
377        let pipeline_task =
378            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
379        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task).await? else {
380            return Ok(());
381        };
382
383        self.pipelines.push(concurrent::pipeline::<H>(
384            handler,
385            next_checkpoint,
386            config,
387            self.store.clone(),
388            self.task.clone(),
389            self.ingestion_service.subscribe().0,
390            self.metrics.clone(),
391        ));
392
393        Ok(())
394    }
395}
396
397impl<T: SequentialStore> Indexer<T> {
398    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
399    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
400    ///
401    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
402    /// required to handle pipelines that modify data in-place (where each update is not an insert,
403    /// but could be a modification of an existing row, where ordering between updates is
404    /// important).
405    ///
406    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
407    /// number of checkpoints (configured by `checkpoint_lag`).
408    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
409        &mut self,
410        handler: H,
411        config: SequentialConfig,
412    ) -> Result<()> {
413        if self.task.is_some() {
414            bail!(
415                "Sequential pipelines do not support pipeline tasks. \
416                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
417                Running the same pipeline under a different task would violate these guarantees."
418            );
419        }
420
421        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned()).await? else {
422            return Ok(());
423        };
424
425        // Track the minimum next_checkpoint across all sequential pipelines
426        self.next_sequential_checkpoint = Some(
427            self.next_sequential_checkpoint
428                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
429        );
430
431        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
432
433        self.pipelines.push(sequential::pipeline::<H>(
434            handler,
435            next_checkpoint,
436            config,
437            self.store.clone(),
438            checkpoint_rx,
439            commit_hi_tx,
440            self.metrics.clone(),
441        ));
442
443        Ok(())
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use std::sync::Arc;
450
451    use async_trait::async_trait;
452    use clap::Parser;
453    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
454    use sui_synthetic_ingestion::synthetic_ingestion;
455    use tokio::sync::watch;
456
457    use crate::FieldCount;
458    use crate::config::ConcurrencyConfig;
459    use crate::ingestion::ingestion_client::IngestionClientArgs;
460    use crate::mocks::store::MockStore;
461    use crate::pipeline::CommitterConfig;
462    use crate::pipeline::Processor;
463    use crate::pipeline::concurrent::ConcurrentConfig;
464    use crate::store::CommitterWatermark;
465    use crate::store::ConcurrentConnection as _;
466    use crate::store::Connection as _;
467
468    use super::*;
469
470    #[allow(dead_code)]
471    #[derive(Clone, FieldCount)]
472    struct MockValue(u64);
473
    /// A handler that can be controlled externally to block checkpoint processing.
    ///
    /// The limit is raised through the `watch::Sender` returned by `with_limit`.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value.
        process_below: watch::Receiver<u64>,
    }
479
480    impl ControllableHandler {
481        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
482            let (tx, rx) = watch::channel(limit);
483            (Self { process_below: rx }, tx)
484        }
485    }
486
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Emit one `MockValue` carrying the checkpoint's sequence number, but only once the
        /// externally controlled limit reaches that sequence number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. `ok()` discards the error
            // raised when the sender half is dropped, so processing proceeds unblocked in that
            // case.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
508
509    #[async_trait]
510    impl concurrent::Handler for ControllableHandler {
511        type Store = MockStore;
512        type Batch = Vec<MockValue>;
513
514        fn batch(
515            &self,
516            batch: &mut Self::Batch,
517            values: &mut std::vec::IntoIter<Self::Value>,
518        ) -> concurrent::BatchStatus {
519            batch.extend(values);
520            concurrent::BatchStatus::Ready
521        }
522
523        async fn commit<'a>(
524            &self,
525            batch: &Self::Batch,
526            conn: &mut <Self::Store as Store>::Connection<'a>,
527        ) -> anyhow::Result<usize> {
528            for value in batch {
529                conn.0
530                    .commit_data(Self::NAME, value.0, vec![value.0])
531                    .await?;
532            }
533            Ok(batch.len())
534        }
535    }
536
    /// Generates a unit struct `$handler` whose pipeline is named `$name`, implementing
    /// `Processor` (emitting one `MockValue` per checkpoint, holding its sequence number) and
    /// both the concurrent and sequential `Handler` traits against `MockStore`, so the same type
    /// can be registered with either kind of pipeline in tests.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    // Unlike `ControllableHandler`, batches stay open until the framework
                    // decides to flush them.
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // The sequential commit is a no-op: nothing is written to the store, it just
                // reports one "row" committed.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
600
    // Shared mock pipeline types used by the tests below.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
604
    /// If `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`, synthetic ingestion
    /// data will be generated in the temp directory before creating the indexer.
    ///
    /// Returns the indexer along with the temp directory handle; the caller must keep the
    /// directory alive for as long as the indexer reads from it (dropping it deletes the files).
    async fn create_test_indexer(
        store: MockStore,
        indexer_args: IndexerArgs,
        registry: &Registry,
        ingestion_data: Option<(u64, u64)>,
    ) -> (Indexer<MockStore>, tempfile::TempDir) {
        let temp_dir = tempfile::tempdir().unwrap();
        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
            // Write synthetic checkpoints, starting from genesis, into the temp directory
            // before the indexer is pointed at it.
            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
                ingestion_dir: temp_dir.path().to_owned(),
                starting_checkpoint: 0,
                num_checkpoints,
                checkpoint_size,
            })
            .await;
        }
        // Point the ingestion client at the local directory -- no remote fetching in tests.
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            IngestionConfig::default(),
            None,
            registry,
        )
        .await
        .unwrap();
        (indexer, temp_dir)
    }
642
643    async fn set_committer_watermark(
644        conn: &mut <MockStore as Store>::Connection<'_>,
645        name: &str,
646        hi: u64,
647    ) {
648        conn.set_committer_watermark(
649            name,
650            CommitterWatermark {
651                checkpoint_hi_inclusive: hi,
652                ..Default::default()
653            },
654        )
655        .await
656        .unwrap();
657    }
658
659    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
660        indexer: &mut Indexer<MockStore>,
661        handler: H,
662    ) {
663        indexer
664            .concurrent_pipeline(handler, ConcurrentConfig::default())
665            .await
666            .unwrap();
667    }
668
669    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
670        indexer: &mut Indexer<MockStore>,
671        handler: H,
672    ) {
673        indexer
674            .sequential_pipeline(handler, SequentialConfig::default())
675            .await
676            .unwrap();
677    }
678
    /// Asserts that the `total_watermarks_out_of_order` metric recorded for `$pipeline` equals
    /// `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
691
    /// Shared scaffolding for watermark-initialization tests: create an indexer with the given
    /// `first_checkpoint` override, register a single pipeline (concurrent or sequential per
    /// `is_concurrent`), and return the committer and pruner watermarks recorded for it.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        // Adding the pipeline is what initializes its watermark in the store.
        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
723
724    #[test]
725    fn test_arg_parsing() {
726        #[derive(Parser)]
727        struct Args {
728            #[clap(flatten)]
729            indexer: IndexerArgs,
730        }
731
732        let args = Args::try_parse_from([
733            "cmd",
734            "--first-checkpoint",
735            "10",
736            "--last-checkpoint",
737            "100",
738            "--pipeline",
739            "a",
740            "--pipeline",
741            "b",
742            "--task",
743            "t",
744            "--reader-interval-ms",
745            "5000",
746        ])
747        .unwrap();
748
749        assert_eq!(args.indexer.first_checkpoint, Some(10));
750        assert_eq!(args.indexer.last_checkpoint, Some(100));
751        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
752        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
753        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
754    }
755
    /// next_checkpoint is smallest among existing watermarks + 1.
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;

        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;

        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, D::NAME, 50).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        // C has the smallest committer watermark (1), so ingestion resumes from 1 + 1 = 2.
        assert_eq!(indexer.next_checkpoint, 2);
        // C is also the smallest among the sequential pipelines.
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }
794
    /// next_checkpoint is 0 when at least one pipeline has no watermark.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // A and D deliberately get no watermark, so they must start from genesis.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        // The watermark-less pipelines pull the starting checkpoint down to genesis.
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }
823
    /// next_checkpoint is 1 when smallest committer watermark is 0.
    #[tokio::test]
    async fn test_next_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;
        set_committer_watermark(&mut conn, D::NAME, 0).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        // D's watermark is 0, so the indexer resumes from 0 + 1 = 1.
        assert_eq!(indexer.next_checkpoint, 1);
    }
851
852    /// next_checkpoint is first_checkpoint when at least one pipeline has no
853    /// watermark, and first_checkpoint is smallest.
854    #[tokio::test]
855    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
856        let registry = Registry::new();
857        let store = MockStore::default();
858
859        test_pipeline!(A, "concurrent_a");
860        test_pipeline!(B, "concurrent_b");
861        test_pipeline!(C, "sequential_c");
862        test_pipeline!(D, "sequential_d");
863
864        let mut conn = store.connect().await.unwrap();
865        set_committer_watermark(&mut conn, B::NAME, 50).await;
866        set_committer_watermark(&mut conn, C::NAME, 10).await;
867
868        let indexer_args = IndexerArgs {
869            first_checkpoint: Some(5),
870            ..Default::default()
871        };
872        let (mut indexer, _temp_dir) =
873            create_test_indexer(store, indexer_args, &registry, None).await;
874
875        add_concurrent(&mut indexer, A).await;
876        add_concurrent(&mut indexer, B).await;
877        add_sequential(&mut indexer, C).await;
878        add_sequential(&mut indexer, D).await;
879
880        assert_eq!(indexer.first_checkpoint, Some(5));
881        assert_eq!(indexer.last_checkpoint, None);
882        assert_eq!(indexer.next_checkpoint, 5);
883        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
884    }
885
886    /// next_checkpoint is smallest among existing watermarks + 1 if
887    /// all pipelines have watermarks (ignores first_checkpoint).
888    #[tokio::test]
889    async fn test_next_checkpoint_ignore_first_checkpoint() {
890        let registry = Registry::new();
891        let store = MockStore::default();
892
893        test_pipeline!(B, "concurrent_b");
894        test_pipeline!(C, "sequential_c");
895
896        let mut conn = store.connect().await.unwrap();
897        set_committer_watermark(&mut conn, B::NAME, 50).await;
898        set_committer_watermark(&mut conn, C::NAME, 10).await;
899
900        let indexer_args = IndexerArgs {
901            first_checkpoint: Some(5),
902            ..Default::default()
903        };
904        let (mut indexer, _temp_dir) =
905            create_test_indexer(store, indexer_args, &registry, None).await;
906
907        add_concurrent(&mut indexer, B).await;
908        add_sequential(&mut indexer, C).await;
909
910        assert_eq!(indexer.first_checkpoint, Some(5));
911        assert_eq!(indexer.last_checkpoint, None);
912        assert_eq!(indexer.next_checkpoint, 11);
913        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
914    }
915
916    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
917    /// will not be used as the starting point if it is not the smallest valid committer watermark
918    /// to resume ingesting from.
919    #[tokio::test]
920    async fn test_next_checkpoint_large_first_checkpoint() {
921        let registry = Registry::new();
922        let store = MockStore::default();
923
924        test_pipeline!(A, "concurrent_a");
925        test_pipeline!(B, "concurrent_b");
926        test_pipeline!(C, "sequential_c");
927
928        let mut conn = store.connect().await.unwrap();
929        set_committer_watermark(&mut conn, B::NAME, 50).await;
930        set_committer_watermark(&mut conn, C::NAME, 10).await;
931
932        let indexer_args = IndexerArgs {
933            first_checkpoint: Some(24),
934            ..Default::default()
935        };
936        let (mut indexer, _temp_dir) =
937            create_test_indexer(store, indexer_args, &registry, None).await;
938
939        add_concurrent(&mut indexer, A).await;
940        add_concurrent(&mut indexer, B).await;
941        add_sequential(&mut indexer, C).await;
942
943        assert_eq!(indexer.first_checkpoint, Some(24));
944        assert_eq!(indexer.last_checkpoint, None);
945        assert_eq!(indexer.next_checkpoint, 11);
946        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
947    }
948
949    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
950    #[tokio::test]
951    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
952        let registry = Registry::new();
953        let store = MockStore::default();
954
955        test_pipeline!(A, "concurrent_a");
956        test_pipeline!(B, "concurrent_b");
957        test_pipeline!(C, "sequential_c");
958        test_pipeline!(D, "sequential_d");
959
960        let mut conn = store.connect().await.unwrap();
961        set_committer_watermark(&mut conn, A::NAME, 5).await;
962        set_committer_watermark(&mut conn, B::NAME, 10).await;
963        set_committer_watermark(&mut conn, C::NAME, 15).await;
964        set_committer_watermark(&mut conn, D::NAME, 20).await;
965
966        let indexer_args = IndexerArgs {
967            last_checkpoint: Some(29),
968            ..Default::default()
969        };
970        let (mut indexer, _temp_dir) =
971            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
972
973        add_concurrent(&mut indexer, A).await;
974        add_concurrent(&mut indexer, B).await;
975        add_sequential(&mut indexer, C).await;
976        add_sequential(&mut indexer, D).await;
977
978        let ingestion_metrics = indexer.ingestion_metrics().clone();
979        let indexer_metrics = indexer.indexer_metrics().clone();
980
981        indexer.run().await.unwrap().join().await.unwrap();
982
983        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
984        assert_out_of_order!(indexer_metrics, A::NAME, 0);
985        assert_out_of_order!(indexer_metrics, B::NAME, 5);
986        assert_out_of_order!(indexer_metrics, C::NAME, 10);
987        assert_out_of_order!(indexer_metrics, D::NAME, 15);
988    }
989
990    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
991    #[tokio::test]
992    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
993        let registry = Registry::new();
994        let store = MockStore::default();
995
996        test_pipeline!(A, "concurrent_a");
997        test_pipeline!(B, "concurrent_b");
998        test_pipeline!(C, "sequential_c");
999        test_pipeline!(D, "sequential_d");
1000
1001        let mut conn = store.connect().await.unwrap();
1002        set_committer_watermark(&mut conn, B::NAME, 10).await;
1003        set_committer_watermark(&mut conn, C::NAME, 15).await;
1004        set_committer_watermark(&mut conn, D::NAME, 20).await;
1005
1006        let indexer_args = IndexerArgs {
1007            last_checkpoint: Some(29),
1008            ..Default::default()
1009        };
1010        let (mut indexer, _temp_dir) =
1011            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1012
1013        add_concurrent(&mut indexer, A).await;
1014        add_concurrent(&mut indexer, B).await;
1015        add_sequential(&mut indexer, C).await;
1016        add_sequential(&mut indexer, D).await;
1017
1018        let ingestion_metrics = indexer.ingestion_metrics().clone();
1019        let metrics = indexer.indexer_metrics().clone();
1020        indexer.run().await.unwrap().join().await.unwrap();
1021
1022        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1023        assert_out_of_order!(metrics, A::NAME, 0);
1024        assert_out_of_order!(metrics, B::NAME, 11);
1025        assert_out_of_order!(metrics, C::NAME, 16);
1026        assert_out_of_order!(metrics, D::NAME, 21);
1027    }
1028
1029    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1030    #[tokio::test]
1031    async fn test_indexer_ingestion_use_first_checkpoint() {
1032        let registry = Registry::new();
1033        let store = MockStore::default();
1034
1035        test_pipeline!(A, "concurrent_a");
1036        test_pipeline!(B, "concurrent_b");
1037        test_pipeline!(C, "sequential_c");
1038        test_pipeline!(D, "sequential_d");
1039
1040        let mut conn = store.connect().await.unwrap();
1041        set_committer_watermark(&mut conn, B::NAME, 10).await;
1042        set_committer_watermark(&mut conn, C::NAME, 15).await;
1043        set_committer_watermark(&mut conn, D::NAME, 20).await;
1044
1045        let indexer_args = IndexerArgs {
1046            first_checkpoint: Some(10),
1047            last_checkpoint: Some(29),
1048            ..Default::default()
1049        };
1050        let (mut indexer, _temp_dir) =
1051            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1052
1053        add_concurrent(&mut indexer, A).await;
1054        add_concurrent(&mut indexer, B).await;
1055        add_sequential(&mut indexer, C).await;
1056        add_sequential(&mut indexer, D).await;
1057
1058        let ingestion_metrics = indexer.ingestion_metrics().clone();
1059        let metrics = indexer.indexer_metrics().clone();
1060        indexer.run().await.unwrap().join().await.unwrap();
1061
1062        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1063        assert_out_of_order!(metrics, A::NAME, 0);
1064        assert_out_of_order!(metrics, B::NAME, 1);
1065        assert_out_of_order!(metrics, C::NAME, 6);
1066        assert_out_of_order!(metrics, D::NAME, 11);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1071        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1072        assert_eq!(committer_watermark, None);
1073        assert_eq!(pruner_watermark, None);
1074    }
1075
1076    #[tokio::test]
1077    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1078        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1079        assert_eq!(committer_watermark, None);
1080        assert_eq!(pruner_watermark, None);
1081    }
1082
1083    #[tokio::test]
1084    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1085        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1086
1087        let committer_watermark = committer_watermark.unwrap();
1088        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1089
1090        let pruner_watermark = pruner_watermark.unwrap();
1091        assert_eq!(pruner_watermark.reader_lo, 1);
1092        assert_eq!(pruner_watermark.pruner_hi, 1);
1093    }
1094
1095    #[tokio::test]
1096    async fn test_init_watermark_sequential() {
1097        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1098
1099        let committer_watermark = committer_watermark.unwrap();
1100        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1101
1102        let pruner_watermark = pruner_watermark.unwrap();
1103        assert_eq!(pruner_watermark.reader_lo, 1);
1104        assert_eq!(pruner_watermark.pruner_hi, 1);
1105    }
1106
1107    #[tokio::test]
1108    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1109        let registry = Registry::new();
1110        let store = MockStore::default();
1111
1112        let mut conn = store.connect().await.unwrap();
1113        set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1114        set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1115
1116        let indexer_args = IndexerArgs {
1117            first_checkpoint: None,
1118            last_checkpoint: Some(19),
1119            pipeline: vec![],
1120            ..Default::default()
1121        };
1122        let (mut indexer, _temp_dir) =
1123            create_test_indexer(store.clone(), indexer_args, &registry, Some((20, 2))).await;
1124
1125        // Add first sequential pipeline
1126        add_sequential(&mut indexer, MockHandler).await;
1127
1128        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1129        assert_eq!(
1130            indexer.next_sequential_checkpoint(),
1131            Some(11),
1132            "next_sequential_checkpoint should be 11"
1133        );
1134
1135        // Add second sequential pipeline
1136        add_sequential(&mut indexer, SequentialHandler).await;
1137
1138        // Should change to 6 (minimum of 6 and 11)
1139        assert_eq!(
1140            indexer.next_sequential_checkpoint(),
1141            Some(6),
1142            "next_sequential_checkpoint should still be 6"
1143        );
1144
1145        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1146        indexer.run().await.unwrap().join().await.unwrap();
1147
1148        // Verify each pipeline made some progress independently
1149        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1150        let watermark2 = conn
1151            .committer_watermark(SequentialHandler::NAME)
1152            .await
1153            .unwrap();
1154
1155        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1156        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1157    }
1158
1159    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1160    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1161    /// committing checkpoints less than the main pipeline's reader watermark.
1162    #[tokio::test]
1163    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1164        let registry = Registry::new();
1165        let store = MockStore::default();
1166
1167        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1168        // reader watermark at `7`.
1169        let mut conn = store.connect().await.unwrap();
1170        set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1171        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1172            .await
1173            .unwrap();
1174
1175        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1176        // be ignored by the tasked indexer.
1177        let indexer_args = IndexerArgs {
1178            first_checkpoint: Some(0),
1179            last_checkpoint: Some(15),
1180            task: TaskArgs::tasked("task".to_string(), 10),
1181            ..Default::default()
1182        };
1183        let (mut tasked_indexer, _temp_dir) =
1184            create_test_indexer(store.clone(), indexer_args, &registry, Some((16, 2))).await;
1185
1186        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1187
1188        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1189        let metrics = tasked_indexer.indexer_metrics().clone();
1190
1191        tasked_indexer.run().await.unwrap().join().await.unwrap();
1192
1193        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1194        assert_eq!(
1195            metrics
1196                .total_collector_skipped_checkpoints
1197                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1198                .unwrap()
1199                .get(),
1200            7
1201        );
1202        let data = store
1203            .data
1204            .get(MockCheckpointSequenceNumberHandler::NAME)
1205            .unwrap();
1206        assert_eq!(data.len(), 9);
1207        for i in 0..7 {
1208            assert!(data.get(&i).is_none());
1209        }
1210        for i in 7..16 {
1211            assert!(data.get(&i).is_some());
1212        }
1213    }
1214
1215    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1216    #[tokio::test]
1217    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1218        let registry = Registry::new();
1219        let store = MockStore::default();
1220
1221        let mut conn = store.connect().await.unwrap();
1222        set_committer_watermark(&mut conn, "test", 10).await;
1223        conn.set_reader_watermark("test", 5).await.unwrap();
1224
1225        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1226        // watermarks.
1227        let indexer_args = IndexerArgs {
1228            first_checkpoint: Some(9),
1229            last_checkpoint: Some(25),
1230            task: TaskArgs::tasked("task".to_string(), 10),
1231            ..Default::default()
1232        };
1233        let (mut tasked_indexer, _temp_dir) =
1234            create_test_indexer(store.clone(), indexer_args, &registry, Some((26, 2))).await;
1235
1236        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1237
1238        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1239        let metrics = tasked_indexer.indexer_metrics().clone();
1240
1241        tasked_indexer.run().await.unwrap().join().await.unwrap();
1242
1243        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1244        assert_out_of_order!(metrics, "test", 0);
1245        assert_eq!(
1246            metrics
1247                .total_collector_skipped_checkpoints
1248                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1249                .unwrap()
1250                .get(),
1251            0
1252        );
1253
1254        let data = store.data.get("test").unwrap();
1255        assert_eq!(data.len(), 17);
1256        for i in 0..9 {
1257            assert!(data.get(&i).is_none());
1258        }
1259        for i in 9..26 {
1260            assert!(data.get(&i).is_some());
1261        }
1262        let main_pipeline_watermark = store.watermark("test").unwrap();
1263        // assert that the main pipeline's watermarks are not updated
1264        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1265        assert_eq!(main_pipeline_watermark.reader_lo, 5);
1266        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1267        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1268        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1269    }
1270
    /// Test that when the collector observes `reader_lo = X`, all checkpoints >= X will be
    /// committed, and any in-flight checkpoints < X will be skipped.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        // Set the main pipeline watermark.
        set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;

        // Generate 500 checkpoints upfront, for the indexer to process all at once.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((501, 2))).await;
        let mut allow_process = 10;
        // Limit the pipeline to process only checkpoints `[0, 10]`.
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    committer: CommitterConfig {
                        // Short intervals so collection and watermark updates happen
                        // quickly relative to the test's pacing below.
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // High fixed concurrency so all checkpoints can be processed
                    // concurrently despite out-of-order arrival.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
        // committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                Duration::from_secs(10),
            )
            .await;

        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
        // track_main_reader_lo task will eventually pick this up and update the atomic. The
        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
        // one at a time until the collector_reader_lo metric shows the new value.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        // Gauge reflecting the reader_lo value the collector has most recently observed.
        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
        // checkpoints have been processed.
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            // allow_process starts at 10, so the first bump releases checkpoint 11; each
            // iteration releases exactly one more checkpoint.
            allow_process += 1;
            // If we exhaust all 500 checkpoints without the collector noticing the new
            // reader_lo, the tracking mechanism is broken — fail loudly.
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // At this point, the collector has observed reader_lo = 250. Release all remaining
        // checkpoints. Guarantees:
        // - [0, 10]: committed (before reader_lo was set)
        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
        // - (allow_process, 250): skipped (in-flight, filtered by collector)
        // - [250, 500]: committed (>= reader_lo)
        process_below.send(500).ok();

        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints (allow_process, 250) must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        // Checkpoints >= reader_lo must be committed.
        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
1387}