sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
/// Command-line arguments for the indexer.
///
/// These are consumed by [`Indexer::with_ingestion_service`], which copies the checkpoint bounds
/// onto the indexer, turns a non-empty `pipeline` list into the set of enabled pipelines, and
/// converts `task` into the indexer's task configuration.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from. Without this override, such pipelines start from 0, or from the latest
    /// checkpoint minus their retention if they are configured with a pruner. Pipelines with
    /// existing watermarks will ignore this setting and always resume from their committer
    /// watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
/// Command-line arguments for configuring a tasked indexer.
///
/// The two flags require each other: `--task` cannot be given without `--reader-interval-ms`,
/// and vice versa.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
/// The indexer: owns the store, the ingestion service, and the services of every pipeline
/// registered with it, together with the checkpoint bounds that decide where ingestion starts and
/// stops.
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The network's latest checkpoint, when the indexer was started.
    latest_checkpoint: u64,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from. It is initialized to `u64::MAX` and lowered each time a pipeline
    /// is added.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    /// `None` until a sequential pipeline has been added.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. A name is removed from this set when the matching pipeline is added, so any names
    /// still present when the indexer is run were never added, and cause `run` to fail with an
    /// error.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice. Names are recorded before the `enabled_pipelines`
    /// filter is consulted, so this set also contains pipelines that were skipped.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
165
/// Configuration for a tasked indexer, built from [`TaskArgs`] when both of its fields are set.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
176
177impl TaskArgs {
178    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
179        Self {
180            task: Some(task),
181            reader_interval_ms: Some(reader_interval_ms),
182        }
183    }
184
185    fn into_task(self) -> Option<Task> {
186        Some(Task {
187            task: self.task?,
188            reader_interval: Duration::from_millis(self.reader_interval_ms?),
189        })
190    }
191}
192
193impl<S: Store> Indexer<S> {
194    /// Create a new instance of the indexer framework from a store that implements the `Store`
195    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
196    /// arguments configure the following:
197    ///
198    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
199    ///   watermarks) and where to serve metrics from,
200    /// - Where to download checkpoints from,
201    /// - Concurrency and buffering parameters for downloading checkpoints.
202    ///
203    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
204    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
205    pub async fn new(
206        store: S,
207        indexer_args: IndexerArgs,
208        client_args: ClientArgs,
209        ingestion_config: IngestionConfig,
210        metrics_prefix: Option<&str>,
211        registry: &Registry,
212    ) -> anyhow::Result<Self> {
213        let ingestion_service =
214            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
215        Self::with_ingestion_service(
216            store,
217            indexer_args,
218            ingestion_service,
219            metrics_prefix,
220            registry,
221        )
222        .await
223    }
224
225    /// Variant of [`Self::new`] that accepts a pre-built
226    /// [`IngestionService`], bypassing [`ClientArgs`]-driven
227    /// construction. Callers that supply their own ingestion /
228    /// streaming clients — for example, when embedding the indexer
229    /// in a fullnode that already has checkpoint data on hand —
230    /// build the service via [`IngestionService::with_clients`] and
231    /// hand it in here.
232    pub async fn with_ingestion_service(
233        store: S,
234        indexer_args: IndexerArgs,
235        mut ingestion_service: IngestionService,
236        metrics_prefix: Option<&str>,
237        registry: &Registry,
238    ) -> anyhow::Result<Self> {
239        let IndexerArgs {
240            first_checkpoint,
241            last_checkpoint,
242            pipeline,
243            task,
244        } = indexer_args;
245
246        let metrics = IndexerMetrics::new(metrics_prefix, registry);
247
248        let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;
249
250        info!(latest_checkpoint);
251
252        Ok(Self {
253            store,
254            metrics,
255            ingestion_service,
256            first_checkpoint,
257            last_checkpoint,
258            latest_checkpoint,
259            next_checkpoint: u64::MAX,
260            next_sequential_checkpoint: None,
261            task: task.into_task(),
262            enabled_pipelines: if pipeline.is_empty() {
263                None
264            } else {
265                Some(pipeline.into_iter().collect())
266            },
267            added_pipelines: BTreeSet::new(),
268            pipelines: vec![],
269        })
270    }
271
    /// The store used by the indexer, i.e. the one it was constructed with.
    pub fn store(&self) -> &S {
        &self.store
    }
276
    /// The ingestion client used by the indexer to fetch checkpoints, as exposed by its ingestion
    /// service.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }
281
    /// The indexer's metrics. The same handle is cloned into every pipeline that is added.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }
286
    /// The ingestion service's metrics, as exposed by the ingestion service itself.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }
291
292    /// The pipelines that this indexer will run.
293    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
294        self.added_pipelines.iter().copied().filter(|p| {
295            self.enabled_pipelines
296                .as_ref()
297                .is_none_or(|e| e.contains(*p))
298        })
299    }
300
    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    ///
    /// Returns `None` if no sequential pipeline has been added (and not skipped) so far.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }
307
308    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
309    /// will start processing and committing once the ingestion service has caught up to their
310    /// respective watermarks.
311    ///
312    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
313    pub async fn run(self) -> anyhow::Result<Service> {
314        if let Some(enabled_pipelines) = self.enabled_pipelines {
315            ensure!(
316                enabled_pipelines.is_empty(),
317                "Tried to enable pipelines that this indexer does not know about: \
318                {enabled_pipelines:#?}",
319            );
320        }
321
322        let start = self.next_checkpoint;
323        let end = self.last_checkpoint;
324        info!(start, end, "Ingestion range");
325
326        let mut service = self
327            .ingestion_service
328            .run((
329                Bound::Included(start),
330                end.map_or(Bound::Unbounded, Bound::Included),
331            ))
332            .await
333            .context("Failed to start ingestion service")?;
334
335        for pipeline in self.pipelines {
336            service = service.merge(pipeline);
337        }
338
339        Ok(service)
340    }
341
342    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
343    /// checkpoint after its watermark, or if that doesn't exist, then the provided
344    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
345    ///
346    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
347    /// calculated above.
348    ///
349    /// Returns `Ok(None)` if the pipeline is disabled.
350    async fn add_pipeline<P: Processor + 'static>(
351        &mut self,
352        pipeline_task: String,
353        retention: Option<u64>,
354    ) -> anyhow::Result<Option<u64>> {
355        ensure!(
356            self.added_pipelines.insert(P::NAME),
357            "Pipeline {:?} already added",
358            P::NAME,
359        );
360
361        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
362            && !enabled_pipelines.remove(P::NAME)
363        {
364            info!(pipeline = P::NAME, "Skipping");
365            return Ok(None);
366        }
367
368        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
369        // Otherwise, use the existing record and disregard the proposed value.
370        let proposed_next_checkpoint = if let Some(first_checkpoint) = self.first_checkpoint {
371            first_checkpoint
372        } else if let Some(retention) = retention {
373            self.latest_checkpoint.saturating_sub(retention)
374        } else {
375            0
376        };
377        let mut conn = self.store.connect().await?;
378        let init_watermark = conn
379            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
380            .await
381            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
382
383        let next_checkpoint = if let Some(init_watermark) = init_watermark {
384            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
385                checkpoint_hi_inclusive + 1
386            } else {
387                0
388            }
389        } else {
390            proposed_next_checkpoint
391        };
392
393        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);
394
395        Ok(Some(next_checkpoint))
396    }
397}
398
399impl<S: ConcurrentStore> Indexer<S> {
400    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
401    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
402    ///
403    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
404    /// keep the watermark table up-to-date with the highest point they can guarantee all data
405    /// exists for, for their pipeline.
406    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
407        &mut self,
408        handler: H,
409        config: ConcurrentConfig,
410    ) -> anyhow::Result<()> {
411        let pipeline_task =
412            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
413        let retention = config.pruner.as_ref().map(|p| p.retention);
414        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task, retention).await? else {
415            return Ok(());
416        };
417
418        let checkpoint_rx = self
419            .ingestion_service
420            .subscribe_bounded(config.ingestion.subscriber_channel_size());
421
422        self.pipelines.push(concurrent::pipeline::<H>(
423            handler,
424            next_checkpoint,
425            config,
426            self.store.clone(),
427            self.task.clone(),
428            checkpoint_rx,
429            self.metrics.clone(),
430        ));
431
432        Ok(())
433    }
434}
435
436impl<T: SequentialStore> Indexer<T> {
437    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
438    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
439    ///
440    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
441    /// required to handle pipelines that modify data in-place (where each update is not an insert,
442    /// but could be a modification of an existing row, where ordering between updates is
443    /// important).
444    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
445        &mut self,
446        handler: H,
447        config: SequentialConfig,
448    ) -> anyhow::Result<()> {
449        if self.task.is_some() {
450            bail!(
451                "Sequential pipelines do not support pipeline tasks. \
452                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
453                Running the same pipeline under a different task would violate these guarantees."
454            );
455        }
456
457        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned(), None).await? else {
458            return Ok(());
459        };
460
461        // Track the minimum next_checkpoint across all sequential pipelines
462        self.next_sequential_checkpoint = Some(
463            self.next_sequential_checkpoint
464                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
465        );
466
467        let checkpoint_rx = self
468            .ingestion_service
469            .subscribe_bounded(config.ingestion.subscriber_channel_size());
470
471        self.pipelines.push(sequential::pipeline::<H>(
472            handler,
473            next_checkpoint,
474            config,
475            self.store.clone(),
476            checkpoint_rx,
477            self.metrics.clone(),
478        ));
479
480        Ok(())
481    }
482}
483
484#[cfg(test)]
485mod tests {
486    use std::sync::Arc;
487
488    use async_trait::async_trait;
489    use clap::Parser;
490    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
491    use sui_synthetic_ingestion::synthetic_ingestion;
492    use tokio::sync::watch;
493
494    use crate::FieldCount;
495    use crate::config::ConcurrencyConfig;
496    use crate::ingestion::ingestion_client::IngestionClientArgs;
497    use crate::ingestion::store_client::ObjectStoreWatermark;
498    use crate::ingestion::store_client::WATERMARK_PATH;
499    use crate::mocks::store::FallibleMockStore;
500    use crate::pipeline::CommitterConfig;
501    use crate::pipeline::Processor;
502    use crate::pipeline::concurrent::ConcurrentConfig;
503    use crate::store::CommitterWatermark;
504    use crate::store::ConcurrentConnection as _;
505    use crate::store::Connection as _;
506
507    use super::*;
508
    /// Value produced by the test processors in this module: the sequence number of the
    /// checkpoint it was derived from.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
512
    /// A handler that can be controlled externally to block checkpoint processing. Processing of
    /// a checkpoint waits until the limit published on the watch channel is at least that
    /// checkpoint's sequence number.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value.
        process_below: watch::Receiver<u64>,
    }
518
519    impl ControllableHandler {
520        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
521            let (tx, rx) = watch::channel(limit);
522            (Self { process_below: rx }, tx)
523        }
524    }
525
526    #[async_trait]
527    impl Processor for ControllableHandler {
528        const NAME: &'static str = "controllable";
529        type Value = MockValue;
530
531        async fn process(
532            &self,
533            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
534        ) -> anyhow::Result<Vec<Self::Value>> {
535            let cp_num = checkpoint.summary.sequence_number;
536
537            // Wait until the checkpoint is allowed to be processed
538            self.process_below
539                .clone()
540                .wait_for(|&limit| cp_num <= limit)
541                .await
542                .ok();
543
544            Ok(vec![MockValue(cp_num)])
545        }
546    }
547
548    #[async_trait]
549    impl concurrent::Handler for ControllableHandler {
550        type Store = FallibleMockStore;
551        type Batch = Vec<MockValue>;
552
553        fn batch(
554            &self,
555            batch: &mut Self::Batch,
556            values: &mut std::vec::IntoIter<Self::Value>,
557        ) -> concurrent::BatchStatus {
558            batch.extend(values);
559            concurrent::BatchStatus::Ready
560        }
561
562        async fn commit<'a>(
563            &self,
564            batch: &Self::Batch,
565            conn: &mut <Self::Store as Store>::Connection<'a>,
566        ) -> anyhow::Result<usize> {
567            for value in batch {
568                conn.0
569                    .commit_data(Self::NAME, value.0, vec![value.0])
570                    .await?;
571            }
572            Ok(batch.len())
573        }
574    }
575
    /// Declare a unit struct `$handler` that can be used as a test pipeline named `$name`, as
    /// either a concurrent or a sequential handler on a `FallibleMockStore`.
    ///
    /// Its processor emits one `MockValue` per checkpoint, holding the checkpoint's sequence
    /// number.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            // Concurrent flavour: values are appended to the batch, which is reported as
            // `Pending`, and committing writes each value to the mock store.
            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = FallibleMockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            // Sequential flavour: values are appended to the batch, but committing writes
            // nothing to the store and always reports one affected row.
            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = FallibleMockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
639
    // Module-wide test pipelines; each expands to a unit struct usable as both a concurrent and a
    // sequential handler under the given pipeline name.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
643
644    fn init_ingestion_dir(latest_checkpoint: Option<u64>) -> tempfile::TempDir {
645        let dir = tempfile::tempdir().unwrap();
646        if let Some(cp) = latest_checkpoint {
647            let watermark_path = dir.path().join(WATERMARK_PATH);
648            std::fs::create_dir_all(watermark_path.parent().unwrap()).unwrap();
649            let watermark = ObjectStoreWatermark {
650                checkpoint_hi_inclusive: cp,
651            };
652            std::fs::write(watermark_path, serde_json::to_string(&watermark).unwrap()).unwrap();
653        }
654        dir
655    }
656
657    /// If `ingestion_data` is `Some((num_checkpoints, checkpoint_size))`, synthetic ingestion
658    /// data will be generated in the temp directory before creating the indexer.
659    async fn create_test_indexer(
660        store: FallibleMockStore,
661        indexer_args: IndexerArgs,
662        registry: &Registry,
663        ingestion_data: Option<(u64, u64)>,
664    ) -> (Indexer<FallibleMockStore>, tempfile::TempDir) {
665        let temp_dir = init_ingestion_dir(None);
666        if let Some((num_checkpoints, checkpoint_size)) = ingestion_data {
667            synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
668                ingestion_dir: temp_dir.path().to_owned(),
669                starting_checkpoint: 0,
670                num_checkpoints,
671                checkpoint_size,
672            })
673            .await;
674        }
675        let client_args = ClientArgs {
676            ingestion: IngestionClientArgs {
677                local_ingestion_path: Some(temp_dir.path().to_owned()),
678                ..Default::default()
679            },
680            ..Default::default()
681        };
682        let indexer = Indexer::new(
683            store,
684            indexer_args,
685            client_args,
686            IngestionConfig::default(),
687            None,
688            registry,
689        )
690        .await
691        .unwrap();
692        (indexer, temp_dir)
693    }
694
695    async fn set_committer_watermark(
696        conn: &mut <FallibleMockStore as Store>::Connection<'_>,
697        name: &str,
698        hi: u64,
699    ) {
700        conn.set_committer_watermark(
701            name,
702            CommitterWatermark {
703                checkpoint_hi_inclusive: hi,
704                ..Default::default()
705            },
706        )
707        .await
708        .unwrap();
709    }
710
711    async fn add_concurrent<H: concurrent::Handler<Store = FallibleMockStore>>(
712        indexer: &mut Indexer<FallibleMockStore>,
713        handler: H,
714    ) {
715        indexer
716            .concurrent_pipeline(handler, ConcurrentConfig::default())
717            .await
718            .unwrap();
719    }
720
721    async fn add_sequential<H: sequential::Handler<Store = FallibleMockStore>>(
722        indexer: &mut Indexer<FallibleMockStore>,
723        handler: H,
724    ) {
725        indexer
726            .sequential_pipeline(handler, SequentialConfig::default())
727            .await
728            .unwrap();
729    }
730
    /// Assert that the `total_watermarks_out_of_order` metric on `$metrics`, for the label value
    /// `$pipeline`, currently equals `$expected`.
    macro_rules! assert_out_of_order {
        ($metrics:expr, $pipeline:expr, $expected:expr) => {
            assert_eq!(
                $metrics
                    .total_watermarks_out_of_order
                    .get_metric_with_label_values(&[$pipeline])
                    .unwrap()
                    .get(),
                $expected,
            );
        };
    }
743
744    async fn test_init_watermark(
745        first_checkpoint: Option<u64>,
746        is_concurrent: bool,
747    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
748        let registry = Registry::new();
749        let store = FallibleMockStore::default();
750
751        test_pipeline!(A, "pipeline_name");
752
753        let mut conn = store.connect().await.unwrap();
754
755        let indexer_args = IndexerArgs {
756            first_checkpoint,
757            ..IndexerArgs::default()
758        };
759        let (mut indexer, _temp_dir) =
760            create_test_indexer(store.clone(), indexer_args, &registry, None).await;
761
762        if is_concurrent {
763            add_concurrent(&mut indexer, A).await;
764        } else {
765            add_sequential(&mut indexer, A).await;
766        }
767
768        (
769            conn.committer_watermark(A::NAME).await.unwrap(),
770            conn.pruner_watermark(A::NAME, Duration::ZERO)
771                .await
772                .unwrap(),
773        )
774    }
775
    /// The latest checkpoint advertised by the ingestion directory that `test_next_checkpoint`
    /// sets up.
    const LATEST_CHECKPOINT: u64 = 10;
777
778    /// Set up an indexer as if the network is at `latest_checkpoint` (the next checkpoint to
779    /// ingest). Runs a single concurrent pipeline with the given config. If `watermark` is
780    /// provided, the pipeline's high watermark is pre-set; `first_checkpoint` controls where
781    /// ingestion begins.
782    async fn test_next_checkpoint(
783        watermark: Option<u64>,
784        first_checkpoint: Option<u64>,
785        concurrent_config: ConcurrentConfig,
786    ) -> Indexer<FallibleMockStore> {
787        let registry = Registry::new();
788        let store = FallibleMockStore::default();
789
790        test_pipeline!(A, "concurrent_a");
791
792        if let Some(checkpoint_hi_inclusive) = watermark {
793            let mut conn = store.connect().await.unwrap();
794            conn.set_committer_watermark(
795                A::NAME,
796                CommitterWatermark {
797                    checkpoint_hi_inclusive,
798                    ..Default::default()
799                },
800            )
801            .await
802            .unwrap();
803        }
804
805        let temp_dir = init_ingestion_dir(Some(LATEST_CHECKPOINT));
806        let mut indexer = Indexer::new(
807            store,
808            IndexerArgs {
809                first_checkpoint,
810                ..Default::default()
811            },
812            ClientArgs {
813                ingestion: IngestionClientArgs {
814                    local_ingestion_path: Some(temp_dir.path().to_owned()),
815                    ..Default::default()
816                },
817                ..Default::default()
818            },
819            IngestionConfig::default(),
820            None,
821            &registry,
822        )
823        .await
824        .unwrap();
825
826        assert_eq!(indexer.latest_checkpoint, LATEST_CHECKPOINT);
827
828        indexer
829            .concurrent_pipeline::<A>(A, concurrent_config)
830            .await
831            .unwrap();
832
833        indexer
834    }
835
836    fn pruner_config(retention: u64) -> ConcurrentConfig {
837        ConcurrentConfig {
838            pruner: Some(concurrent::PrunerConfig {
839                retention,
840                ..Default::default()
841            }),
842            ..Default::default()
843        }
844    }
845
846    #[test]
847    fn test_arg_parsing() {
848        #[derive(Parser)]
849        struct Args {
850            #[clap(flatten)]
851            indexer: IndexerArgs,
852        }
853
854        let args = Args::try_parse_from([
855            "cmd",
856            "--first-checkpoint",
857            "10",
858            "--last-checkpoint",
859            "100",
860            "--pipeline",
861            "a",
862            "--pipeline",
863            "b",
864            "--task",
865            "t",
866            "--reader-interval-ms",
867            "5000",
868        ])
869        .unwrap();
870
871        assert_eq!(args.indexer.first_checkpoint, Some(10));
872        assert_eq!(args.indexer.last_checkpoint, Some(100));
873        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
874        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
875        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
876    }
877
878    /// next_checkpoint is smallest among existing watermarks + 1.
879    #[tokio::test]
880    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
881        let registry = Registry::new();
882        let store = FallibleMockStore::default();
883
884        test_pipeline!(A, "concurrent_a");
885        test_pipeline!(B, "concurrent_b");
886        test_pipeline!(C, "sequential_c");
887        test_pipeline!(D, "sequential_d");
888
889        let mut conn = store.connect().await.unwrap();
890
891        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
892        set_committer_watermark(&mut conn, A::NAME, 100).await;
893
894        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
895        set_committer_watermark(&mut conn, B::NAME, 10).await;
896
897        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
898        set_committer_watermark(&mut conn, C::NAME, 1).await;
899
900        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
901        set_committer_watermark(&mut conn, D::NAME, 50).await;
902
903        let (mut indexer, _temp_dir) =
904            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
905
906        add_concurrent(&mut indexer, A).await;
907        add_concurrent(&mut indexer, B).await;
908        add_sequential(&mut indexer, C).await;
909        add_sequential(&mut indexer, D).await;
910
911        assert_eq!(indexer.first_checkpoint, None);
912        assert_eq!(indexer.last_checkpoint, None);
913        assert_eq!(indexer.latest_checkpoint, 0);
914        assert_eq!(indexer.next_checkpoint, 2);
915        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
916    }
917
918    /// next_checkpoint is 0 when at least one pipeline has no watermark.
919    #[tokio::test]
920    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
921        let registry = Registry::new();
922        let store = FallibleMockStore::default();
923
924        test_pipeline!(A, "concurrent_a");
925        test_pipeline!(B, "concurrent_b");
926        test_pipeline!(C, "sequential_c");
927        test_pipeline!(D, "sequential_d");
928
929        let mut conn = store.connect().await.unwrap();
930        set_committer_watermark(&mut conn, B::NAME, 10).await;
931        set_committer_watermark(&mut conn, C::NAME, 1).await;
932
933        let (mut indexer, _temp_dir) =
934            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
935
936        add_concurrent(&mut indexer, A).await;
937        add_concurrent(&mut indexer, B).await;
938        add_sequential(&mut indexer, C).await;
939        add_sequential(&mut indexer, D).await;
940
941        assert_eq!(indexer.first_checkpoint, None);
942        assert_eq!(indexer.last_checkpoint, None);
943        assert_eq!(indexer.latest_checkpoint, 0);
944        assert_eq!(indexer.next_checkpoint, 0);
945        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
946    }
947
948    /// next_checkpoint is 1 when smallest committer watermark is 0.
949    #[tokio::test]
950    async fn test_next_checkpoint_smallest_is_0() {
951        let registry = Registry::new();
952        let store = FallibleMockStore::default();
953
954        test_pipeline!(A, "concurrent_a");
955        test_pipeline!(B, "concurrent_b");
956        test_pipeline!(C, "sequential_c");
957        test_pipeline!(D, "sequential_d");
958
959        let mut conn = store.connect().await.unwrap();
960        set_committer_watermark(&mut conn, A::NAME, 100).await;
961        set_committer_watermark(&mut conn, B::NAME, 10).await;
962        set_committer_watermark(&mut conn, C::NAME, 1).await;
963        set_committer_watermark(&mut conn, D::NAME, 0).await;
964
965        let (mut indexer, _temp_dir) =
966            create_test_indexer(store, IndexerArgs::default(), &registry, None).await;
967
968        add_concurrent(&mut indexer, A).await;
969        add_concurrent(&mut indexer, B).await;
970        add_sequential(&mut indexer, C).await;
971        add_sequential(&mut indexer, D).await;
972
973        assert_eq!(indexer.next_checkpoint, 1);
974    }
975
976    /// next_checkpoint is first_checkpoint when at least one pipeline has no
977    /// watermark, and first_checkpoint is smallest.
978    #[tokio::test]
979    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
980        let registry = Registry::new();
981        let store = FallibleMockStore::default();
982
983        test_pipeline!(A, "concurrent_a");
984        test_pipeline!(B, "concurrent_b");
985        test_pipeline!(C, "sequential_c");
986        test_pipeline!(D, "sequential_d");
987
988        let mut conn = store.connect().await.unwrap();
989        set_committer_watermark(&mut conn, B::NAME, 50).await;
990        set_committer_watermark(&mut conn, C::NAME, 10).await;
991
992        let indexer_args = IndexerArgs {
993            first_checkpoint: Some(5),
994            ..Default::default()
995        };
996        let (mut indexer, _temp_dir) =
997            create_test_indexer(store, indexer_args, &registry, None).await;
998
999        add_concurrent(&mut indexer, A).await;
1000        add_concurrent(&mut indexer, B).await;
1001        add_sequential(&mut indexer, C).await;
1002        add_sequential(&mut indexer, D).await;
1003
1004        assert_eq!(indexer.first_checkpoint, Some(5));
1005        assert_eq!(indexer.last_checkpoint, None);
1006        assert_eq!(indexer.latest_checkpoint, 0);
1007        assert_eq!(indexer.next_checkpoint, 5);
1008        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
1009    }
1010
1011    /// next_checkpoint is smallest among existing watermarks + 1 if
1012    /// all pipelines have watermarks (ignores first_checkpoint).
1013    #[tokio::test]
1014    async fn test_next_checkpoint_ignore_first_checkpoint() {
1015        let registry = Registry::new();
1016        let store = FallibleMockStore::default();
1017
1018        test_pipeline!(B, "concurrent_b");
1019        test_pipeline!(C, "sequential_c");
1020
1021        let mut conn = store.connect().await.unwrap();
1022        set_committer_watermark(&mut conn, B::NAME, 50).await;
1023        set_committer_watermark(&mut conn, C::NAME, 10).await;
1024
1025        let indexer_args = IndexerArgs {
1026            first_checkpoint: Some(5),
1027            ..Default::default()
1028        };
1029        let (mut indexer, _temp_dir) =
1030            create_test_indexer(store, indexer_args, &registry, None).await;
1031
1032        add_concurrent(&mut indexer, B).await;
1033        add_sequential(&mut indexer, C).await;
1034
1035        assert_eq!(indexer.first_checkpoint, Some(5));
1036        assert_eq!(indexer.last_checkpoint, None);
1037        assert_eq!(indexer.latest_checkpoint, 0);
1038        assert_eq!(indexer.next_checkpoint, 11);
1039        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1040    }
1041
1042    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1043    /// will not be used as the starting point if it is not the smallest valid committer watermark
1044    /// to resume ingesting from.
1045    #[tokio::test]
1046    async fn test_next_checkpoint_large_first_checkpoint() {
1047        let registry = Registry::new();
1048        let store = FallibleMockStore::default();
1049
1050        test_pipeline!(A, "concurrent_a");
1051        test_pipeline!(B, "concurrent_b");
1052        test_pipeline!(C, "sequential_c");
1053
1054        let mut conn = store.connect().await.unwrap();
1055        set_committer_watermark(&mut conn, B::NAME, 50).await;
1056        set_committer_watermark(&mut conn, C::NAME, 10).await;
1057
1058        let indexer_args = IndexerArgs {
1059            first_checkpoint: Some(24),
1060            ..Default::default()
1061        };
1062        let (mut indexer, _temp_dir) =
1063            create_test_indexer(store, indexer_args, &registry, None).await;
1064
1065        add_concurrent(&mut indexer, A).await;
1066        add_concurrent(&mut indexer, B).await;
1067        add_sequential(&mut indexer, C).await;
1068
1069        assert_eq!(indexer.first_checkpoint, Some(24));
1070        assert_eq!(indexer.last_checkpoint, None);
1071        assert_eq!(indexer.latest_checkpoint, 0);
1072        assert_eq!(indexer.next_checkpoint, 11);
1073        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1074    }
1075
1076    /// latest_checkpoint is read from the watermark file.
1077    #[tokio::test]
1078    async fn test_latest_checkpoint_from_watermark() {
1079        let registry = Registry::new();
1080        let store = FallibleMockStore::default();
1081        let temp_dir = init_ingestion_dir(Some(30));
1082        let indexer = Indexer::new(
1083            store,
1084            IndexerArgs::default(),
1085            ClientArgs {
1086                ingestion: IngestionClientArgs {
1087                    local_ingestion_path: Some(temp_dir.path().to_owned()),
1088                    ..Default::default()
1089                },
1090                ..Default::default()
1091            },
1092            IngestionConfig::default(),
1093            None,
1094            &registry,
1095        )
1096        .await
1097        .unwrap();
1098
1099        assert_eq!(indexer.latest_checkpoint, 30);
1100    }
1101
1102    /// No watermark, no first_checkpoint, pruner with retention:
1103    /// next_checkpoint = LATEST_CHECKPOINT - retention.
1104    #[tokio::test]
1105    async fn test_next_checkpoint_with_pruner_uses_retention() {
1106        let retention = LATEST_CHECKPOINT - 1;
1107        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1108        assert_eq!(indexer.next_checkpoint, LATEST_CHECKPOINT - retention);
1109    }
1110
1111    /// No watermark, no first_checkpoint, no pruner: falls back to 0.
1112    #[tokio::test]
1113    async fn test_next_checkpoint_without_pruner_falls_back_to_genesis() {
1114        let indexer = test_next_checkpoint(None, None, ConcurrentConfig::default()).await;
1115        assert_eq!(indexer.next_checkpoint, 0);
1116    }
1117
1118    /// Watermark at 5 takes priority over latest_checkpoint - retention.
1119    #[tokio::test]
1120    async fn test_next_checkpoint_watermark_takes_priority_over_pruner() {
1121        let retention = LATEST_CHECKPOINT - 1;
1122        let indexer = test_next_checkpoint(Some(5), None, pruner_config(retention)).await;
1123        assert_eq!(indexer.next_checkpoint, 6);
1124    }
1125
1126    /// first_checkpoint takes priority over latest_checkpoint - retention when
1127    /// there's no watermark.
1128    #[tokio::test]
1129    async fn test_next_checkpoint_first_checkpoint_takes_priority_over_pruner() {
1130        let retention = LATEST_CHECKPOINT - 1;
1131        let indexer = test_next_checkpoint(None, Some(2), pruner_config(retention)).await;
1132        assert_eq!(indexer.next_checkpoint, 2);
1133    }
1134
1135    /// When retention exceeds latest_checkpoint, saturating_sub clamps to 0.
1136    #[tokio::test]
1137    async fn test_next_checkpoint_retention_exceeds_latest_checkpoint() {
1138        let retention = LATEST_CHECKPOINT + 1;
1139        let indexer = test_next_checkpoint(None, None, pruner_config(retention)).await;
1140        assert_eq!(indexer.next_checkpoint, 0);
1141    }
1142
1143    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1144    #[tokio::test]
1145    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1146        let registry = Registry::new();
1147        let store = FallibleMockStore::default();
1148
1149        test_pipeline!(A, "concurrent_a");
1150        test_pipeline!(B, "concurrent_b");
1151        test_pipeline!(C, "sequential_c");
1152        test_pipeline!(D, "sequential_d");
1153
1154        let mut conn = store.connect().await.unwrap();
1155        set_committer_watermark(&mut conn, A::NAME, 5).await;
1156        set_committer_watermark(&mut conn, B::NAME, 10).await;
1157        set_committer_watermark(&mut conn, C::NAME, 15).await;
1158        set_committer_watermark(&mut conn, D::NAME, 20).await;
1159
1160        let indexer_args = IndexerArgs {
1161            last_checkpoint: Some(29),
1162            ..Default::default()
1163        };
1164        let (mut indexer, _temp_dir) =
1165            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1166
1167        add_concurrent(&mut indexer, A).await;
1168        add_concurrent(&mut indexer, B).await;
1169        add_sequential(&mut indexer, C).await;
1170        add_sequential(&mut indexer, D).await;
1171
1172        let ingestion_metrics = indexer.ingestion_metrics().clone();
1173        let indexer_metrics = indexer.indexer_metrics().clone();
1174
1175        indexer.run().await.unwrap().join().await.unwrap();
1176
1177        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1178        assert_out_of_order!(indexer_metrics, A::NAME, 0);
1179        assert_out_of_order!(indexer_metrics, B::NAME, 5);
1180        assert_out_of_order!(indexer_metrics, C::NAME, 10);
1181        assert_out_of_order!(indexer_metrics, D::NAME, 15);
1182    }
1183
1184    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1185    #[tokio::test]
1186    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1187        let registry = Registry::new();
1188        let store = FallibleMockStore::default();
1189
1190        test_pipeline!(A, "concurrent_a");
1191        test_pipeline!(B, "concurrent_b");
1192        test_pipeline!(C, "sequential_c");
1193        test_pipeline!(D, "sequential_d");
1194
1195        let mut conn = store.connect().await.unwrap();
1196        set_committer_watermark(&mut conn, A::NAME, 5).await;
1197        set_committer_watermark(&mut conn, B::NAME, 10).await;
1198        set_committer_watermark(&mut conn, C::NAME, 15).await;
1199        set_committer_watermark(&mut conn, D::NAME, 20).await;
1200
1201        let indexer_args = IndexerArgs {
1202            first_checkpoint: Some(3),
1203            last_checkpoint: Some(29),
1204            ..Default::default()
1205        };
1206        let (mut indexer, _temp_dir) =
1207            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1208
1209        add_concurrent(&mut indexer, A).await;
1210        add_concurrent(&mut indexer, B).await;
1211        add_sequential(&mut indexer, C).await;
1212        add_sequential(&mut indexer, D).await;
1213
1214        let ingestion_metrics = indexer.ingestion_metrics().clone();
1215        let metrics = indexer.indexer_metrics().clone();
1216        indexer.run().await.unwrap().join().await.unwrap();
1217
1218        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1219        assert_out_of_order!(metrics, A::NAME, 0);
1220        assert_out_of_order!(metrics, B::NAME, 5);
1221        assert_out_of_order!(metrics, C::NAME, 10);
1222        assert_out_of_order!(metrics, D::NAME, 15);
1223    }
1224
1225    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1226    #[tokio::test]
1227    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1228        let registry = Registry::new();
1229        let store = FallibleMockStore::default();
1230
1231        test_pipeline!(A, "concurrent_a");
1232        test_pipeline!(B, "concurrent_b");
1233        test_pipeline!(C, "sequential_c");
1234        test_pipeline!(D, "sequential_d");
1235
1236        let mut conn = store.connect().await.unwrap();
1237        set_committer_watermark(&mut conn, B::NAME, 10).await;
1238        set_committer_watermark(&mut conn, C::NAME, 15).await;
1239        set_committer_watermark(&mut conn, D::NAME, 20).await;
1240
1241        let indexer_args = IndexerArgs {
1242            last_checkpoint: Some(29),
1243            ..Default::default()
1244        };
1245        let (mut indexer, _temp_dir) =
1246            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1247
1248        add_concurrent(&mut indexer, A).await;
1249        add_concurrent(&mut indexer, B).await;
1250        add_sequential(&mut indexer, C).await;
1251        add_sequential(&mut indexer, D).await;
1252
1253        let ingestion_metrics = indexer.ingestion_metrics().clone();
1254        let metrics = indexer.indexer_metrics().clone();
1255        indexer.run().await.unwrap().join().await.unwrap();
1256
1257        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1258        assert_out_of_order!(metrics, A::NAME, 0);
1259        assert_out_of_order!(metrics, B::NAME, 11);
1260        assert_out_of_order!(metrics, C::NAME, 16);
1261        assert_out_of_order!(metrics, D::NAME, 21);
1262    }
1263
1264    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1265    #[tokio::test]
1266    async fn test_indexer_ingestion_use_first_checkpoint() {
1267        let registry = Registry::new();
1268        let store = FallibleMockStore::default();
1269
1270        test_pipeline!(A, "concurrent_a");
1271        test_pipeline!(B, "concurrent_b");
1272        test_pipeline!(C, "sequential_c");
1273        test_pipeline!(D, "sequential_d");
1274
1275        let mut conn = store.connect().await.unwrap();
1276        set_committer_watermark(&mut conn, B::NAME, 10).await;
1277        set_committer_watermark(&mut conn, C::NAME, 15).await;
1278        set_committer_watermark(&mut conn, D::NAME, 20).await;
1279
1280        let indexer_args = IndexerArgs {
1281            first_checkpoint: Some(10),
1282            last_checkpoint: Some(29),
1283            ..Default::default()
1284        };
1285        let (mut indexer, _temp_dir) =
1286            create_test_indexer(store.clone(), indexer_args, &registry, Some((30, 1))).await;
1287
1288        add_concurrent(&mut indexer, A).await;
1289        add_concurrent(&mut indexer, B).await;
1290        add_sequential(&mut indexer, C).await;
1291        add_sequential(&mut indexer, D).await;
1292
1293        let ingestion_metrics = indexer.ingestion_metrics().clone();
1294        let metrics = indexer.indexer_metrics().clone();
1295        indexer.run().await.unwrap().join().await.unwrap();
1296
1297        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1298        assert_out_of_order!(metrics, A::NAME, 0);
1299        assert_out_of_order!(metrics, B::NAME, 1);
1300        assert_out_of_order!(metrics, C::NAME, 6);
1301        assert_out_of_order!(metrics, D::NAME, 11);
1302    }
1303
1304    #[tokio::test]
1305    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1306        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1307        assert_eq!(committer_watermark, None);
1308        assert_eq!(pruner_watermark, None);
1309    }
1310
1311    #[tokio::test]
1312    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1313        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1314        assert_eq!(committer_watermark, None);
1315        assert_eq!(pruner_watermark, None);
1316    }
1317
1318    #[tokio::test]
1319    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1320        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1321
1322        let committer_watermark = committer_watermark.unwrap();
1323        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1324
1325        let pruner_watermark = pruner_watermark.unwrap();
1326        assert_eq!(pruner_watermark.reader_lo, 1);
1327        assert_eq!(pruner_watermark.pruner_hi, 1);
1328    }
1329
1330    #[tokio::test]
1331    async fn test_init_watermark_sequential() {
1332        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1333
1334        let committer_watermark = committer_watermark.unwrap();
1335        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1336
1337        let pruner_watermark = pruner_watermark.unwrap();
1338        assert_eq!(pruner_watermark.reader_lo, 1);
1339        assert_eq!(pruner_watermark.pruner_hi, 1);
1340    }
1341
1342    #[tokio::test]
1343    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1344        let registry = Registry::new();
1345        let store = FallibleMockStore::default();
1346
1347        let mut conn = store.connect().await.unwrap();
1348        set_committer_watermark(&mut conn, MockHandler::NAME, 10).await;
1349        set_committer_watermark(&mut conn, SequentialHandler::NAME, 5).await;
1350
1351        let indexer_args = IndexerArgs {
1352            first_checkpoint: None,
1353            last_checkpoint: Some(19),
1354            pipeline: vec![],
1355            ..Default::default()
1356        };
1357        let (mut indexer, _temp_dir) =
1358            create_test_indexer(store.clone(), indexer_args, &registry, Some((20, 2))).await;
1359
1360        // Add first sequential pipeline
1361        add_sequential(&mut indexer, MockHandler).await;
1362
1363        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1364        assert_eq!(
1365            indexer.next_sequential_checkpoint(),
1366            Some(11),
1367            "next_sequential_checkpoint should be 11"
1368        );
1369
1370        // Add second sequential pipeline
1371        add_sequential(&mut indexer, SequentialHandler).await;
1372
1373        // Should change to 6 (minimum of 6 and 11)
1374        assert_eq!(
1375            indexer.next_sequential_checkpoint(),
1376            Some(6),
1377            "next_sequential_checkpoint should still be 6"
1378        );
1379
1380        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1381        indexer.run().await.unwrap().join().await.unwrap();
1382
1383        // Verify each pipeline made some progress independently
1384        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1385        let watermark2 = conn
1386            .committer_watermark(SequentialHandler::NAME)
1387            .await
1388            .unwrap();
1389
1390        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1391        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1392    }
1393
1394    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1395    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1396    /// committing checkpoints less than the main pipeline's reader watermark.
1397    #[tokio::test]
1398    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1399        let registry = Registry::new();
1400        let store = FallibleMockStore::default();
1401
1402        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1403        // reader watermark at `7`.
1404        let mut conn = store.connect().await.unwrap();
1405        set_committer_watermark(&mut conn, MockCheckpointSequenceNumberHandler::NAME, 10).await;
1406        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1407            .await
1408            .unwrap();
1409
1410        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1411        // be ignored by the tasked indexer.
1412        let indexer_args = IndexerArgs {
1413            first_checkpoint: Some(0),
1414            last_checkpoint: Some(15),
1415            task: TaskArgs::tasked("task".to_string(), 10),
1416            ..Default::default()
1417        };
1418        let (mut tasked_indexer, _temp_dir) =
1419            create_test_indexer(store.clone(), indexer_args, &registry, Some((16, 2))).await;
1420
1421        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1422
1423        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1424        let metrics = tasked_indexer.indexer_metrics().clone();
1425
1426        tasked_indexer.run().await.unwrap().join().await.unwrap();
1427
1428        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1429        assert_eq!(
1430            metrics
1431                .total_collector_skipped_checkpoints
1432                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1433                .unwrap()
1434                .get(),
1435            7
1436        );
1437        let data = store
1438            .data
1439            .get(MockCheckpointSequenceNumberHandler::NAME)
1440            .unwrap();
1441        assert_eq!(data.len(), 9);
1442        for i in 0..7 {
1443            assert!(data.get(&i).is_none());
1444        }
1445        for i in 7..16 {
1446            assert!(data.get(&i).is_some());
1447        }
1448    }
1449
1450    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1451    #[tokio::test]
1452    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1453        let registry = Registry::new();
1454        let store = FallibleMockStore::default();
1455
1456        let mut conn = store.connect().await.unwrap();
1457        set_committer_watermark(&mut conn, "test", 10).await;
1458        conn.set_reader_watermark("test", 5).await.unwrap();
1459
1460        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1461        // watermarks.
1462        let indexer_args = IndexerArgs {
1463            first_checkpoint: Some(9),
1464            last_checkpoint: Some(25),
1465            task: TaskArgs::tasked("task".to_string(), 10),
1466            ..Default::default()
1467        };
1468        let (mut tasked_indexer, _temp_dir) =
1469            create_test_indexer(store.clone(), indexer_args, &registry, Some((26, 2))).await;
1470
1471        add_concurrent(&mut tasked_indexer, MockCheckpointSequenceNumberHandler).await;
1472
1473        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1474        let metrics = tasked_indexer.indexer_metrics().clone();
1475
1476        tasked_indexer.run().await.unwrap().join().await.unwrap();
1477
1478        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
1479        assert_out_of_order!(metrics, "test", 0);
1480        assert_eq!(
1481            metrics
1482                .total_collector_skipped_checkpoints
1483                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1484                .unwrap()
1485                .get(),
1486            0
1487        );
1488
1489        let data = store.data.get("test").unwrap();
1490        assert_eq!(data.len(), 17);
1491        for i in 0..9 {
1492            assert!(data.get(&i).is_none());
1493        }
1494        for i in 9..26 {
1495            assert!(data.get(&i).is_some());
1496        }
1497        let main_pipeline_watermark = store.watermark("test").unwrap();
1498        // assert that the main pipeline's watermarks are not updated
1499        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
1500        assert_eq!(main_pipeline_watermark.reader_lo, 5);
1501        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
1502        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
1503        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
1504    }
1505
1506    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
1507    /// committed, and any checkpoints inflight < X will be skipped.
1508    #[tokio::test]
1509    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
1510        let registry = Registry::new();
1511        let store = FallibleMockStore::default();
1512        let mut conn = store.connect().await.unwrap();
1513        // Set the main pipeline watermark.
1514        set_committer_watermark(&mut conn, ControllableHandler::NAME, 11).await;
1515
1516        // Generate 500 checkpoints upfront, for the indexer to process all at once.
1517        let indexer_args = IndexerArgs {
1518            first_checkpoint: Some(0),
1519            last_checkpoint: Some(500),
1520            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
1521            ..Default::default()
1522        };
1523        let (mut tasked_indexer, _temp_dir) =
1524            create_test_indexer(store.clone(), indexer_args, &registry, Some((501, 2))).await;
1525        let mut allow_process = 10;
1526        // Limit the pipeline to process only checkpoints `[0, 10]`.
1527        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
1528        let _ = tasked_indexer
1529            .concurrent_pipeline(
1530                controllable_handler,
1531                ConcurrentConfig {
1532                    committer: CommitterConfig {
1533                        collect_interval_ms: 10,
1534                        watermark_interval_ms: 10,
1535                        ..Default::default()
1536                    },
1537                    // High fixed concurrency so all checkpoints can be processed
1538                    // concurrently despite out-of-order arrival.
1539                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
1540                    ..Default::default()
1541                },
1542            )
1543            .await;
1544        let metrics = tasked_indexer.indexer_metrics().clone();
1545
1546        let mut s_indexer = tasked_indexer.run().await.unwrap();
1547
1548        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
1549        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
1550        // committed.
1551        store
1552            .wait_for_watermark(
1553                &pipeline_task::<FallibleMockStore>(ControllableHandler::NAME, Some("task"))
1554                    .unwrap(),
1555                10,
1556                Duration::from_secs(10),
1557            )
1558            .await;
1559
1560        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
1561        // track_main_reader_lo task will eventually pick this up and update the atomic. The
1562        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
1563        // one at a time until the collector_reader_lo metric shows the new value.
1564        conn.set_reader_watermark(ControllableHandler::NAME, 250)
1565            .await
1566            .unwrap();
1567
1568        let reader_lo = metrics
1569            .collector_reader_lo
1570            .with_label_values(&[ControllableHandler::NAME]);
1571
1572        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
1573        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
1574        // checkpoints have been processed.
1575        let mut interval = tokio::time::interval(Duration::from_millis(10));
1576        while reader_lo.get() != 250 {
1577            interval.tick().await;
1578            // allow_process is initialized to 10; bump it by one to release the next checkpoint
1579            allow_process += 1;
1580            assert!(
1581                allow_process <= 500,
1582                "Released all checkpoints but collector never observed new reader_lo"
1583            );
1584            process_below.send(allow_process).ok();
1585        }
1586
1587        // At this point, the collector has observed reader_lo = 250. Release all remaining
1588        // checkpoints. Guarantees:
1589        // - [0, 10]: committed (before reader_lo was set)
1590        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
1591        // - (allow_process, 250): skipped (in-flight, filtered by collector)
1592        // - [250, 500]: committed (>= reader_lo)
1593        process_below.send(500).ok();
1594
1595        s_indexer.join().await.unwrap();
1596
1597        let data = store.data.get(ControllableHandler::NAME).unwrap();
1598
1599        // Checkpoints (allow_process, 250) must be skipped.
1600        for chkpt in (allow_process + 1)..250 {
1601            assert!(
1602                data.get(&chkpt).is_none(),
1603                "Checkpoint {chkpt} should have been skipped"
1604            );
1605        }
1606
1607        // Checkpoints >= reader_lo must be committed.
1608        for chkpt in 250..=500 {
1609            assert!(
1610                data.get(&chkpt).is_some(),
1611                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
1612            );
1613        }
1614
1615        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
1616        for chkpt in 0..=10 {
1617            assert!(
1618                data.get(&chkpt).is_some(),
1619                "Checkpoint {chkpt} should have been committed (baseline)"
1620            );
1621        }
1622    }
1623}