sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
/// Command-line arguments for the indexer.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run. May be repeated to enable multiple pipelines.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
/// Command-line arguments for configuring a tasked indexer.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
/// The core of the indexing framework: owns the store, the ingestion service, and the set of
/// registered pipelines, and tracks the checkpoint bounds that ingestion should cover. Pipelines
/// are added via `concurrent_pipeline`/`sequential_pipeline` before `run` is called.
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The network's latest checkpoint, when the indexer was started.
    latest_checkpoint: u64,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from. Initialized to `u64::MAX` and lowered as pipelines are added.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    /// `None` when no sequential pipelines have been added.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// a warning when the indexer is run.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
165
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
176
177impl TaskArgs {
178    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
179        Self {
180            task: Some(task),
181            reader_interval_ms: Some(reader_interval_ms),
182        }
183    }
184
185    fn into_task(self) -> Option<Task> {
186        Some(Task {
187            task: self.task?,
188            reader_interval: Duration::from_millis(self.reader_interval_ms?),
189        })
190    }
191}
192
impl<S: Store> Indexer<S> {
    /// Create a new instance of the indexer framework from a store that implements the `Store`
    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
    /// arguments configure the following:
    ///
    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
    ///   watermarks) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> anyhow::Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        // Snapshot the network's tip at start-up; used by `add_pipeline` to derive default
        // starting points for pipelines configured with a retention.
        let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;

        info!(latest_checkpoint);

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            latest_checkpoint,
            // Lowered to the minimum next checkpoint across pipelines as they are added (see
            // `add_pipeline`).
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // An empty `--pipeline` list means "run everything", represented as `None`.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store used by the indexer.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The ingestion client used by the indexer to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// The indexer's metrics.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// The ingestion service's metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The pipelines that this indexer will run: every added pipeline that is either enabled by
    /// the `--pipeline` filter, or not filtered at all.
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
    /// will start processing and committing once the ingestion service has caught up to their
    /// respective watermarks.
    ///
    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
    pub async fn run(self) -> anyhow::Result<Service> {
        // `add_pipeline` removes each registered pipeline from this set, so anything left over
        // was requested on the command-line but never added to the indexer.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run((
                Bound::Included(start),
                // No configured end means ingestion polls for new checkpoints indefinitely.
                end.map_or(Bound::Unbounded, Bound::Included),
            ))
            .await
            .context("Failed to start ingestion service")?;

        // Merge all pipeline service handles so they share a lifetime / graceful shutdown.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
    /// checkpoint after its watermark, or if that doesn't exist, then the provided
    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
    ///
    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
    /// calculated above.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
        retention: Option<u64>,
    ) -> anyhow::Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Consume the pipeline from the enabled set; leftovers are reported in `run`.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
        // Otherwise, use the existing record and disregard the proposed value.
        let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
            first_checkpoint
        } else if let Some(retention) = retention {
            // Pruning pipelines default to starting at the edge of their retention window.
            self.latest_checkpoint.saturating_sub(retention)
        } else {
            0
        };
        let mut conn = self.store.connect().await?;
        // The watermark records the highest committed checkpoint (inclusive), so a proposed next
        // checkpoint of 0 corresponds to no committed checkpoint at all (`checked_sub` -> None).
        let init_watermark = conn
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                checkpoint_hi_inclusive + 1
            } else {
                0
            }
        } else {
            proposed_next_checkpoint
        };

        // Ingestion must start no later than the earliest pipeline's next checkpoint.
        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
375
376impl<S: ConcurrentStore> Indexer<S> {
377    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
378    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
379    ///
380    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
381    /// keep the watermark table up-to-date with the highest point they can guarantee all data
382    /// exists for, for their pipeline.
383    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
384        &mut self,
385        handler: H,
386        config: ConcurrentConfig,
387    ) -> anyhow::Result<()> {
388        let pipeline_task =
389            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
390        let retention = config.pruner.as_ref().map(|p| p.retention);
391        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
392            return Ok(());
393        };
394
395        let checkpoint_rx = self
396            .ingestion_service
397            .subscribe_bounded(config.ingestion.subscriber_channel_size());
398
399        self.pipelines.push(concurrent::pipeline::<H>(
400            handler,
401            next_checkpoint,
402            config,
403            self.store.clone(),
404            self.task.clone(),
405            checkpoint_rx,
406            self.metrics.clone(),
407        ));
408
409        Ok(())
410    }
411}
412
413impl<T: SequentialStore> Indexer<T> {
414    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
415    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
416    ///
417    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
418    /// required to handle pipelines that modify data in-place (where each update is not an insert,
419    /// but could be a modification of an existing row, where ordering between updates is
420    /// important).
421    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
422        &mut self,
423        handler: H,
424        config: SequentialConfig,
425    ) -> anyhow::Result<()> {
426        if self.task.is_some() {
427            bail!(
428                "Sequential pipelines do not support pipeline tasks. \
429                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
430                Running the same pipeline under a different task would violate these guarantees."
431            );
432        }
433
434        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
435            return Ok(());
436        };
437
438        // Track the minimum next_checkpoint across all sequential pipelines
439        self.next_sequential_checkpoint = Some(
440            self.next_sequential_checkpoint
441                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
442        );
443
444        let checkpoint_rx = self
445            .ingestion_service
446            .subscribe_bounded(config.ingestion.subscriber_channel_size());
447
448        self.pipelines.push(sequential::pipeline::<H>(
449            handler,
450            next_checkpoint,
451            config,
452            self.store.clone(),
453            checkpoint_rx,
454            self.metrics.clone(),
455        ));
456
457        Ok(())
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use std::sync::Arc;
464
465    use async_trait::async_trait;
466    use clap::Parser;
467    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
468    use sui_synthetic_ingestion::synthetic_ingestion;
469    use tokio::sync::watch;
470
471    use crate::FieldCount;
472    use crate::config::ConcurrencyConfig;
473    use crate::ingestion::ingestion_client::IngestionClientArgs;
474    use crate::ingestion::store_client::ObjectStoreWatermark;
475    use crate::ingestion::store_client::WATERMARK_PATH;
476    use crate::mocks::store::MockStore;
477    use crate::pipeline::CommitterConfig;
478    use crate::pipeline::Processor;
479    use crate::pipeline::concurrent::ConcurrentConfig;
480    use crate::store::CommitterWatermark;
481    use crate::store::ConcurrentConnection as _;
482    use crate::store::Connection as _;
483
484    use super::*;
485
    /// Value emitted by the mock handlers below: wraps the checkpoint sequence number it was
    /// produced from.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
489
    /// A handler that can be controlled externally to block checkpoint processing.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value; `process` blocks until the
        /// externally-held sender raises the limit past the checkpoint's sequence number.
        process_below: watch::Receiver<u64>,
    }
495
496    impl ControllableHandler {
497        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
498            let (tx, rx) = watch::channel(limit);
499            (Self { process_below: rx }, tx)
500        }
501    }
502
503    #[async_trait]
504    impl Processor for ControllableHandler {
505        const NAME: &'static str = "controllable";
506        type Value = MockValue;
507
508        async fn process(
509            &self,
510            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
511        ) -> anyhow::Result<Vec<Self::Value>> {
512            let cp_num = checkpoint.summary.sequence_number;
513
514            // Wait until the checkpoint is allowed to be processed
515            self.process_below
516                .clone()
517                .wait_for(|&limit| cp_num <= limit)
518                .await
519                .ok();
520
521            Ok(vec![MockValue(cp_num)])
522        }
523    }
524
525    #[async_trait]
526    impl concurrent::Handler for ControllableHandler {
527        type Store = MockStore;
528        type Batch = Vec<MockValue>;
529
530        fn batch(
531            &self,
532            batch: &mut Self::Batch,
533            values: &mut std::vec::IntoIter<Self::Value>,
534        ) -> concurrent::BatchStatus {
535            batch.extend(values);
536            concurrent::BatchStatus::Ready
537        }
538
539        async fn commit<'a>(
540            &self,
541            batch: &Self::Batch,
542            conn: &mut <Self::Store as Store>::Connection<'a>,
543        ) -> anyhow::Result<usize> {
544            for value in batch {
545                conn.0
546                    .commit_data(Self::NAME, value.0, vec![value.0])
547                    .await?;
548            }
549            Ok(batch.len())
550        }
551    }
552
    /// Generate a unit-struct pipeline type named `$handler` whose `Processor::NAME` is `$name`.
    /// The generated type records each checkpoint's sequence number as a `MockValue`, and
    /// implements both the concurrent handler (committing each value to the `MockStore`) and the
    /// sequential handler (a no-op commit), so the same type can be used in either kind of test.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
616
    // Shared pipeline types used by several of the tests below.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
620
621    fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
622        let dir = tempfile::tempdir().unwrap();
623        if let Some(cp) = latest_checkpoint {
624            let watermark_path = dir.path().join(WATERMARK_PATH);
625            std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
626            let watermark = ObjectStoreWatermark {
627                checkpoint_hi_inclusive: cp,
628            };
629            std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
630        }
631        dir
632    }
633
634    /// If `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`, synthetic ingestion
635    /// data will be generated in the temp directory before creating the indexer.
636    async fn create_test_indexer(
637        store: MockStore,
638        indexer_args: IndexerArgs,
639        registry: &Registry,
640        ingestion_data: Option<(u64, u64)>,
641    ) -> (Indexer<MockStore>, tempfile::TempDir) {
642        let temp_dir = init_ingestion_dir(None);
643        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
644            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
645                ingestion_dir: temp_dir.path().to_owned(),
646                starting_checkpoint: 0,
647                num_checkpoints,
648                checkpoint_size,
649            })
650            .await;
651        }
652        let client_args = ClientArgs {
653            ingestion: IngestionClientArgs {
654                local_ingestion_path: Some(temp_dir.path().to_owned()),
655                ..Default::default()
656            },
657            ..Default::default()
658        };
659        let indexer = Indexer::new(
660            store,
661            indexer_args,
662            client_args,
663            IngestionConfig::default(),
664            None,
665            registry,
666        )
667        .await
668        .unwrap();
669        (indexer, temp_dir)
670    }
671
672    async fn set_committer_watermark(
673        conn: &mut <MockStore as Store>::Connection<'_>,
674        name: &str,
675        hi: u64,
676    ) {
677        conn.set_committer_watermark(
678            name,
679            CommitterWatermark {
680                checkpoint_hi_inclusive: hi,
681                ..Default::default()
682            },
683        )
684        .await
685        .unwrap();
686    }
687
688    async fn add_concurrent<H: concurrent::Handler<Store = MockStore>>(
689        indexer: &mut Indexer<MockStore>,
690        handler: H,
691    ) {
692        indexer
693            .concurrent_pipeline(handler, ConcurrentConfig::default())
694            .await
695            .unwrap();
696    }
697
698    async fn add_sequential<H: sequential::Handler<Store = MockStore>>(
699        indexer: &mut Indexer<MockStore>,
700        handler: H,
701    ) {
702        indexer
703            .sequential_pipeline(handler, SequentialConfig::default())
704            .await
705            .unwrap();
706    }
707
    /// Assert that the `total_watermarks_out_of_order` counter for `$pipeline` currently reads
    /// `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
720
    /// Build an indexer with a single pipeline (concurrent or sequential, per `is_concurrent`),
    /// optionally overriding `--first-checkpoint`, and return the committer and pruner watermarks
    /// that registering the pipeline left in the store.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, None).await;

        // Registering the pipeline is what initializes its watermark.
        if is_concurrent {
            add_concurrent(&mut indexer, A).await;
        } else {
            add_sequential(&mut indexer, A).await;
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
752
    /// The simulated network tip used by the `test_next_checkpoint` helpers below.
    const LATEST_CHECKPOINT: u64 = 10;
754
    /// Set up an indexer as if the network is at `latest_checkpoint` (the next checkpoint to
    /// ingest). Runs a single concurrent pipeline with the given config. If `watermark` is
    /// provided, the pipeline's high watermark is pre-set; `first_checkpoint` controls where
    /// ingestion begins.
    async fn test_next_checkpoint(
        watermark: Option<u64>,
        first_checkpoint: Option<u64>,
        concurrent_config: ConcurrentConfig,
    ) -> Indexer<MockStore> {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");

        // Pre-seed the committer watermark so the pipeline resumes rather than starts fresh.
        if let Some(checkpoint_hi_inclusive) = watermark {
            let mut conn = store.connect().await.unwrap();
            conn.set_committer_watermark(
                A::NAME,
                CommitterWatermark {
                    checkpoint_hi_inclusive,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        }

        // The ingestion directory's watermark file makes the client report this network tip.
        let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
        let mut indexer = Indexer::new(
            store,
            IndexerArgs {
                first_checkpoint,
                ..Default::default()
            },
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);

        indexer
            .concurrent_pipeline::<A>(A, concurrent_config)
            .await
            .unwrap();

        indexer
    }
812
813    fn pruner_config(retention: u64) -> ConcurrentConfig {
814        ConcurrentConfig {
815            pruner: Some(concurrent::PrunerConfig {
816                retention,
817                ..Default::default()
818            }),
819            ..Default::default()
820        }
821    }
822
823    #[test]
824    fn test_arg_parsing() {
825        #[derive(Parser)]
826        struct Args {
827            #[clap(flatten)]
828            indexer: IndexerArgs,
829        }
830
831        let args = Args::try_parse_from([
832            "cmd",
833            "--first-checkpoint",
834            "10",
835            "--last-checkpoint",
836            "100",
837            "--pipeline",
838            "a",
839            "--pipeline",
840            "b",
841            "--task",
842            "t",
843            "--reader-interval-ms",
844            "5000",
845        ])
846        .unwrap();
847
848        assert_eq!(args.indexer.first_checkpoint, Some(10));
849        assert_eq!(args.indexer.last_checkpoint, Some(100));
850        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
851        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
852        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
853    }
854
    /// next_checkpoint is smallest among existing watermarks + 1.
    ///
    /// Every pipeline has a committer watermark (A=100, B=10, C=1, D=50), so
    /// ingestion resumes from the smallest one (C's 1) plus one, i.e. 2.
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;

        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;

        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
        set_committer_watermark(&mut conn, D::NAME, 50).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        // Smallest committer watermark (1) + 1.
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }
894
    /// next_checkpoint is 0 when at least one pipeline has no watermark.
    ///
    /// A and D have no committer watermark, so ingestion must start from
    /// genesis (checkpoint 0) regardless of B's and C's watermarks.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }
924
    /// next_checkpoint is 1 when smallest committer watermark is 0.
    ///
    /// D's watermark is 0, so the next checkpoint to ingest is 0 + 1 = 1.
    #[tokio::test]
    async fn test_next_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 100).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 1).await;
        set_committer_watermark(&mut conn, D::NAME, 0).await;

        let (mut indexer, _temp_dir) =
            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.next_checkpoint, 1);
    }
952
    /// next_checkpoint is first_checkpoint when at least one pipeline has no
    /// watermark, and first_checkpoint is smallest.
    ///
    /// A and D have no watermark, so first_checkpoint (5) is considered; it is
    /// below the smallest existing watermark + 1 (11), so ingestion starts at 5.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 5);
        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
    }
987
    /// next_checkpoint is smallest among existing watermarks + 1 if
    /// all pipelines have watermarks (ignores first_checkpoint).
    ///
    /// Both pipelines already have watermarks (B=50, C=10), so the requested
    /// first_checkpoint (5) is ignored and ingestion resumes from 10 + 1 = 11.
    #[tokio::test]
    async fn test_next_checkpoint_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        assert_eq!(indexer.first_checkpoint, Some(5));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1018
    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
    /// will not be used as the starting point if it is not the smallest valid committer watermark
    /// to resume ingesting from.
    ///
    /// A has no watermark so first_checkpoint (24) is considered, but C's
    /// watermark + 1 (11) is smaller and wins.
    #[tokio::test]
    async fn test_next_checkpoint_large_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 50).await;
        set_committer_watermark(&mut conn, C::NAME, 10).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(24),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store, indexer_args, &registry, None).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;

        assert_eq!(indexer.first_checkpoint, Some(24));
        assert_eq!(indexer.last_checkpoint, None);
        assert_eq!(indexer.latest_checkpoint, 0);
        assert_eq!(indexer.next_checkpoint, 11);
        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
    }
1052
    /// latest_checkpoint is read from the watermark file.
    ///
    /// The local ingestion directory is seeded with a watermark of 30, which
    /// the indexer reports as its latest known checkpoint.
    #[tokio::test]
    async fn test_latest_checkpoint_from_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();
        let temp_dir = init_ingestion_dir(Some(30));
        let indexer = Indexer::new(
            store,
            IndexerArgs::default(),
            ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(temp_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig::default(),
            None,
            &registry,
        )
        .await
        .unwrap();

        assert_eq!(indexer.latest_checkpoint, 30);
    }
1078
    /// No watermark, no first_checkpoint, pruner with retention:
    /// next_checkpoint = LATEST_CHECKPOINT - retention.
    ///
    /// With retention = LATEST_CHECKPOINT - 1, this resolves to checkpoint 1.
    #[tokio::test]
    async fn test_next_checkpoint_with_pruner_uses_retention() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
    }
1087
    /// No watermark, no first_checkpoint, no pruner: falls back to 0.
    ///
    /// Nothing constrains the starting point, so ingestion begins at genesis.
    #[tokio::test]
    async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
        let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
        assert_eq!(indexer.next_checkpoint, 0);
    }
1094
    /// Watermark at 5 takes priority over latest_checkpoint - retention.
    ///
    /// The indexer resumes from watermark + 1 = 6 rather than the
    /// retention-derived starting point.
    #[tokio::test]
    async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 6);
    }
1102
    /// first_checkpoint takes priority over latest_checkpoint - retention when
    /// there's no watermark.
    ///
    /// The explicitly requested start (2) wins over the retention-derived one.
    #[tokio::test]
    async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
        let retention = LATEST_CHECKPOINT - 1;
        let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 2);
    }
1111
    /// When retention exceeds latest_checkpoint, saturating_sub clamps to 0.
    ///
    /// The retention-derived start can never underflow below genesis.
    #[tokio::test]
    async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
        let retention = LATEST_CHECKPOINT + 1;
        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
        assert_eq!(indexer.next_checkpoint, 0);
    }
1119
    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
    //
    // Smallest watermark is A's (5), so ingestion covers [6, 29] = 24 checkpoints.
    // Each pipeline's out-of-order count equals the number of ingested
    // checkpoints at or below its own committer watermark.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(indexer_metrics, A::NAME, 0);
        assert_out_of_order!(indexer_metrics, B::NAME, 5);
        assert_out_of_order!(indexer_metrics, C::NAME, 10);
        assert_out_of_order!(indexer_metrics, D::NAME, 15);
    }
1160
    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
    //
    // All pipelines have watermarks, so first_checkpoint (3) is ignored and
    // ingestion covers [6, 29] = 24 checkpoints, same as the test above without
    // a first_checkpoint.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, A::NAME, 5).await;
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(3),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 5);
        assert_out_of_order!(metrics, C::NAME, 10);
        assert_out_of_order!(metrics, D::NAME, 15);
    }
1201
    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
    //
    // A has no watermark, so ingestion starts from genesis and covers [0, 29]
    // = 30 checkpoints. Pipelines with watermarks see every ingested checkpoint
    // at or below their watermark arrive out of order (B: 11, C: 16, D: 21).
    #[tokio::test]
    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 11);
        assert_out_of_order!(metrics, C::NAME, 16);
        assert_out_of_order!(metrics, D::NAME, 21);
    }
1240
    // test ingestion, some pipelines missing watermarks, use first_checkpoint
    //
    // A has no watermark and first_checkpoint (10) is below the smallest
    // watermark + 1 (11), so ingestion covers [10, 29] = 20 checkpoints.
    // Out-of-order counts are the ingested checkpoints at or below each
    // pipeline's watermark (B: {10} -> 1, C: [10, 15] -> 6, D: [10, 20] -> 11).
    #[tokio::test]
    async fn test_indexer_ingestion_use_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, B::NAME, 10).await;
        set_committer_watermark(&mut conn, C::NAME, 15).await;
        set_committer_watermark(&mut conn, D::NAME, 20).await;

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(10),
            last_checkpoint: Some(29),
            ..Default::default()
        };
        let (mut indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;

        add_concurrent(&mut indexer, A).await;
        add_concurrent(&mut indexer, B).await;
        add_sequential(&mut indexer, C).await;
        add_sequential(&mut indexer, D).await;

        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
        assert_out_of_order!(metrics, A::NAME, 0);
        assert_out_of_order!(metrics, B::NAME, 1);
        assert_out_of_order!(metrics, C::NAME, 6);
        assert_out_of_order!(metrics, D::NAME, 11);
    }
1280
    /// Concurrent pipeline with no first_checkpoint: no watermarks are seeded.
    #[tokio::test]
    async fn test_init_watermark_concurrent_no_first_checkpoint() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
        assert_eq!(committer_watermark, None);
        assert_eq!(pruner_watermark, None);
    }
1287
    /// Concurrent pipeline starting from genesis (first_checkpoint = 0): no
    /// watermarks need to be seeded, same as providing no first_checkpoint.
    #[tokio::test]
    async fn test_init_watermark_concurrent_first_checkpoint_0() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
        assert_eq!(committer_watermark, None);
        assert_eq!(pruner_watermark, None);
    }
1294
    /// Concurrent pipeline starting past genesis (first_checkpoint = 1): a
    /// committer watermark is seeded at 0 and the pruner/reader watermarks at 1,
    /// so the skipped range [0, 1) is never expected to be committed.
    #[tokio::test]
    async fn test_init_watermark_concurrent_first_checkpoint_1() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;

        let committer_watermark = committer_watermark.unwrap();
        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);

        let pruner_watermark = pruner_watermark.unwrap();
        assert_eq!(pruner_watermark.reader_lo, 1);
        assert_eq!(pruner_watermark.pruner_hi, 1);
    }
1306
    /// Sequential pipeline starting past genesis: watermarks are seeded the
    /// same way as for a concurrent pipeline (committer at 0, pruner at 1).
    #[tokio::test]
    async fn test_init_watermark_sequential() {
        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;

        let committer_watermark = committer_watermark.unwrap();
        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);

        let pruner_watermark = pruner_watermark.unwrap();
        assert_eq!(pruner_watermark.reader_lo, 1);
        assert_eq!(pruner_watermark.pruner_hi, 1);
    }
1318
1319    #[tokio::test]
1320    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1321        let registry = Registry::new();
1322        let store = MockStore::default();
1323
1324        let mut conn = store.connect().await.unwrap();
1325        set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1326        set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1327
1328        let indexer_args = IndexerArgs {
1329            first_checkpoint: None,
1330            last_checkpoint: Some(19),
1331            pipeline: vec![],
1332            ..Default::default()
1333        };
1334        let (mut indexer, _temp_dir) =
1335            create_test_indexer(store.clone(), indexer_args, &registry, Some((20, 2))).await;
1336
1337        // Add first sequential pipeline
1338        add_sequential(&mut indexer, MockHandler).await;
1339
1340        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1341        assert_eq!(
1342            indexer.next_sequential_checkpoint(),
1343            Some(11),
1344            "next_sequential_checkpoint should be 11"
1345        );
1346
1347        // Add second sequential pipeline
1348        add_sequential(&mut indexer, SequentialHandler).await;
1349
1350        // Should change to 6 (minimum of 6 and 11)
1351        assert_eq!(
1352            indexer.next_sequential_checkpoint(),
1353            Some(6),
1354            "next_sequential_checkpoint should still be 6"
1355        );
1356
1357        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1358        indexer.run().await.unwrap().join().await.unwrap();
1359
1360        // Verify each pipeline made some progress independently
1361        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1362        let watermark2 = conn
1363            .committer_watermark(SequentialHandler::NAME)
1364            .await
1365            .unwrap();
1366
1367        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1368        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1369    }
1370
    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
    /// committing checkpoints less than the main pipeline's reader watermark.
    #[tokio::test]
    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
        // reader watermark at `7`.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
            .await
            .unwrap();

        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
        // be ignored by the tasked indexer.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(15),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((16, 2))).await;

        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // All of [0, 15] is still ingested; skipping happens at the collector,
        // not at ingestion.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
        // Checkpoints 0 through 6 (below the main reader_lo of 7) are skipped.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            7
        );
        // Only the 9 checkpoints in [7, 15] are committed to the store.
        let data = store
            .data
            .get(MockCheckpointSequenceNumberHandler::NAME)
            .unwrap();
        assert_eq!(data.len(), 9);
        for i in 0..7 {
            assert!(data.get(&i).is_none());
        }
        for i in 7..16 {
            assert!(data.get(&i).is_some());
        }
    }
1426
    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
    #[tokio::test]
    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
        let registry = Registry::new();
        let store = MockStore::default();

        // Main pipeline ("test") committed up to 10, with its reader at 5.
        let mut conn = store.connect().await.unwrap();
        set_committer_watermark(&mut conn, "test", 10).await;
        conn.set_reader_watermark("test", 5).await.unwrap();

        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
        // watermarks.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(9),
            last_checkpoint: Some(25),
            task: TaskArgs::tasked("task".to_string(), 10),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((26, 2))).await;

        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;

        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
        let metrics = tasked_indexer.indexer_metrics().clone();

        tasked_indexer.run().await.unwrap().join().await.unwrap();

        // [9, 25] = 17 checkpoints ingested; nothing is skipped because every
        // ingested checkpoint is at or above the main reader_lo (5).
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
        assert_out_of_order!(metrics, "test", 0);
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
                .unwrap()
                .get(),
            0
        );

        let data = store.data.get("test").unwrap();
        assert_eq!(data.len(), 17);
        for i in 0..9 {
            assert!(data.get(&i).is_none());
        }
        for i in 9..26 {
            assert!(data.get(&i).is_some());
        }
        let main_pipeline_watermark = store.watermark("test").unwrap();
        // assert that the main pipeline's watermarks are not updated
        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
        assert_eq!(main_pipeline_watermark.reader_lo, 5);
        // The tasked pipeline keeps its own watermarks under "test@task".
        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
    }
1482
    /// Test that when the collector observes `reader_lo = X`, all checkpoints >= X will be
    /// committed, and any in-flight checkpoints < X will be skipped.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        // Set the main pipeline watermark.
        set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;

        // Generate 500 checkpoints upfront, for the indexer to process all at once.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
            ..Default::default()
        };
        let (mut tasked_indexer, _temp_dir) =
            create_test_indexer(store.clone(), indexer_args, &registry, Some((501, 2))).await;
        let mut allow_process = 10;
        // Limit the pipeline to process only checkpoints `[0, 10]`.
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    committer: CommitterConfig {
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // High fixed concurrency so all checkpoints can be processed
                    // concurrently despite out-of-order arrival.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
        // committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                Duration::from_secs(10),
            )
            .await;

        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
        // track_main_reader_lo task will eventually pick this up and update the atomic. The
        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
        // one at a time until the collector_reader_lo metric shows the new value.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
        // checkpoints have been processed.
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            // allow_process starts at 10 (checkpoints [0, 10] already released);
            // bump it by one to release the next checkpoint.
            allow_process += 1;
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // At this point, the collector has observed reader_lo = 250. Release all remaining
        // checkpoints. Guarantees:
        // - [0, 10]: committed (before reader_lo was set)
        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
        // - (allow_process, 250): skipped (in-flight, filtered by collector)
        // - [250, 500]: committed (>= reader_lo)
        process_below.send(500).ok();

        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints (allow_process, 250) must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        // Checkpoints >= reader_lo must be committed.
        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
1599}