sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
/// Command-line arguments for the indexer
//
// NOTE: the `///` doc comments on the fields below double as `--help` text via the
// `clap::Args` derive, so keep them user-facing.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    // `Append` lets the flag be repeated: `--pipeline a --pipeline b`.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
/// Command-line arguments for configuring a tasked indexer.
//
// The two fields require each other (via clap `requires`), so a valid parse yields either both
// values or neither -- which is what `TaskArgs::into_task` relies on.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
/// The indexer framework: owns the store, the ingestion service, and the set of registered
/// pipelines, and tracks the checkpoint bounds that ingestion should cover.
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The network's latest checkpoint, when the indexer was started.
    latest_checkpoint: u64,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from.
    ///
    /// Initialized to `u64::MAX` as a sentinel and lowered by each call to `add_pipeline`.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// a warning when the indexer is run.
    ///
    /// Names are removed from this set as their pipelines are added, so leftovers at `run` time
    /// are names the indexer never recognized.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
165
/// Configuration for a tasked indexer.
///
/// Built from [`TaskArgs`] via `TaskArgs::into_task`; both fields are guaranteed present here.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
176
177impl TaskArgs {
178    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
179        Self {
180            task: Some(task),
181            reader_interval_ms: Some(reader_interval_ms),
182        }
183    }
184
185    fn into_task(self) -> Option<Task> {
186        Some(Task {
187            task: self.task?,
188            reader_interval: Duration::from_millis(self.reader_interval_ms?),
189        })
190    }
191}
192
impl<S: Store> Indexer<S> {
    /// Create a new instance of the indexer framework from a store that implements the `Store`
    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
    /// arguments configure the following:
    ///
    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
    ///   watermarks) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> anyhow::Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        // Snapshot the network's latest checkpoint at startup. `add_pipeline` uses it to derive
        // a default starting point for pipelines configured with a retention period.
        let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;

        info!(latest_checkpoint);

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            latest_checkpoint,
            // Sentinel value: lowered to the minimum across pipelines as they are added.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // `None` means "run all pipelines". Otherwise, names are removed from this set as
            // their pipelines are added, so leftovers at `run` time are unrecognized names.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store used by the indexer.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The ingestion client used by the indexer to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// The indexer's metrics.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// The ingestion service's metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The pipelines that this indexer will run (added pipelines that pass the optional
    /// `--pipeline` filter).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
    /// will start processing and committing once the ingestion service has caught up to their
    /// respective watermarks.
    ///
    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
    pub async fn run(self) -> anyhow::Result<Service> {
        // `add_pipeline` removes each recognized name from this filter, so anything left here
        // was requested on the command line but never registered with the indexer.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    // No `last_checkpoint` configured means ingest indefinitely.
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Merge all pipeline services into the ingestion service so they share a lifetime and
        // shut down together.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
    /// checkpoint after its watermark, or if that doesn't exist, then the provided
    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
    ///
    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
    /// calculated above.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
        retention: Option<u64>,
    ) -> anyhow::Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Removing the name from the filter also records that it was recognized -- see the
        // emptiness check in [Self::run].
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
        // Otherwise, use the existing record and disregard the proposed value.
        let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
            first_checkpoint
        } else if let Some(retention) = retention {
            // No explicit start, but a retention period: start at the edge of the window that
            // would survive pruning.
            self.latest_checkpoint.saturating_sub(retention)
        } else {
            0
        };
        let mut conn = self.store.connect().await?;
        // `checked_sub` maps a proposed start of 0 to `None`, i.e. no prior watermark at all.
        let init_watermark = conn
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                checkpoint_hi_inclusive + 1
            } else {
                0
            }
        } else {
            proposed_next_checkpoint
        };

        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
378
379impl<S: ConcurrentStore> Indexer<S> {
380    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
381    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
382    ///
383    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
384    /// keep the watermark table up-to-date with the highest point they can guarantee all data
385    /// exists for, for their pipeline.
386    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
387        &mut self,
388        handler: H,
389        config: ConcurrentConfig,
390    ) -> anyhow::Result<()> {
391        let pipeline_task =
392            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
393        let retention = config.pruner.as_ref().map(|p| p.retention);
394        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
395            return Ok(());
396        };
397
398        self.pipelines.push(concurrent::pipeline::<H>(
399            handler,
400            next_checkpoint,
401            config,
402            self.store.clone(),
403            self.task.clone(),
404            self.ingestion_service.subscribe().0,
405            self.metrics.clone(),
406        ));
407
408        Ok(())
409    }
410}
411
412impl<T: SequentialStore> Indexer<T> {
413    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
414    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
415    ///
416    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
417    /// required to handle pipelines that modify data in-place (where each update is not an insert,
418    /// but could be a modification of an existing row, where ordering between updates is
419    /// important).
420    ///
421    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
422    /// number of checkpoints (configured by `checkpoint_lag`).
423    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
424        &mut self,
425        handler: H,
426        config: SequentialConfig,
427    ) -> anyhow::Result<()> {
428        if self.task.is_some() {
429            bail!(
430                "Sequential pipelines do not support pipeline tasks. \
431                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
432                Running the same pipeline under a different task would violate these guarantees."
433            );
434        }
435
436        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
437            return Ok(());
438        };
439
440        // Track the minimum next_checkpoint across all sequential pipelines
441        self.next_sequential_checkpoint = Some(
442            self.next_sequential_checkpoint
443                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
444        );
445
446        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
447
448        self.pipelines.push(sequential::pipeline::<H>(
449            handler,
450            next_checkpoint,
451            config,
452            self.store.clone(),
453            checkpoint_rx,
454            commit_hi_tx,
455            self.metrics.clone(),
456        ));
457
458        Ok(())
459    }
460}
461
462#[cfg(test)]
463mod tests {
464    use std::sync::Arc;
465
466    use async_trait::async_trait;
467    use clap::Parser;
468    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
469    use sui_synthetic_ingestion::synthetic_ingestion;
470    use tokio::sync::watch;
471
472    use crate::FieldCount;
473    use crate::config::ConcurrencyConfig;
474    use crate::ingestion::ingestion_client::IngestionClientArgs;
475    use crate::ingestion::store_client::ObjectStoreWatermark;
476    use crate::ingestion::store_client::WATERMARK_PATH;
477    use crate::mocks::store::MockStore;
478    use crate::pipeline::CommitterConfig;
479    use crate::pipeline::Processor;
480    use crate::pipeline::concurrent::ConcurrentConfig;
481    use crate::store::CommitterWatermark;
482    use crate::store::ConcurrentConnection as _;
483    use crate::store::Connection as _;
484
485    use super::*;
486
    /// Minimal value type emitted by the test processors; wraps a checkpoint sequence number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
490
    /// A handler that can be controlled externally to block checkpoint processing.
    ///
    /// Tests hold the corresponding `watch::Sender` (returned by `with_limit`) and raise the
    /// limit to release blocked checkpoints.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value.
        process_below: watch::Receiver<u64>,
    }
496
497    impl ControllableHandler {
498        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
499            let (tx, rx) = watch::channel(limit);
500            (Self { process_below: rx }, tx)
501        }
502    }
503
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Blocks until the externally-controlled limit reaches this checkpoint's sequence
        /// number, then emits a single value for it.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. The receiver is cloned
            // because `wait_for` needs a mutable handle while `process` only has `&self`.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                // Errors (e.g. the sender was dropped) are deliberately ignored: gating is
                // best-effort in tests, and processing proceeds regardless.
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
525
    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        // Accumulate every value into the batch and mark it ready to commit immediately.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        // Record each value in the mock store under this handler's name, keyed by the value's
        // checkpoint number; report one "row" written per value.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
553
    /// Generates a minimal test pipeline: a unit struct `$handler` whose `Processor` impl emits
    /// the checkpoint's sequence number as a `MockValue`, plus both a concurrent and a
    /// sequential `Handler` impl against `MockStore`, so the same type can be registered with
    /// either pipeline kind under the name `$name`.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                // Record each value in the mock store, keyed by pipeline name and checkpoint.
                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // The sequential impl writes nothing to the store; it only reports a single
                // committed row per batch.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
617
    // Concrete pipeline types shared by the tests below.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
621
622    fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
623        let dir = tempfile::tempdir().unwrap();
624        if let Some(cp) = latest_checkpoint {
625            let watermark_path = dir.path().join(WATERMARK_PATH);
626            std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
627            let watermark = ObjectStoreWatermark {
628                checkpoint_hi_inclusive: cp,
629            };
630            std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
631        }
632        dir
633    }
634
    /// Build an indexer over `store` that reads checkpoints from a fresh local temp directory.
    ///
    /// If `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`, synthetic ingestion
    /// data will be generated in the temp directory before creating the indexer.
    ///
    /// Returns the indexer together with the temp directory guard -- callers must keep the
    /// guard alive for as long as the indexer needs to read checkpoints from it.
    async fn create_test_indexer(
        store: MockStore,
        indexer_args: IndexerArgs,
        registry: &Registry,
        ingestion_data: Option<(u64, u64)>,
    ) -> (Indexer<MockStore>, tempfile::TempDir) {
        let temp_dir = init_ingestion_dir(None);
        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
                ingestion_dir: temp_dir.path().to_owned(),
                starting_checkpoint: 0,
                num_checkpoints,
                checkpoint_size,
            })
            .await;
        }
        // Point the ingestion client at the local directory rather than a remote source.
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            IngestionConfig::default(),
            None,
            registry,
        )
        .await
        .unwrap();
        (indexer, temp_dir)
    }
672
673    async fn set_committer_watermark(
674        conn: &mut <MockStore as Store>::Connection<'_>,
675        name: &str,
676        hi: u64,
677    ) {
678        conn.set_committer_watermark(
679            name,
680            CommitterWatermark {
681                checkpoint_hi_inclusive: hi,
682                ..Default::default()
683            },
684        )
685        .await
686        .unwrap();
687    }
688
689    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
690        indexer: &mut Indexer<MockStore>,
691        handler: H,
692    ) {
693        indexer
694            .concurrent_pipeline(handler, ConcurrentConfig::default())
695            .await
696            .unwrap();
697    }
698
699    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
700        indexer: &mut Indexer<MockStore>,
701        handler: H,
702    ) {
703        indexer
704            .sequential_pipeline(handler, SequentialConfig::default())
705            .await
706            .unwrap();
707    }
708
    /// Asserts that the `total_watermarks_out_of_order` counter in `$metrics`, for the label
    /// `$pipeline`, currently reads `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
721
    /// Register a single pipeline named "pipeline_name" (concurrent if `is_concurrent`, else
    /// sequential) on a fresh indexer, optionally overriding `--first-checkpoint`, and return
    /// the committer and pruner watermarks recorded for it after registration.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            // Queried with a zero delay so the freshly-initialized watermark is visible.
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
753
    /// The checkpoint the local ingestion store reports as latest in `test_next_checkpoint`.
    const LATEST_CHECKPOINT: u64 = 10;

    /// Set up an indexer as if the network is at `latest_checkpoint` (the next checkpoint to
    /// ingest). Runs a single concurrent pipeline with the given config. If `watermark` is
    /// provided, the pipeline's high watermark is pre-set; `first_checkpoint` controls where
    /// ingestion begins.
    async fn test_next_checkpoint(
        watermark: Option<u64>,
        first_checkpoint: Option<u64>,
        concurrent_config: ConcurrentConfig,
    ) -> Indexer<MockStore> {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");

        // Pre-seed the committer watermark so the pipeline resumes rather than starting fresh.
        if let Some(checkpoint_hi_inclusive) = watermark {
            let mut conn = store.connect().await.unwrap();
            conn.set_committer_watermark(
                A::NAME,
                CommitterWatermark {
                    checkpoint_hi_inclusive,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        }

        let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
        let mut indexer = Indexer::new(
            store,
            IndexerArgs {
                first_checkpoint,
                ..Default::default()
            },
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);

        indexer
            .concurrent_pipeline::<A>(A, concurrent_config)
            .await
            .unwrap();

        indexer
    }
813
814    fn pruner_config(retention: u64) -> ConcurrentConfig {
815        ConcurrentConfig {
816            pruner: Some(concurrent::PrunerConfig {
817                retention,
818                ..Default::default()
819            }),
820            ..Default::default()
821        }
822    }
823
824    #[test]
825    fn test_arg_parsing() {
826        #[derive(Parser)]
827        struct Args {
828            #[clap(flatten)]
829            indexer: IndexerArgs,
830        }
831
832        let args = Args::try_parse_from([
833            "cmd",
834            "--first-checkpoint",
835            "10",
836            "--last-checkpoint",
837            "100",
838            "--pipeline",
839            "a",
840            "--pipeline",
841            "b",
842            "--task",
843            "t",
844            "--reader-interval-ms",
845            "5000",
846        ])
847        .unwrap();
848
849        assert_eq!(args.indexer.first_checkpoint, Some(10));
850        assert_eq!(args.indexer.last_checkpoint, Some(100));
851        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
852        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
853        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
854    }
855
856    /// next_checkpoint is smallest among existing watermarks + 1.
857    #[tokio::test]
858    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
859        let registry = Registry::new();
860        let store = MockStore::default();
861
862        test_pipeline!(A, "concurrent_a");
863        test_pipeline!(B, "concurrent_b");
864        test_pipeline!(C, "sequential_c");
865        test_pipeline!(D, "sequential_d");
866
867        let mut conn = store.connect().await.unwrap();
868
869        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
870        set_committer_watermark(&mut conn, A::NAME, 100).await;
871
872        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
873        set_committer_watermark(&mut conn, B::NAME, 10).await;
874
875        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
876        set_committer_watermark(&mut conn, C::NAME, 1).await;
877
878        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
879        set_committer_watermark(&mut conn, D::NAME, 50).await;
880
881        let (mut indexer, _temp_dir) =
882            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
883
884        add_concurrent(&mut indexer, A).await;
885        add_concurrent(&mut indexer, B).await;
886        add_sequential(&mut indexer, C).await;
887        add_sequential(&mut indexer, D).await;
888
889        assert_eq!(indexer.first_checkpoint, None);
890        assert_eq!(indexer.last_checkpoint, None);
891        assert_eq!(indexer.latest_checkpoint, 0);
892        assert_eq!(indexer.next_checkpoint, 2);
893        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
894    }
895
896    /// next_checkpoint is 0 when at least one pipeline has no watermark.
897    #[tokio::test]
898    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
899        let registry = Registry::new();
900        let store = MockStore::default();
901
902        test_pipeline!(A, "concurrent_a");
903        test_pipeline!(B, "concurrent_b");
904        test_pipeline!(C, "sequential_c");
905        test_pipeline!(D, "sequential_d");
906
907        let mut conn = store.connect().await.unwrap();
908        set_committer_watermark(&mut conn, B::NAME, 10).await;
909        set_committer_watermark(&mut conn, C::NAME, 1).await;
910
911        let (mut indexer, _temp_dir) =
912            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
913
914        add_concurrent(&mut indexer, A).await;
915        add_concurrent(&mut indexer, B).await;
916        add_sequential(&mut indexer, C).await;
917        add_sequential(&mut indexer, D).await;
918
919        assert_eq!(indexer.first_checkpoint, None);
920        assert_eq!(indexer.last_checkpoint, None);
921        assert_eq!(indexer.latest_checkpoint, 0);
922        assert_eq!(indexer.next_checkpoint, 0);
923        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
924    }
925
926    /// next_checkpoint is 1 when smallest committer watermark is 0.
927    #[tokio::test]
928    async fn test_next_checkpoint_smallest_is_0() {
929        let registry = Registry::new();
930        let store = MockStore::default();
931
932        test_pipeline!(A, "concurrent_a");
933        test_pipeline!(B, "concurrent_b");
934        test_pipeline!(C, "sequential_c");
935        test_pipeline!(D, "sequential_d");
936
937        let mut conn = store.connect().await.unwrap();
938        set_committer_watermark(&mut conn, A::NAME, 100).await;
939        set_committer_watermark(&mut conn, B::NAME, 10).await;
940        set_committer_watermark(&mut conn, C::NAME, 1).await;
941        set_committer_watermark(&mut conn, D::NAME, 0).await;
942
943        let (mut indexer, _temp_dir) =
944            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
945
946        add_concurrent(&mut indexer, A).await;
947        add_concurrent(&mut indexer, B).await;
948        add_sequential(&mut indexer, C).await;
949        add_sequential(&mut indexer, D).await;
950
951        assert_eq!(indexer.next_checkpoint, 1);
952    }
953
954    /// next_checkpoint is first_checkpoint when at least one pipeline has no
955    /// watermark, and first_checkpoint is smallest.
956    #[tokio::test]
957    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
958        let registry = Registry::new();
959        let store = MockStore::default();
960
961        test_pipeline!(A, "concurrent_a");
962        test_pipeline!(B, "concurrent_b");
963        test_pipeline!(C, "sequential_c");
964        test_pipeline!(D, "sequential_d");
965
966        let mut conn = store.connect().await.unwrap();
967        set_committer_watermark(&mut conn, B::NAME, 50).await;
968        set_committer_watermark(&mut conn, C::NAME, 10).await;
969
970        let indexer_args = IndexerArgs {
971            first_checkpoint: Some(5),
972            ..Default::default()
973        };
974        let (mut indexer, _temp_dir) =
975            create_test_indexer(store, indexer_args, &registry, None).await;
976
977        add_concurrent(&mut indexer, A).await;
978        add_concurrent(&mut indexer, B).await;
979        add_sequential(&mut indexer, C).await;
980        add_sequential(&mut indexer, D).await;
981
982        assert_eq!(indexer.first_checkpoint, Some(5));
983        assert_eq!(indexer.last_checkpoint, None);
984        assert_eq!(indexer.latest_checkpoint, 0);
985        assert_eq!(indexer.next_checkpoint, 5);
986        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
987    }
988
989    /// next_checkpoint is smallest among existing watermarks + 1 if
990    /// all pipelines have watermarks (ignores first_checkpoint).
991    #[tokio::test]
992    async fn test_next_checkpoint_ignore_first_checkpoint() {
993        let registry = Registry::new();
994        let store = MockStore::default();
995
996        test_pipeline!(B, "concurrent_b");
997        test_pipeline!(C, "sequential_c");
998
999        let mut conn = store.connect().await.unwrap();
1000        set_committer_watermark(&mut conn, B::NAME, 50).await;
1001        set_committer_watermark(&mut conn, C::NAME, 10).await;
1002
1003        let indexer_args = IndexerArgs {
1004            first_checkpoint: Some(5),
1005            ..Default::default()
1006        };
1007        let (mut indexer, _temp_dir) =
1008            create_test_indexer(store, indexer_args, &registry, None).await;
1009
1010        add_concurrent(&mut indexer, B).await;
1011        add_sequential(&mut indexer, C).await;
1012
1013        assert_eq!(indexer.first_checkpoint, Some(5));
1014        assert_eq!(indexer.last_checkpoint, None);
1015        assert_eq!(indexer.latest_checkpoint, 0);
1016        assert_eq!(indexer.next_checkpoint, 11);
1017        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1018    }
1019
1020    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1021    /// will not be used as the starting point if it is not the smallest valid committer watermark
1022    /// to resume ingesting from.
1023    #[tokio::test]
1024    async fn test_next_checkpoint_large_first_checkpoint() {
1025        let registry = Registry::new();
1026        let store = MockStore::default();
1027
1028        test_pipeline!(A, "concurrent_a");
1029        test_pipeline!(B, "concurrent_b");
1030        test_pipeline!(C, "sequential_c");
1031
1032        let mut conn = store.connect().await.unwrap();
1033        set_committer_watermark(&mut conn, B::NAME, 50).await;
1034        set_committer_watermark(&mut conn, C::NAME, 10).await;
1035
1036        let indexer_args = IndexerArgs {
1037            first_checkpoint: Some(24),
1038            ..Default::default()
1039        };
1040        let (mut indexer, _temp_dir) =
1041            create_test_indexer(store, indexer_args, &registry, None).await;
1042
1043        add_concurrent(&mut indexer, A).await;
1044        add_concurrent(&mut indexer, B).await;
1045        add_sequential(&mut indexer, C).await;
1046
1047        assert_eq!(indexer.first_checkpoint, Some(24));
1048        assert_eq!(indexer.last_checkpoint, None);
1049        assert_eq!(indexer.latest_checkpoint, 0);
1050        assert_eq!(indexer.next_checkpoint, 11);
1051        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1052    }
1053
1054    /// latest_checkpoint is read from the watermark file.
1055    #[tokio::test]
1056    async fn test_latest_checkpoint_from_watermark() {
1057        let registry = Registry::new();
1058        let store = MockStore::default();
1059        let temp_dir = init_ingestion_dir(Some(30));
1060        let indexer = Indexer::new(
1061            store,
1062            IndexerArgs::default(),
1063            ClientArgs {
1064                ingestion: IngestionClientArgs {
1065                    local_ingestion_path: Some(temp_dir.path().to_owned()),
1066                    ..Default::default()
1067                },
1068                ..Default::default()
1069            },
1070            IngestionConfig::default(),
1071            None,
1072            &registry,
1073        )
1074        .await
1075        .unwrap();
1076
1077        assert_eq!(indexer.latest_checkpoint, 30);
1078    }
1079
1080    /// No watermark, no first_checkpoint, pruner with retention:
1081    /// next_checkpoint = LATEST_CHECKPOINT - retention.
1082    #[tokio::test]
1083    async fn test_next_checkpoint_with_pruner_uses_retention() {
1084        let retention = LATEST_CHECKPOINT - 1;
1085        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1086        assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
1087    }
1088
1089    /// No watermark, no first_checkpoint, no pruner: falls back to 0.
1090    #[tokio::test]
1091    async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
1092        let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
1093        assert_eq!(indexer.next_checkpoint, 0);
1094    }
1095
1096    /// Watermark at 5 takes priority over latest_checkpoint - retention.
1097    #[tokio::test]
1098    async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
1099        let retention = LATEST_CHECKPOINT - 1;
1100        let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
1101        assert_eq!(indexer.next_checkpoint, 6);
1102    }
1103
1104    /// first_checkpoint takes priority over latest_checkpoint - retention when
1105    /// there's no watermark.
1106    #[tokio::test]
1107    async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
1108        let retention = LATEST_CHECKPOINT - 1;
1109        let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
1110        assert_eq!(indexer.next_checkpoint, 2);
1111    }
1112
1113    /// When retention exceeds latest_checkpoint, saturating_sub clamps to 0.
1114    #[tokio::test]
1115    async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
1116        let retention = LATEST_CHECKPOINT + 1;
1117        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1118        assert_eq!(indexer.next_checkpoint, 0);
1119    }
1120
1121    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1122    #[tokio::test]
1123    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1124        let registry = Registry::new();
1125        let store = MockStore::default();
1126
1127        test_pipeline!(A, "concurrent_a");
1128        test_pipeline!(B, "concurrent_b");
1129        test_pipeline!(C, "sequential_c");
1130        test_pipeline!(D, "sequential_d");
1131
1132        let mut conn = store.connect().await.unwrap();
1133        set_committer_watermark(&mut conn, A::NAME, 5).await;
1134        set_committer_watermark(&mut conn, B::NAME, 10).await;
1135        set_committer_watermark(&mut conn, C::NAME, 15).await;
1136        set_committer_watermark(&mut conn, D::NAME, 20).await;
1137
1138        let indexer_args = IndexerArgs {
1139            last_checkpoint: Some(29),
1140            ..Default::default()
1141        };
1142        let (mut indexer, _temp_dir) =
1143            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1144
1145        add_concurrent(&mut indexer, A).await;
1146        add_concurrent(&mut indexer, B).await;
1147        add_sequential(&mut indexer, C).await;
1148        add_sequential(&mut indexer, D).await;
1149
1150        let ingestion_metrics = indexer.ingestion_metrics().clone();
1151        let indexer_metrics = indexer.indexer_metrics().clone();
1152
1153        indexer.run().await.unwrap().join().await.unwrap();
1154
1155        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1156        assert_out_of_order!(indexer_metrics, A::NAME, 0);
1157        assert_out_of_order!(indexer_metrics, B::NAME, 5);
1158        assert_out_of_order!(indexer_metrics, C::NAME, 10);
1159        assert_out_of_order!(indexer_metrics, D::NAME, 15);
1160    }
1161
1162    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1163    #[tokio::test]
1164    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1165        let registry = Registry::new();
1166        let store = MockStore::default();
1167
1168        test_pipeline!(A, "concurrent_a");
1169        test_pipeline!(B, "concurrent_b");
1170        test_pipeline!(C, "sequential_c");
1171        test_pipeline!(D, "sequential_d");
1172
1173        let mut conn = store.connect().await.unwrap();
1174        set_committer_watermark(&mut conn, A::NAME, 5).await;
1175        set_committer_watermark(&mut conn, B::NAME, 10).await;
1176        set_committer_watermark(&mut conn, C::NAME, 15).await;
1177        set_committer_watermark(&mut conn, D::NAME, 20).await;
1178
1179        let indexer_args = IndexerArgs {
1180            first_checkpoint: Some(3),
1181            last_checkpoint: Some(29),
1182            ..Default::default()
1183        };
1184        let (mut indexer, _temp_dir) =
1185            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1186
1187        add_concurrent(&mut indexer, A).await;
1188        add_concurrent(&mut indexer, B).await;
1189        add_sequential(&mut indexer, C).await;
1190        add_sequential(&mut indexer, D).await;
1191
1192        let ingestion_metrics = indexer.ingestion_metrics().clone();
1193        let metrics = indexer.indexer_metrics().clone();
1194        indexer.run().await.unwrap().join().await.unwrap();
1195
1196        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1197        assert_out_of_order!(metrics, A::NAME, 0);
1198        assert_out_of_order!(metrics, B::NAME, 5);
1199        assert_out_of_order!(metrics, C::NAME, 10);
1200        assert_out_of_order!(metrics, D::NAME, 15);
1201    }
1202
1203    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1204    #[tokio::test]
1205    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1206        let registry = Registry::new();
1207        let store = MockStore::default();
1208
1209        test_pipeline!(A, "concurrent_a");
1210        test_pipeline!(B, "concurrent_b");
1211        test_pipeline!(C, "sequential_c");
1212        test_pipeline!(D, "sequential_d");
1213
1214        let mut conn = store.connect().await.unwrap();
1215        set_committer_watermark(&mut conn, B::NAME, 10).await;
1216        set_committer_watermark(&mut conn, C::NAME, 15).await;
1217        set_committer_watermark(&mut conn, D::NAME, 20).await;
1218
1219        let indexer_args = IndexerArgs {
1220            last_checkpoint: Some(29),
1221            ..Default::default()
1222        };
1223        let (mut indexer, _temp_dir) =
1224            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1225
1226        add_concurrent(&mut indexer, A).await;
1227        add_concurrent(&mut indexer, B).await;
1228        add_sequential(&mut indexer, C).await;
1229        add_sequential(&mut indexer, D).await;
1230
1231        let ingestion_metrics = indexer.ingestion_metrics().clone();
1232        let metrics = indexer.indexer_metrics().clone();
1233        indexer.run().await.unwrap().join().await.unwrap();
1234
1235        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1236        assert_out_of_order!(metrics, A::NAME, 0);
1237        assert_out_of_order!(metrics, B::NAME, 11);
1238        assert_out_of_order!(metrics, C::NAME, 16);
1239        assert_out_of_order!(metrics, D::NAME, 21);
1240    }
1241
1242    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1243    #[tokio::test]
1244    async fn test_indexer_ingestion_use_first_checkpoint() {
1245        let registry = Registry::new();
1246        let store = MockStore::default();
1247
1248        test_pipeline!(A, "concurrent_a");
1249        test_pipeline!(B, "concurrent_b");
1250        test_pipeline!(C, "sequential_c");
1251        test_pipeline!(D, "sequential_d");
1252
1253        let mut conn = store.connect().await.unwrap();
1254        set_committer_watermark(&mut conn, B::NAME, 10).await;
1255        set_committer_watermark(&mut conn, C::NAME, 15).await;
1256        set_committer_watermark(&mut conn, D::NAME, 20).await;
1257
1258        let indexer_args = IndexerArgs {
1259            first_checkpoint: Some(10),
1260            last_checkpoint: Some(29),
1261            ..Default::default()
1262        };
1263        let (mut indexer, _temp_dir) =
1264            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1265
1266        add_concurrent(&mut indexer, A).await;
1267        add_concurrent(&mut indexer, B).await;
1268        add_sequential(&mut indexer, C).await;
1269        add_sequential(&mut indexer, D).await;
1270
1271        let ingestion_metrics = indexer.ingestion_metrics().clone();
1272        let metrics = indexer.indexer_metrics().clone();
1273        indexer.run().await.unwrap().join().await.unwrap();
1274
1275        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1276        assert_out_of_order!(metrics, A::NAME, 0);
1277        assert_out_of_order!(metrics, B::NAME, 1);
1278        assert_out_of_order!(metrics, C::NAME, 6);
1279        assert_out_of_order!(metrics, D::NAME, 11);
1280    }
1281
1282    #[tokio::test]
1283    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1284        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1285        assert_eq!(committer_watermark, None);
1286        assert_eq!(pruner_watermark, None);
1287    }
1288
1289    #[tokio::test]
1290    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1291        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1292        assert_eq!(committer_watermark, None);
1293        assert_eq!(pruner_watermark, None);
1294    }
1295
1296    #[tokio::test]
1297    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1298        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1299
1300        let committer_watermark = committer_watermark.unwrap();
1301        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1302
1303        let pruner_watermark = pruner_watermark.unwrap();
1304        assert_eq!(pruner_watermark.reader_lo, 1);
1305        assert_eq!(pruner_watermark.pruner_hi, 1);
1306    }
1307
1308    #[tokio::test]
1309    async fn test_init_watermark_sequential() {
1310        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1311
1312        let committer_watermark = committer_watermark.unwrap();
1313        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1314
1315        let pruner_watermark = pruner_watermark.unwrap();
1316        assert_eq!(pruner_watermark.reader_lo, 1);
1317        assert_eq!(pruner_watermark.pruner_hi, 1);
1318    }
1319
1320    #[tokio::test]
1321    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1322        let registry = Registry::new();
1323        let store = MockStore::default();
1324
1325        let mut conn = store.connect().await.unwrap();
1326        set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1327        set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1328
1329        let indexer_args = IndexerArgs {
1330            first_checkpoint: None,
1331            last_checkpoint: Some(19),
1332            pipeline: vec![],
1333            ..Default::default()
1334        };
1335        let (mut indexer, _temp_dir) =
1336            create_test_indexer(store.clone(), indexer_args, &registry, Some((20, 2))).await;
1337
1338        // Add first sequential pipeline
1339        add_sequential(&mut indexer, MockHandler).await;
1340
1341        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1342        assert_eq!(
1343            indexer.next_sequential_checkpoint(),
1344            Some(11),
1345            "next_sequential_checkpoint should be 11"
1346        );
1347
1348        // Add second sequential pipeline
1349        add_sequential(&mut indexer, SequentialHandler).await;
1350
1351        // Should change to 6 (minimum of 6 and 11)
1352        assert_eq!(
1353            indexer.next_sequential_checkpoint(),
1354            Some(6),
1355            "next_sequential_checkpoint should still be 6"
1356        );
1357
1358        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1359        indexer.run().await.unwrap().join().await.unwrap();
1360
1361        // Verify each pipeline made some progress independently
1362        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1363        let watermark2 = conn
1364            .committer_watermark(SequentialHandler::NAME)
1365            .await
1366            .unwrap();
1367
1368        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1369        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1370    }
1371
1372    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1373    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1374    /// committing checkpoints less than the main pipeline's reader watermark.
1375    #[tokio::test]
1376    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1377        let registry = Registry::new();
1378        let store = MockStore::default();
1379
1380        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1381        // reader watermark at `7`.
1382        let mut conn = store.connect().await.unwrap();
1383        set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1384        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1385            .await
1386            .unwrap();
1387
1388        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1389        // be ignored by the tasked indexer.
1390        let indexer_args = IndexerArgs {
1391            first_checkpoint: Some(0),
1392            last_checkpoint: Some(15),
1393            task: TaskArgs::tasked("task".to_string(), 10),
1394            ..Default::default()
1395        };
1396        let (mut tasked_indexer, _temp_dir) =
1397            create_test_indexer(store.clone(), indexer_args, &registry, Some((16, 2))).await;
1398
1399        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1400
1401        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1402        let metrics = tasked_indexer.indexer_metrics().clone();
1403
1404        tasked_indexer.run().await.unwrap().join().await.unwrap();
1405
1406        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1407        assert_eq!(
1408            metrics
1409                .total_collector_skipped_checkpoints
1410                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1411                .unwrap()
1412                .get(),
1413            7
1414        );
1415        let data = store
1416            .data
1417            .get(MockCheckpointSequenceNumberHandler::NAME)
1418            .unwrap();
1419        assert_eq!(data.len(), 9);
1420        for i in 0..7 {
1421            assert!(data.get(&i).is_none());
1422        }
1423        for i in 7..16 {
1424            assert!(data.get(&i).is_some());
1425        }
1426    }
1427
1428    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1429    #[tokio::test]
1430    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1431        let registry = Registry::new();
1432        let store = MockStore::default();
1433
1434        let mut conn = store.connect().await.unwrap();
1435        set_committer_watermark(&mut conn, "test", 10).await;
1436        conn.set_reader_watermark("test", 5).await.unwrap();
1437
1438        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1439        // watermarks.
1440        let indexer_args = IndexerArgs {
1441            first_checkpoint: Some(9),
1442            last_checkpoint: Some(25),
1443            task: TaskArgs::tasked("task".to_string(), 10),
1444            ..Default::default()
1445        };
1446        let (mut tasked_indexer, _temp_dir) =
1447            create_test_indexer(store.clone(), indexer_args, &registry, Some((26, 2))).await;
1448
1449        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1450
1451        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1452        let metrics = tasked_indexer.indexer_metrics().clone();
1453
1454        tasked_indexer.run().await.unwrap().join().await.unwrap();
1455
1456        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1457        assert_out_of_order!(metrics, "test", 0);
1458        assert_eq!(
1459            metrics
1460                .total_collector_skipped_checkpoints
1461                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1462                .unwrap()
1463                .get(),
1464            0
1465        );
1466
1467        let data = store.data.get("test").unwrap();
1468        assert_eq!(data.len(), 17);
1469        for i in 0..9 {
1470            assert!(data.get(&i).is_none());
1471        }
1472        for i in 9..26 {
1473            assert!(data.get(&i).is_some());
1474        }
1475        let main_pipeline_watermark = store.watermark("test").unwrap();
1476        // assert that the main pipeline's watermarks are not updated
1477        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1478        assert_eq!(main_pipeline_watermark.reader_lo, 5);
1479        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1480        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1481        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1482    }
1483
    /// Test that when the collector observes `reader_lo = X`, all checkpoints >= X will be
    /// committed, and any checkpoints in flight < X will be skipped.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        // Set the main pipeline watermark.
        set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;

        // Generate 500 checkpoints upfront, for the indexer to process all at once.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((501, 2))).await;
        let mut allow_process = 10;
        // Limit the pipeline to process only checkpoints `[0, 10]`.
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    // Short collect/watermark intervals so the committer keeps pace with the
                    // 10ms release cadence used below.
                    committer: CommitterConfig {
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // High fixed concurrency so all checkpoints can be processed
                    // concurrently despite out-of-order arrival.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
        // committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                Duration::from_secs(10),
            )
            .await;

        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
        // track_main_reader_lo task will eventually pick this up and update the atomic. The
        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
        // one at a time until the collector_reader_lo metric shows the new value.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
        // checkpoints have been processed.
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            // allow_process starts at 10; bump it so the handler releases the next checkpoint.
            allow_process += 1;
            // If we released everything and the collector still hasn't seen the new
            // reader_lo, the refresh mechanism is broken — fail rather than hang.
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // At this point, the collector has observed reader_lo = 250. Release all remaining
        // checkpoints. Guarantees:
        // - [0, 10]: committed (before reader_lo was set)
        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
        // - (allow_process, 250): skipped (in-flight, filtered by collector)
        // - [250, 500]: committed (>= reader_lo)
        process_below.send(500).ok();

        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints (allow_process, 250) must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        // Checkpoints >= reader_lo must be committed.
        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
1600}