sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36/// External users access the store trait through framework::store
37pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
/// Command-line arguments for the indexer
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    ///
    /// Values are matched against each pipeline's `Processor::NAME`; any name that does not
    /// correspond to a pipeline added to the indexer causes `Indexer::run` to fail.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Command-line arguments for configuring a tasked indexer.
//
// NOTE(review): this struct is only ever `#[clap(flatten)]`-ed into `IndexerArgs`, so deriving
// `clap::Args` would suffice; `clap::Parser` also implies `Args` but additionally allows a
// standalone `TaskArgs::parse()` -- confirm no call site relies on that before changing.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The network's latest checkpoint, when the indexer was started.
    latest_checkpoint: u64,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from.
    ///
    /// Initialized to `u64::MAX` so that the first pipeline registered via `add_pipeline`
    /// establishes the minimum.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    /// `None` when no sequential pipelines have been added.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// a warning when the indexer is run.
    ///
    /// Entries are removed as matching pipelines are added, so by `run` time this only contains
    /// names that never matched a registered pipeline.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
166
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
177
178impl TaskArgs {
179    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
180        Self {
181            task: Some(task),
182            reader_interval_ms: Some(reader_interval_ms),
183        }
184    }
185
186    fn into_task(self) -> Option<Task> {
187        Some(Task {
188            task: self.task?,
189            reader_interval: Duration::from_millis(self.reader_interval_ms?),
190        })
191    }
192}
193
impl<S: Store> Indexer<S> {
    /// Create a new instance of the indexer framework from a store that implements the `Store`
    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
    /// arguments configure the following:
    ///
    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
    ///   watermarks) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        // Snapshot of the network's latest checkpoint at start-up. `add_pipeline` uses this to
        // propose a starting checkpoint for pipelines configured with a retention window.
        let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;

        info!(latest_checkpoint, "Ingestion store state");

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            latest_checkpoint,
            // Start at the maximum so the first registered pipeline's next checkpoint becomes
            // the minimum in `add_pipeline`.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // An empty `--pipeline` list means "run everything", represented as `None`.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store used by the indexer.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The ingestion client used by the indexer to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// The indexer's metrics.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// The ingestion service's metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The pipelines that this indexer will run: every added pipeline, filtered down to the
    /// enabled set if one was configured.
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
    /// will start processing and committing once the ingestion service has caught up to their
    /// respective watermarks.
    ///
    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
    pub async fn run(self) -> Result<Service> {
        // `add_pipeline` removes each matched name from the filter, so anything left over was
        // requested on the command-line but never registered -- treat that as an error.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        // With no `last_checkpoint` the upper bound is unbounded: keep polling for new
        // checkpoints indefinitely.
        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Tie every pipeline's lifetime to the returned service handle so they shut down
        // together.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
    /// checkpoint after its watermark, or if that doesn't exist, then the provided
    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
    ///
    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
    /// calculated above.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
        retention: Option<u64>,
    ) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Removing the name from the filter both enables this pipeline and records (by
        // elimination) which requested names never matched -- see the check in [Self::run].
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
        // Otherwise, use the existing record and disregard the proposed value.
        let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
            first_checkpoint
        } else if let Some(retention) = retention {
            // Pruning pipelines need not start at genesis: anything below the retention window
            // would be pruned anyway.
            self.latest_checkpoint.saturating_sub(retention)
        } else {
            0
        };
        let mut conn = self.store.connect().await?;
        // `init_watermark` takes an inclusive high watermark, so convert the proposed *next*
        // checkpoint with `checked_sub(1)` -- `None` when starting from genesis (0).
        let init_watermark = conn
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                checkpoint_hi_inclusive + 1
            } else {
                0
            }
        } else {
            proposed_next_checkpoint
        };

        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
379
380impl<S: ConcurrentStore> Indexer<S> {
381    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
382    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
383    ///
384    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
385    /// keep the watermark table up-to-date with the highest point they can guarantee all data
386    /// exists for, for their pipeline.
387    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
388        &mut self,
389        handler: H,
390        config: ConcurrentConfig,
391    ) -> Result<()> {
392        let pipeline_task =
393            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
394        let retention = config.pruner.as_ref().map(|p| p.retention);
395        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
396            return Ok(());
397        };
398
399        self.pipelines.push(concurrent::pipeline::<H>(
400            handler,
401            next_checkpoint,
402            config,
403            self.store.clone(),
404            self.task.clone(),
405            self.ingestion_service.subscribe().0,
406            self.metrics.clone(),
407        ));
408
409        Ok(())
410    }
411}
412
impl<T: SequentialStore> Indexer<T> {
    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
    ///
    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
    /// required to handle pipelines that modify data in-place (where each update is not an insert,
    /// but could be a modification of an existing row, where ordering between updates is
    /// important).
    ///
    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
    /// number of checkpoints (configured by `checkpoint_lag`).
    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
        &mut self,
        handler: H,
        config: SequentialConfig,
    ) -> Result<()> {
        if self.task.is_some() {
            bail!(
                "Sequential pipelines do not support pipeline tasks. \
                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
                Running the same pipeline under a different task would violate these guarantees."
            );
        }

        // No task name is ever attached (checked above), so the watermark key is just the
        // pipeline name.
        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
            return Ok(());
        };

        // Track the minimum next_checkpoint across all sequential pipelines
        self.next_sequential_checkpoint = Some(
            self.next_sequential_checkpoint
                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
        );

        // NOTE(review): `commit_hi_tx` presumably lets the pipeline report its commit progress
        // back to the ingestion regulator -- confirm against `sequential::pipeline`.
        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();

        self.pipelines.push(sequential::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            checkpoint_rx,
            commit_hi_tx,
            self.metrics.clone(),
        ));

        Ok(())
    }
}
462
463#[cfg(test)]
464mod tests {
465    use std::sync::Arc;
466
467    use async_trait::async_trait;
468    use clap::Parser;
469    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
470    use sui_synthetic_ingestion::synthetic_ingestion;
471    use tokio::sync::watch;
472
473    use crate::FieldCount;
474    use crate::config::ConcurrencyConfig;
475    use crate::ingestion::ingestion_client::IngestionClientArgs;
476    use crate::ingestion::store_client::ObjectStoreWatermark;
477    use crate::ingestion::store_client::WATERMARK_PATH;
478    use crate::mocks::store::MockStore;
479    use crate::pipeline::CommitterConfig;
480    use crate::pipeline::Processor;
481    use crate::pipeline::concurrent::ConcurrentConfig;
482    use crate::store::CommitterWatermark;
483    use crate::store::ConcurrentConnection as _;
484    use crate::store::Connection as _;
485
486    use super::*;
487
    /// Minimal pipeline value carrying the checkpoint sequence number it was derived from.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
491
    /// A handler that can be controlled externally to block checkpoint processing.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value. Updated externally via the
        /// paired `watch::Sender` returned by [`ControllableHandler::with_limit`].
        process_below: watch::Receiver<u64>,
    }
497
    impl ControllableHandler {
        /// Create a handler that initially processes checkpoints up to `limit`, along with the
        /// sender used to raise that limit from the test body.
        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
            let (tx, rx) = watch::channel(limit);
            (Self { process_below: rx }, tx)
        }
    }
504
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Blocks until the externally-controlled limit reaches this checkpoint's sequence
        /// number, then yields a single value wrapping that number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. Errors (sender dropped) are
            // deliberately ignored so processing proceeds on shutdown.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
526
    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        /// Accumulates values into the batch; always reports the batch as ready so commits are
        /// never deferred.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        /// Records each value in the mock store under this handler's name, keyed by the value's
        /// checkpoint number. Returns the number of values committed.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
554
    /// Generate a unit-struct handler named `$handler` with `Processor::NAME == $name` that
    /// implements both the concurrent and sequential pipeline traits over `MockStore`:
    /// - `process` yields the checkpoint's sequence number wrapped in a `MockValue`,
    /// - the concurrent `commit` records each value in the mock store,
    /// - the sequential `commit` is a no-op that reports one row written.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> Result<usize> {
                    Ok(1)
                }
            }
        };
    }
618
    // Concrete test pipelines, each with a distinct `Processor::NAME`.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
622
623    fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
624        let dir = tempfile::tempdir().unwrap();
625        if let Some(cp) = latest_checkpoint {
626            let watermark_path = dir.path().join(WATERMARK_PATH);
627            std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
628            let watermark = ObjectStoreWatermark {
629                checkpoint_hi_inclusive: cp,
630            };
631            std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
632        }
633        dir
634    }
635
    /// Build an `Indexer` over `MockStore`, reading checkpoints from a fresh temp directory.
    ///
    /// If `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`, synthetic ingestion
    /// data will be generated in the temp directory before creating the indexer.
    ///
    /// Returns the indexer along with the temp directory handle (which must be kept alive for
    /// the duration of the test, since dropping it deletes the directory).
    async fn create_test_indexer(
        store: MockStore,
        indexer_args: IndexerArgs,
        registry: &Registry,
        ingestion_data: Option<(u64, u64)>,
    ) -> (Indexer<MockStore>, tempfile::TempDir) {
        let temp_dir = init_ingestion_dir(None);
        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
                ingestion_dir: temp_dir.path().to_owned(),
                starting_checkpoint: 0,
                num_checkpoints,
                checkpoint_size,
            })
            .await;
        }
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            IngestionConfig::default(),
            None,
            registry,
        )
        .await
        .unwrap();
        (indexer, temp_dir)
    }
673
    /// Set pipeline `name`'s committer watermark to `hi` (inclusive), leaving all other
    /// watermark fields at their defaults.
    async fn set_committer_watermark(
        conn: &mut <MockStore as Store>::Connection<'_>,
        name: &str,
        hi: u64,
    ) {
        conn.set_committer_watermark(
            name,
            CommitterWatermark {
                checkpoint_hi_inclusive: hi,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    }
689
    /// Register `handler` as a concurrent pipeline with default configuration, panicking on
    /// failure.
    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .concurrent_pipeline(handler, ConcurrentConfig::default())
            .await
            .unwrap();
    }

    /// Register `handler` as a sequential pipeline with default configuration, panicking on
    /// failure.
    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
        indexer: &mut Indexer<MockStore>,
        handler: H,
    ) {
        indexer
            .sequential_pipeline(handler, SequentialConfig::default())
            .await
            .unwrap();
    }
709
    /// Assert that the `total_watermarks_out_of_order` counter for the given pipeline label has
    /// the expected value.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
722
    /// Register a single pipeline (concurrent or sequential, per `is_concurrent`) on a fresh
    /// indexer with the given `first_checkpoint` override, then return the committer and pruner
    /// watermarks that pipeline registration left in the store.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        // NOTE(review): this connection is opened before the indexer clones the store; it
        // observes the watermarks written during pipeline registration, which implies the mock
        // store shares state across clones -- confirm against `MockStore`.
        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
754
    /// The checkpoint the synthetic ingestion store advertises as its latest in
    /// [`test_next_checkpoint`].
    const LATEST_CHECKPOINT: u64 = 10;

    /// Set up an indexer as if the network is at `latest_checkpoint` (the next checkpoint to
    /// ingest). Runs a single concurrent pipeline with the given config. If `watermark` is
    /// provided, the pipeline's high watermark is pre-set; `first_checkpoint` controls where
    /// ingestion begins.
    async fn test_next_checkpoint(
        watermark: Option<u64>,
        first_checkpoint: Option<u64>,
        concurrent_config: ConcurrentConfig,
    ) -> Indexer<MockStore> {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");

        // Pre-seed the pipeline's committer watermark so registration resumes from it instead
        // of the proposed starting checkpoint.
        if let Some(checkpoint_hi_inclusive) = watermark {
            let mut conn = store.connect().await.unwrap();
            conn.set_committer_watermark(
                A::NAME,
                CommitterWatermark {
                    checkpoint_hi_inclusive,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        }

        let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
        let mut indexer = Indexer::new(
            store,
            IndexerArgs {
                first_checkpoint,
                ..Default::default()
            },
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        // The indexer should have picked up the latest checkpoint from the watermark file.
        assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);

        indexer
            .concurrent_pipeline::<A>(A, concurrent_config)
            .await
            .unwrap();

        indexer
    }
814
    /// A default `ConcurrentConfig` with pruning enabled at the given `retention`, leaving all
    /// other pruner settings at their defaults.
    fn pruner_config(retention: u64) -> ConcurrentConfig {
        ConcurrentConfig {
            pruner: Some(concurrent::PrunerConfig {
                retention,
                ..Default::default()
            }),
            ..Default::default()
        }
    }
824
825    #[test]
826    fn test_arg_parsing() {
827        #[derive(Parser)]
828        struct Args {
829            #[clap(flatten)]
830            indexer: IndexerArgs,
831        }
832
833        let args = Args::try_parse_from([
834            "cmd",
835            "--first-checkpoint",
836            "10",
837            "--last-checkpoint",
838            "100",
839            "--pipeline",
840            "a",
841            "--pipeline",
842            "b",
843            "--task",
844            "t",
845            "--reader-interval-ms",
846            "5000",
847        ])
848        .unwrap();
849
850        assert_eq!(args.indexer.first_checkpoint, Some(10));
851        assert_eq!(args.indexer.last_checkpoint, Some(100));
852        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
853        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
854        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
855    }
856
857    /// next_checkpoint is smallest among existing watermarks + 1.
858    #[tokio::test]
859    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
860        let registry = Registry::new();
861        let store = MockStore::default();
862
863        test_pipeline!(A, "concurrent_a");
864        test_pipeline!(B, "concurrent_b");
865        test_pipeline!(C, "sequential_c");
866        test_pipeline!(D, "sequential_d");
867
868        let mut conn = store.connect().await.unwrap();
869
870        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
871        set_committer_watermark(&mut conn, A::NAME, 100).await;
872
873        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
874        set_committer_watermark(&mut conn, B::NAME, 10).await;
875
876        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
877        set_committer_watermark(&mut conn, C::NAME, 1).await;
878
879        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
880        set_committer_watermark(&mut conn, D::NAME, 50).await;
881
882        let (mut indexer, _temp_dir) =
883            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
884
885        add_concurrent(&mut indexer, A).await;
886        add_concurrent(&mut indexer, B).await;
887        add_sequential(&mut indexer, C).await;
888        add_sequential(&mut indexer, D).await;
889
890        assert_eq!(indexer.first_checkpoint, None);
891        assert_eq!(indexer.last_checkpoint, None);
892        assert_eq!(indexer.latest_checkpoint, 0);
893        assert_eq!(indexer.next_checkpoint, 2);
894        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
895    }
896
897    /// next_checkpoint is 0 when at least one pipeline has no watermark.
898    #[tokio::test]
899    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
900        let registry = Registry::new();
901        let store = MockStore::default();
902
903        test_pipeline!(A, "concurrent_a");
904        test_pipeline!(B, "concurrent_b");
905        test_pipeline!(C, "sequential_c");
906        test_pipeline!(D, "sequential_d");
907
908        let mut conn = store.connect().await.unwrap();
909        set_committer_watermark(&mut conn, B::NAME, 10).await;
910        set_committer_watermark(&mut conn, C::NAME, 1).await;
911
912        let (mut indexer, _temp_dir) =
913            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
914
915        add_concurrent(&mut indexer, A).await;
916        add_concurrent(&mut indexer, B).await;
917        add_sequential(&mut indexer, C).await;
918        add_sequential(&mut indexer, D).await;
919
920        assert_eq!(indexer.first_checkpoint, None);
921        assert_eq!(indexer.last_checkpoint, None);
922        assert_eq!(indexer.latest_checkpoint, 0);
923        assert_eq!(indexer.next_checkpoint, 0);
924        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
925    }
926
927    /// next_checkpoint is 1 when smallest committer watermark is 0.
928    #[tokio::test]
929    async fn test_next_checkpoint_smallest_is_0() {
930        let registry = Registry::new();
931        let store = MockStore::default();
932
933        test_pipeline!(A, "concurrent_a");
934        test_pipeline!(B, "concurrent_b");
935        test_pipeline!(C, "sequential_c");
936        test_pipeline!(D, "sequential_d");
937
938        let mut conn = store.connect().await.unwrap();
939        set_committer_watermark(&mut conn, A::NAME, 100).await;
940        set_committer_watermark(&mut conn, B::NAME, 10).await;
941        set_committer_watermark(&mut conn, C::NAME, 1).await;
942        set_committer_watermark(&mut conn, D::NAME, 0).await;
943
944        let (mut indexer, _temp_dir) =
945            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
946
947        add_concurrent(&mut indexer, A).await;
948        add_concurrent(&mut indexer, B).await;
949        add_sequential(&mut indexer, C).await;
950        add_sequential(&mut indexer, D).await;
951
952        assert_eq!(indexer.next_checkpoint, 1);
953    }
954
955    /// next_checkpoint is first_checkpoint when at least one pipeline has no
956    /// watermark, and first_checkpoint is smallest.
957    #[tokio::test]
958    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
959        let registry = Registry::new();
960        let store = MockStore::default();
961
962        test_pipeline!(A, "concurrent_a");
963        test_pipeline!(B, "concurrent_b");
964        test_pipeline!(C, "sequential_c");
965        test_pipeline!(D, "sequential_d");
966
967        let mut conn = store.connect().await.unwrap();
968        set_committer_watermark(&mut conn, B::NAME, 50).await;
969        set_committer_watermark(&mut conn, C::NAME, 10).await;
970
971        let indexer_args = IndexerArgs {
972            first_checkpoint: Some(5),
973            ..Default::default()
974        };
975        let (mut indexer, _temp_dir) =
976            create_test_indexer(store, indexer_args, &registry, None).await;
977
978        add_concurrent(&mut indexer, A).await;
979        add_concurrent(&mut indexer, B).await;
980        add_sequential(&mut indexer, C).await;
981        add_sequential(&mut indexer, D).await;
982
983        assert_eq!(indexer.first_checkpoint, Some(5));
984        assert_eq!(indexer.last_checkpoint, None);
985        assert_eq!(indexer.latest_checkpoint, 0);
986        assert_eq!(indexer.next_checkpoint, 5);
987        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
988    }
989
990    /// next_checkpoint is smallest among existing watermarks + 1 if
991    /// all pipelines have watermarks (ignores first_checkpoint).
992    #[tokio::test]
993    async fn test_next_checkpoint_ignore_first_checkpoint() {
994        let registry = Registry::new();
995        let store = MockStore::default();
996
997        test_pipeline!(B, "concurrent_b");
998        test_pipeline!(C, "sequential_c");
999
1000        let mut conn = store.connect().await.unwrap();
1001        set_committer_watermark(&mut conn, B::NAME, 50).await;
1002        set_committer_watermark(&mut conn, C::NAME, 10).await;
1003
1004        let indexer_args = IndexerArgs {
1005            first_checkpoint: Some(5),
1006            ..Default::default()
1007        };
1008        let (mut indexer, _temp_dir) =
1009            create_test_indexer(store, indexer_args, &registry, None).await;
1010
1011        add_concurrent(&mut indexer, B).await;
1012        add_sequential(&mut indexer, C).await;
1013
1014        assert_eq!(indexer.first_checkpoint, Some(5));
1015        assert_eq!(indexer.last_checkpoint, None);
1016        assert_eq!(indexer.latest_checkpoint, 0);
1017        assert_eq!(indexer.next_checkpoint, 11);
1018        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1019    }
1020
1021    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1022    /// will not be used as the starting point if it is not the smallest valid committer watermark
1023    /// to resume ingesting from.
1024    #[tokio::test]
1025    async fn test_next_checkpoint_large_first_checkpoint() {
1026        let registry = Registry::new();
1027        let store = MockStore::default();
1028
1029        test_pipeline!(A, "concurrent_a");
1030        test_pipeline!(B, "concurrent_b");
1031        test_pipeline!(C, "sequential_c");
1032
1033        let mut conn = store.connect().await.unwrap();
1034        set_committer_watermark(&mut conn, B::NAME, 50).await;
1035        set_committer_watermark(&mut conn, C::NAME, 10).await;
1036
1037        let indexer_args = IndexerArgs {
1038            first_checkpoint: Some(24),
1039            ..Default::default()
1040        };
1041        let (mut indexer, _temp_dir) =
1042            create_test_indexer(store, indexer_args, &registry, None).await;
1043
1044        add_concurrent(&mut indexer, A).await;
1045        add_concurrent(&mut indexer, B).await;
1046        add_sequential(&mut indexer, C).await;
1047
1048        assert_eq!(indexer.first_checkpoint, Some(24));
1049        assert_eq!(indexer.last_checkpoint, None);
1050        assert_eq!(indexer.latest_checkpoint, 0);
1051        assert_eq!(indexer.next_checkpoint, 11);
1052        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1053    }
1054
1055    /// latest_checkpoint is read from the watermark file.
1056    #[tokio::test]
1057    async fn test_latest_checkpoint_from_watermark() {
1058        let registry = Registry::new();
1059        let store = MockStore::default();
1060        let temp_dir = init_ingestion_dir(Some(30));
1061        let indexer = Indexer::new(
1062            store,
1063            IndexerArgs::default(),
1064            ClientArgs {
1065                ingestion: IngestionClientArgs {
1066                    local_ingestion_path: Some(temp_dir.path().to_owned()),
1067                    ..Default::default()
1068                },
1069                ..Default::default()
1070            },
1071            IngestionConfig::default(),
1072            None,
1073            &registry,
1074        )
1075        .await
1076        .unwrap();
1077
1078        assert_eq!(indexer.latest_checkpoint, 30);
1079    }
1080
1081    /// No watermark, no first_checkpoint, pruner with retention:
1082    /// next_checkpoint = LATEST_CHECKPOINT - retention.
1083    #[tokio::test]
1084    async fn test_next_checkpoint_with_pruner_uses_retention() {
1085        let retention = LATEST_CHECKPOINT - 1;
1086        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1087        assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
1088    }
1089
1090    /// No watermark, no first_checkpoint, no pruner: falls back to 0.
1091    #[tokio::test]
1092    async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
1093        let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
1094        assert_eq!(indexer.next_checkpoint, 0);
1095    }
1096
1097    /// Watermark at 5 takes priority over latest_checkpoint - retention.
1098    #[tokio::test]
1099    async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
1100        let retention = LATEST_CHECKPOINT - 1;
1101        let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
1102        assert_eq!(indexer.next_checkpoint, 6);
1103    }
1104
1105    /// first_checkpoint takes priority over latest_checkpoint - retention when
1106    /// there's no watermark.
1107    #[tokio::test]
1108    async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
1109        let retention = LATEST_CHECKPOINT - 1;
1110        let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
1111        assert_eq!(indexer.next_checkpoint, 2);
1112    }
1113
1114    /// When retention exceeds latest_checkpoint, saturating_sub clamps to 0.
1115    #[tokio::test]
1116    async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
1117        let retention = LATEST_CHECKPOINT + 1;
1118        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1119        assert_eq!(indexer.next_checkpoint, 0);
1120    }
1121
1122    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1123    #[tokio::test]
1124    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1125        let registry = Registry::new();
1126        let store = MockStore::default();
1127
1128        test_pipeline!(A, "concurrent_a");
1129        test_pipeline!(B, "concurrent_b");
1130        test_pipeline!(C, "sequential_c");
1131        test_pipeline!(D, "sequential_d");
1132
1133        let mut conn = store.connect().await.unwrap();
1134        set_committer_watermark(&mut conn, A::NAME, 5).await;
1135        set_committer_watermark(&mut conn, B::NAME, 10).await;
1136        set_committer_watermark(&mut conn, C::NAME, 15).await;
1137        set_committer_watermark(&mut conn, D::NAME, 20).await;
1138
1139        let indexer_args = IndexerArgs {
1140            last_checkpoint: Some(29),
1141            ..Default::default()
1142        };
1143        let (mut indexer, _temp_dir) =
1144            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1145
1146        add_concurrent(&mut indexer, A).await;
1147        add_concurrent(&mut indexer, B).await;
1148        add_sequential(&mut indexer, C).await;
1149        add_sequential(&mut indexer, D).await;
1150
1151        let ingestion_metrics = indexer.ingestion_metrics().clone();
1152        let indexer_metrics = indexer.indexer_metrics().clone();
1153
1154        indexer.run().await.unwrap().join().await.unwrap();
1155
1156        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1157        assert_out_of_order!(indexer_metrics, A::NAME, 0);
1158        assert_out_of_order!(indexer_metrics, B::NAME, 5);
1159        assert_out_of_order!(indexer_metrics, C::NAME, 10);
1160        assert_out_of_order!(indexer_metrics, D::NAME, 15);
1161    }
1162
1163    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1164    #[tokio::test]
1165    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1166        let registry = Registry::new();
1167        let store = MockStore::default();
1168
1169        test_pipeline!(A, "concurrent_a");
1170        test_pipeline!(B, "concurrent_b");
1171        test_pipeline!(C, "sequential_c");
1172        test_pipeline!(D, "sequential_d");
1173
1174        let mut conn = store.connect().await.unwrap();
1175        set_committer_watermark(&mut conn, A::NAME, 5).await;
1176        set_committer_watermark(&mut conn, B::NAME, 10).await;
1177        set_committer_watermark(&mut conn, C::NAME, 15).await;
1178        set_committer_watermark(&mut conn, D::NAME, 20).await;
1179
1180        let indexer_args = IndexerArgs {
1181            first_checkpoint: Some(3),
1182            last_checkpoint: Some(29),
1183            ..Default::default()
1184        };
1185        let (mut indexer, _temp_dir) =
1186            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1187
1188        add_concurrent(&mut indexer, A).await;
1189        add_concurrent(&mut indexer, B).await;
1190        add_sequential(&mut indexer, C).await;
1191        add_sequential(&mut indexer, D).await;
1192
1193        let ingestion_metrics = indexer.ingestion_metrics().clone();
1194        let metrics = indexer.indexer_metrics().clone();
1195        indexer.run().await.unwrap().join().await.unwrap();
1196
1197        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1198        assert_out_of_order!(metrics, A::NAME, 0);
1199        assert_out_of_order!(metrics, B::NAME, 5);
1200        assert_out_of_order!(metrics, C::NAME, 10);
1201        assert_out_of_order!(metrics, D::NAME, 15);
1202    }
1203
1204    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1205    #[tokio::test]
1206    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1207        let registry = Registry::new();
1208        let store = MockStore::default();
1209
1210        test_pipeline!(A, "concurrent_a");
1211        test_pipeline!(B, "concurrent_b");
1212        test_pipeline!(C, "sequential_c");
1213        test_pipeline!(D, "sequential_d");
1214
1215        let mut conn = store.connect().await.unwrap();
1216        set_committer_watermark(&mut conn, B::NAME, 10).await;
1217        set_committer_watermark(&mut conn, C::NAME, 15).await;
1218        set_committer_watermark(&mut conn, D::NAME, 20).await;
1219
1220        let indexer_args = IndexerArgs {
1221            last_checkpoint: Some(29),
1222            ..Default::default()
1223        };
1224        let (mut indexer, _temp_dir) =
1225            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1226
1227        add_concurrent(&mut indexer, A).await;
1228        add_concurrent(&mut indexer, B).await;
1229        add_sequential(&mut indexer, C).await;
1230        add_sequential(&mut indexer, D).await;
1231
1232        let ingestion_metrics = indexer.ingestion_metrics().clone();
1233        let metrics = indexer.indexer_metrics().clone();
1234        indexer.run().await.unwrap().join().await.unwrap();
1235
1236        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1237        assert_out_of_order!(metrics, A::NAME, 0);
1238        assert_out_of_order!(metrics, B::NAME, 11);
1239        assert_out_of_order!(metrics, C::NAME, 16);
1240        assert_out_of_order!(metrics, D::NAME, 21);
1241    }
1242
1243    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1244    #[tokio::test]
1245    async fn test_indexer_ingestion_use_first_checkpoint() {
1246        let registry = Registry::new();
1247        let store = MockStore::default();
1248
1249        test_pipeline!(A, "concurrent_a");
1250        test_pipeline!(B, "concurrent_b");
1251        test_pipeline!(C, "sequential_c");
1252        test_pipeline!(D, "sequential_d");
1253
1254        let mut conn = store.connect().await.unwrap();
1255        set_committer_watermark(&mut conn, B::NAME, 10).await;
1256        set_committer_watermark(&mut conn, C::NAME, 15).await;
1257        set_committer_watermark(&mut conn, D::NAME, 20).await;
1258
1259        let indexer_args = IndexerArgs {
1260            first_checkpoint: Some(10),
1261            last_checkpoint: Some(29),
1262            ..Default::default()
1263        };
1264        let (mut indexer, _temp_dir) =
1265            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1266
1267        add_concurrent(&mut indexer, A).await;
1268        add_concurrent(&mut indexer, B).await;
1269        add_sequential(&mut indexer, C).await;
1270        add_sequential(&mut indexer, D).await;
1271
1272        let ingestion_metrics = indexer.ingestion_metrics().clone();
1273        let metrics = indexer.indexer_metrics().clone();
1274        indexer.run().await.unwrap().join().await.unwrap();
1275
1276        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1277        assert_out_of_order!(metrics, A::NAME, 0);
1278        assert_out_of_order!(metrics, B::NAME, 1);
1279        assert_out_of_order!(metrics, C::NAME, 6);
1280        assert_out_of_order!(metrics, D::NAME, 11);
1281    }
1282
1283    #[tokio::test]
1284    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1285        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1286        assert_eq!(committer_watermark, None);
1287        assert_eq!(pruner_watermark, None);
1288    }
1289
1290    #[tokio::test]
1291    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1292        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1293        assert_eq!(committer_watermark, None);
1294        assert_eq!(pruner_watermark, None);
1295    }
1296
1297    #[tokio::test]
1298    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1299        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1300
1301        let committer_watermark = committer_watermark.unwrap();
1302        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1303
1304        let pruner_watermark = pruner_watermark.unwrap();
1305        assert_eq!(pruner_watermark.reader_lo, 1);
1306        assert_eq!(pruner_watermark.pruner_hi, 1);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_init_watermark_sequential() {
1311        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1312
1313        let committer_watermark = committer_watermark.unwrap();
1314        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1315
1316        let pruner_watermark = pruner_watermark.unwrap();
1317        assert_eq!(pruner_watermark.reader_lo, 1);
1318        assert_eq!(pruner_watermark.pruner_hi, 1);
1319    }
1320
1321    #[tokio::test]
1322    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1323        let registry = Registry::new();
1324        let store = MockStore::default();
1325
1326        let mut conn = store.connect().await.unwrap();
1327        set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1328        set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1329
1330        let indexer_args = IndexerArgs {
1331            first_checkpoint: None,
1332            last_checkpoint: Some(19),
1333            pipeline: vec![],
1334            ..Default::default()
1335        };
1336        let (mut indexer, _temp_dir) =
1337            create_test_indexer(store.clone(), indexer_args, &registry, Some((20, 2))).await;
1338
1339        // Add first sequential pipeline
1340        add_sequential(&mut indexer, MockHandler).await;
1341
1342        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1343        assert_eq!(
1344            indexer.next_sequential_checkpoint(),
1345            Some(11),
1346            "next_sequential_checkpoint should be 11"
1347        );
1348
1349        // Add second sequential pipeline
1350        add_sequential(&mut indexer, SequentialHandler).await;
1351
1352        // Should change to 6 (minimum of 6 and 11)
1353        assert_eq!(
1354            indexer.next_sequential_checkpoint(),
1355            Some(6),
1356            "next_sequential_checkpoint should still be 6"
1357        );
1358
1359        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1360        indexer.run().await.unwrap().join().await.unwrap();
1361
1362        // Verify each pipeline made some progress independently
1363        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1364        let watermark2 = conn
1365            .committer_watermark(SequentialHandler::NAME)
1366            .await
1367            .unwrap();
1368
1369        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1370        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1371    }
1372
    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
    /// committing checkpoints less than the main pipeline's reader watermark.
    #[tokio::test]
    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
        // reader watermark at `7`.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
            .await
            .unwrap();

        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
        // be ignored by the tasked indexer.
        // The second argument to `tasked` (10) matches the `reader_interval_ms` comment used in
        // the other tasked tests below.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(15),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((16, 2))).await;

        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // All 16 checkpoints (0..=15) are ingested, even though some are later skipped at commit.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
        // Checkpoints 0..=6 fall below the main pipeline's reader_lo (7) and are skipped — 7 total.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            7
        );
        // Only checkpoints 7..=15 (9 entries) should have been committed to the store.
        let data = store
            .data
            .get(MockCheckpointSequenceNumberHandler::NAME)
            .unwrap();
        assert_eq!(data.len(), 9);
        for i in 0..7 {
            assert!(data.get(&i).is_none());
        }
        for i in 7..16 {
            assert!(data.get(&i).is_some());
        }
    }
1428
    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
    #[tokio::test]
    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
        let registry = Registry::new();
        let store = MockStore::default();

        // NOTE(review): "test" appears to be MockCheckpointSequenceNumberHandler::NAME — the
        // watermark seeded here and the data asserted below use the same key; confirm.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, "test", 10).await;
        conn.set_reader_watermark("test", 5).await.unwrap();

        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
        // watermarks.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(9),
            last_checkpoint: Some(25),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((26, 2))).await;

        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // Checkpoints 9..=25: 17 ingested, none out of order, none skipped — first_checkpoint (9)
        // is already above the main pipeline's reader_lo (5).
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
        assert_out_of_order!(metrics, "test", 0);
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            0
        );

        // All 17 ingested checkpoints (9..=25) were committed.
        let data = store.data.get("test").unwrap();
        assert_eq!(data.len(), 17);
        for i in 0..9 {
            assert!(data.get(&i).is_none());
        }
        for i in 9..26 {
            assert!(data.get(&i).is_some());
        }
        let main_pipeline_watermark = store.watermark("test").unwrap();
        // assert that the main pipeline's watermarks are not updated
        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
        assert_eq!(main_pipeline_watermark.reader_lo, 5);
        // The tasked pipeline ("test@task") tracks its own watermarks independently.
        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
    }
1484
    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
    /// committed, and any checkpoints inflight < X will be skipped.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        // Set the main pipeline watermark.
        set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;

        // Generate 500 checkpoints upfront, for the indexer to process all at once.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((501, 2))).await;
        let mut allow_process = 10;
        // Limit the pipeline to process only checkpoints `[0, 10]`.
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    committer: CommitterConfig {
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // High fixed concurrency so all checkpoints can be processed
                    // concurrently despite out-of-order arrival.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
        // committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                Duration::from_secs(10),
            )
            .await;

        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
        // track_main_reader_lo task will eventually pick this up and update the atomic. The
        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
        // one at a time until the collector_reader_lo metric shows the new value.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
        // checkpoints have been processed.
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            // allow_process starts at 10; bump it so exactly one more checkpoint is released
            // each iteration.
            allow_process += 1;
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // At this point, the collector has observed reader_lo = 250. Release all remaining
        // checkpoints. Guarantees:
        // - [0, 10]: committed (before reader_lo was set)
        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
        // - (allow_process, 250): skipped (in-flight, filtered by collector)
        // - [250, 500]: committed (>= reader_lo)
        process_below.send(500).ok();

        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints (allow_process, 250) must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        // Checkpoints >= reader_lo must be committed.
        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
1601}