sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod ingestion;
42pub mod metrics;
43pub mod pipeline;
44#[cfg(feature = "postgres")]
45pub mod postgres;
46
47#[cfg(test)]
48pub mod mocks;
49
50/// Command-line arguments for the indexer
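///
/// A minimal construction sketch for a bounded backfill run (the pipeline name below is
/// hypothetical, and the snippet is illustrative rather than a compiled doctest):
///
/// ```ignore
/// let args = IndexerArgs {
///     first_checkpoint: Some(1_000_000),
///     last_checkpoint: Some(1_100_000),
///     pipeline: vec!["my_pipeline".to_owned()],
///     task: TaskArgs::default(),
/// };
/// ```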
51#[derive(clap::Args, Default, Debug, Clone)]
52pub struct IndexerArgs {
53    /// Override the next checkpoint for all pipelines without a committer watermark to start
54    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
55    /// setting and always resume from their committer watermark + 1.
56    ///
57    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
58    /// is the minimum across all pipelines' next checkpoints.
59    #[arg(long)]
60    pub first_checkpoint: Option<u64>,
61
62    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
63    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
64    #[arg(long)]
65    pub last_checkpoint: Option<u64>,
66
67    /// Only run the following pipelines. If not provided, all pipelines found in the
68    /// configuration file will be run.
69    #[arg(long, action = clap::ArgAction::Append)]
70    pub pipeline: Vec<String>,
71
72    /// Additional configurations for running a tasked indexer.
73    #[clap(flatten)]
74    pub task: TaskArgs,
75}
76
77/// Command-line arguments for configuring a tasked indexer.
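///
/// A sketch of constructing these arguments from code rather than the CLI (the task name and
/// interval below are illustrative):
///
/// ```ignore
/// // Record watermarks under the "backfill" task, refetching the main pipeline's reader
/// // watermark every second.
/// let task_args = TaskArgs::tasked("backfill".to_owned(), 1_000);
/// ```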
78#[derive(clap::Parser, Default, Debug, Clone)]
79pub struct TaskArgs {
80    /// An optional task name for this indexer. When set, pipelines record their watermarks under a
81    /// key combining the pipeline name and this task name, joined by the delimiter defined on the
82    /// store. This allows the same pipelines to run under multiple indexers (e.g. for backfills or
83    /// temporary workflows) while maintaining separate watermark entries in the database.
84    ///
85    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
86    ///
87    /// Sequential pipelines cannot be attached to a tasked indexer.
88    ///
89    /// The framework ensures that tasked pipelines never commit checkpoints below the main
90    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
91    #[arg(long, requires = "reader_interval_ms")]
92    task: Option<String>,
93
94    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
95    /// refetch its main pipeline's reader watermark.
96    ///
97    /// This is required when `--task` is set, and should ideally be set to a value that is
98    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
99    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
100    /// it.
101    ///
102    /// If the main pipeline does not have pruning enabled, this value can be set to some high
103    /// value, as the tasked pipeline will never see an updated reader watermark.
104    #[arg(long, requires = "task")]
105    reader_interval_ms: Option<u64>,
106}
107
108pub struct Indexer<S: Store> {
109    /// The storage backend that the indexer uses to write and query indexed data. This
110    /// generic implementation allows for plugging in different storage solutions that implement the
111    /// `Store` trait.
112    store: S,
113
114    /// Prometheus Metrics.
115    metrics: Arc<IndexerMetrics>,
116
117    /// Service for downloading and disseminating checkpoint data.
118    ingestion_service: IngestionService,
119
120    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
121    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
122    /// always resume from their committer watermark + 1.
123    ///
124    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
125    /// is the minimum across all pipelines' next checkpoints.
126    default_next_checkpoint: u64,
127
128    /// Optional override of the checkpoint upper bound. When set, the indexer will stop ingestion at
129    /// this checkpoint.
130    last_checkpoint: Option<u64>,
131
132    /// An optional task name for this indexer. When set, pipelines record their watermarks under a
133    /// key combining the pipeline name and this task name, joined by the delimiter defined on the
134    /// store. This allows the same pipelines to run under multiple indexers (e.g. for backfills or
135    /// temporary workflows) while maintaining separate watermark entries in the database.
136    ///
137    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
138    ///
139    /// Sequential pipelines cannot be attached to a tasked indexer.
140    ///
141    /// The framework ensures that tasked pipelines never commit checkpoints below the main
142    /// pipeline’s pruner watermark.
143    task: Option<Task>,
144
145    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
146    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
147    /// a warning when the indexer is run.
148    enabled_pipelines: Option<BTreeSet<String>>,
149
150    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
151    /// with the same name isn't added twice.
152    added_pipelines: BTreeSet<&'static str>,
153
154    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
155    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
156    /// unless overridden by [Self::default_next_checkpoint].
157    first_ingestion_checkpoint: u64,
158
159    /// The minimum next_checkpoint across all sequential pipelines. This is used to initialize
160    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
161    next_sequential_checkpoint: Option<u64>,
162
163    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
164    pipelines: Vec<Service>,
165}
166
167/// Configuration for a tasked indexer.
168#[derive(Clone)]
169pub(crate) struct Task {
170    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
171    /// record pipeline watermarks.
172    task: String,
173    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
174    /// pipeline's reader watermark.
175    reader_interval: Duration,
176}
177
178impl TaskArgs {
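    /// Construct a `TaskArgs` for a tasked indexer, from a task name and the reader watermark
    /// refetch interval, in milliseconds.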
179    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
180        Self {
181            task: Some(task),
182            reader_interval_ms: Some(reader_interval_ms),
183        }
184    }
185
186    fn into_task(self) -> Option<Task> {
187        Some(Task {
188            task: self.task?,
189            reader_interval: Duration::from_millis(self.reader_interval_ms?),
190        })
191    }
192}
193
194impl<S: Store> Indexer<S> {
195    /// Create a new instance of the indexer framework from a store that implements the `Store`
196    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
197    /// arguments configure the following:
198    ///
199    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
200    ///   watermarks) and where to serve metrics from,
201    /// - Where to download checkpoints from,
202    /// - Concurrency and buffering parameters for downloading checkpoints.
203    ///
204    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
205    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
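    ///
    /// A minimal end-to-end sketch, assuming a store, client arguments, metrics registry, and a
    /// concurrent handler `MyHandler` already exist (illustrative, not a compiled doctest):
    ///
    /// ```ignore
    /// let mut indexer = Indexer::new(
    ///     store,
    ///     IndexerArgs::default(),
    ///     client_args,
    ///     IngestionConfig::default(),
    ///     None,
    ///     &registry,
    /// )
    /// .await?;
    ///
    /// indexer
    ///     .concurrent_pipeline(MyHandler, ConcurrentConfig::default())
    ///     .await?;
    ///
    /// indexer.run().await?.join().await?;
    /// ```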
206    pub async fn new(
207        store: S,
208        indexer_args: IndexerArgs,
209        client_args: ClientArgs,
210        ingestion_config: IngestionConfig,
211        metrics_prefix: Option<&str>,
212        registry: &Registry,
213    ) -> Result<Self> {
214        let IndexerArgs {
215            first_checkpoint,
216            last_checkpoint,
217            pipeline,
218            task,
219        } = indexer_args;
220
221        let metrics = IndexerMetrics::new(metrics_prefix, registry);
222
223        let ingestion_service =
224            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
225
226        Ok(Self {
227            store,
228            metrics,
229            ingestion_service,
230            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
231            last_checkpoint,
232            task: task.into_task(),
233            enabled_pipelines: if pipeline.is_empty() {
234                None
235            } else {
236                Some(pipeline.into_iter().collect())
237            },
238            added_pipelines: BTreeSet::new(),
239            first_ingestion_checkpoint: u64::MAX,
240            next_sequential_checkpoint: None,
241            pipelines: vec![],
242        })
243    }
244
245    /// The store used by the indexer.
246    pub fn store(&self) -> &S {
247        &self.store
248    }
249
250    /// The ingestion client used by the indexer to fetch checkpoints.
251    pub fn ingestion_client(&self) -> &IngestionClient {
252        self.ingestion_service.ingestion_client()
253    }
254
255    /// The indexer's metrics.
256    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
257        &self.metrics
258    }
259
260    /// The ingestion service's metrics.
261    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
262        self.ingestion_service.metrics()
263    }
264
265    /// The pipelines that this indexer will run.
266    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
267        self.added_pipelines.iter().copied().filter(|p| {
268            self.enabled_pipelines
269                .as_ref()
270                .is_none_or(|e| e.contains(*p))
271        })
272    }
273
274    /// The minimum next checkpoint across all sequential pipelines. This value is used to
275    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
276    /// too far ahead of sequential pipelines.
277    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
278        self.next_sequential_checkpoint
279    }
280
281    /// Adds a new pipeline to this indexer and starts it up. Although its tasks have started,
282    /// they will be idle until the ingestion service starts and serves them checkpoint data.
283    ///
284    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
285    /// keep the watermark table up-to-date with the highest checkpoint for which they can
286    /// guarantee that all of their pipeline's data has been committed.
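    ///
    /// A sketch of a minimal concurrent handler, modelled on the mock handlers in this module's
    /// tests (the store, value type, and commit body are placeholders):
    ///
    /// ```ignore
    /// struct MyHandler;
    ///
    /// #[async_trait]
    /// impl Processor for MyHandler {
    ///     const NAME: &'static str = "my_pipeline";
    ///     type Value = MyValue;
    ///
    ///     async fn process(
    ///         &self,
    ///         checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
    ///     ) -> anyhow::Result<Vec<Self::Value>> {
    ///         Ok(vec![MyValue(checkpoint.summary.sequence_number)])
    ///     }
    /// }
    ///
    /// #[async_trait]
    /// impl concurrent::Handler for MyHandler {
    ///     type Store = MyStore;
    ///     type Batch = Vec<MyValue>;
    ///
    ///     fn batch(
    ///         &self,
    ///         batch: &mut Self::Batch,
    ///         values: &mut std::vec::IntoIter<Self::Value>,
    ///     ) -> concurrent::BatchStatus {
    ///         batch.extend(values);
    ///         concurrent::BatchStatus::Pending
    ///     }
    ///
    ///     async fn commit<'a>(
    ///         &self,
    ///         batch: &Self::Batch,
    ///         conn: &mut <Self::Store as Store>::Connection<'a>,
    ///     ) -> anyhow::Result<usize> {
    ///         // Write `batch` to the underlying store here.
    ///         Ok(batch.len())
    ///     }
    /// }
    /// ```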
287    pub async fn concurrent_pipeline<H>(
288        &mut self,
289        handler: H,
290        config: ConcurrentConfig,
291    ) -> Result<()>
292    where
293        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
294    {
295        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
296            return Ok(());
297        };
298
299        self.pipelines.push(concurrent::pipeline::<H>(
300            handler,
301            next_checkpoint,
302            config,
303            self.store.clone(),
304            self.task.clone(),
305            self.ingestion_service.subscribe().0,
306            self.metrics.clone(),
307        ));
308
309        Ok(())
310    }
311
312    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
313    /// will start processing and committing once the ingestion service has caught up to their
314    /// respective watermarks.
315    ///
316    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
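    ///
    /// A sketch of driving the returned service to completion, mirroring the tests in this module:
    ///
    /// ```ignore
    /// let service = indexer.run().await?;
    /// service.join().await?;
    /// ```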
317    pub async fn run(self) -> Result<Service> {
318        if let Some(enabled_pipelines) = self.enabled_pipelines {
319            ensure!(
320                enabled_pipelines.is_empty(),
321                "Tried to enable pipelines that this indexer does not know about: \
322                {enabled_pipelines:#?}",
323            );
324        }
325
326        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
327
328        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
329
330        let mut service = self
331            .ingestion_service
332            .run(
333                self.first_ingestion_checkpoint..=last_checkpoint,
334                self.next_sequential_checkpoint,
335            )
336            .await
337            .context("Failed to start ingestion service")?;
338
339        for pipeline in self.pipelines {
340            service = service.merge(pipeline);
341        }
342
343        Ok(service)
344    }
345
346    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
347    /// checkpoint after its committer watermark, or if no watermark exists,
348    /// [Self::default_next_checkpoint] (the `--first-checkpoint` override, or 0 (genesis) if unset).
349    ///
350    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
351    /// calculated above.
352    ///
353    /// Returns `Ok(None)` if the pipeline is disabled.
354    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
355        ensure!(
356            self.added_pipelines.insert(P::NAME),
357            "Pipeline {:?} already added",
358            P::NAME,
359        );
360
361        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
362            && !enabled_pipelines.remove(P::NAME)
363        {
364            info!(pipeline = P::NAME, "Skipping");
365            return Ok(None);
366        }
367
368        let mut conn = self
369            .store
370            .connect()
371            .await
372            .context("Failed to establish connection to store")?;
373
374        let pipeline_task =
375            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
376
377        let checkpoint_hi_inclusive = conn
378            .init_watermark(&pipeline_task, self.default_next_checkpoint)
379            .await
380            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
381
382        let next_checkpoint =
383            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
384
385        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
386
387        Ok(Some(next_checkpoint))
388    }
389}
390
391impl<T: TransactionalStore> Indexer<T> {
392    /// Adds a new pipeline to this indexer and starts it up. Although its tasks have started,
393    /// they will be idle until the ingestion service starts and serves them checkpoint data.
394    ///
395    /// Sequential pipelines commit checkpoint data in order, which sacrifices throughput, but may
396    /// be required for pipelines that modify data in place (where an update may modify an existing
397    /// row rather than insert a new one, so the ordering between updates is
398    /// important).
399    ///
400    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
401    /// number of checkpoints (configured by `checkpoint_lag`).
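    ///
    /// A sketch of registering a sequential pipeline (the handler is a placeholder implementing
    /// [Handler] for this indexer's store):
    ///
    /// ```ignore
    /// indexer
    ///     .sequential_pipeline(MySequentialHandler, SequentialConfig::default())
    ///     .await?;
    /// ```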
402    pub async fn sequential_pipeline<H>(
403        &mut self,
404        handler: H,
405        config: SequentialConfig,
406    ) -> Result<()>
407    where
408        H: Handler<Store = T> + Send + Sync + 'static,
409    {
410        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
411            return Ok(());
412        };
413
414        if self.task.is_some() {
415            bail!(
416                "Sequential pipelines do not support pipeline tasks. \
417                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
418                Running the same pipeline under a different task would violate these guarantees."
419            );
420        }
421
422        // Track the minimum next_checkpoint across all sequential pipelines
423        self.next_sequential_checkpoint = Some(
424            self.next_sequential_checkpoint
425                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
426        );
427
428        let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
429
430        self.pipelines.push(sequential::pipeline::<H>(
431            handler,
432            next_checkpoint,
433            config,
434            self.store.clone(),
435            checkpoint_rx,
436            watermark_tx,
437            self.metrics.clone(),
438        ));
439
440        Ok(())
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use std::sync::Arc;
447
448    use async_trait::async_trait;
449    use clap::Parser;
450    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
451    use sui_synthetic_ingestion::synthetic_ingestion;
452    use tokio::sync::watch;
453
454    use crate::FieldCount;
455    use crate::ingestion::ingestion_client::IngestionClientArgs;
456    use crate::mocks::store::MockStore;
457    use crate::pipeline::CommitterConfig;
458    use crate::pipeline::Processor;
459    use crate::pipeline::concurrent::ConcurrentConfig;
460    use crate::store::CommitterWatermark;
461
462    use super::*;
463
464    #[allow(dead_code)]
465    #[derive(Clone, FieldCount)]
466    struct MockValue(u64);
467
468    /// A handler that can be controlled externally to block checkpoint processing.
469    struct ControllableHandler {
470        /// Process checkpoints less than or equal to this value.
471        process_below: watch::Receiver<u64>,
472    }
473
474    impl ControllableHandler {
475        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
476            let (tx, rx) = watch::channel(limit);
477            (Self { process_below: rx }, tx)
478        }
479    }
480
481    #[async_trait]
482    impl Processor for ControllableHandler {
483        const NAME: &'static str = "controllable";
484        /// The checkpoints to process come out of order. To account for potential flakiness in test
485        /// `test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo`, we set the FANOUT to
486        /// the total number of checkpoints to process, so we can ensure the correct checkpoints are
487        /// processed.
488        const FANOUT: usize = 501;
489        type Value = MockValue;
490
491        async fn process(
492            &self,
493            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
494        ) -> anyhow::Result<Vec<Self::Value>> {
495            let cp_num = checkpoint.summary.sequence_number;
496
497            // Wait until the checkpoint is allowed to be processed
498            self.process_below
499                .clone()
500                .wait_for(|&limit| cp_num <= limit)
501                .await
502                .ok();
503
504            Ok(vec![MockValue(cp_num)])
505        }
506    }
507
508    #[async_trait]
509    impl crate::pipeline::concurrent::Handler for ControllableHandler {
510        type Store = MockStore;
511        type Batch = Vec<MockValue>;
512
513        fn batch(
514            &self,
515            batch: &mut Self::Batch,
516            values: &mut std::vec::IntoIter<Self::Value>,
517        ) -> crate::pipeline::concurrent::BatchStatus {
518            batch.extend(values);
519            crate::pipeline::concurrent::BatchStatus::Ready
520        }
521
522        async fn commit<'a>(
523            &self,
524            batch: &Self::Batch,
525            conn: &mut <Self::Store as Store>::Connection<'a>,
526        ) -> anyhow::Result<usize> {
527            for value in batch {
528                conn.0
529                    .commit_data(Self::NAME, value.0, vec![value.0])
530                    .await?;
531            }
532            Ok(batch.len())
533        }
534    }
535
536    macro_rules! test_pipeline {
537        ($handler:ident, $name:literal) => {
538            struct $handler;
539
540            #[async_trait]
541            impl Processor for $handler {
542                const NAME: &'static str = $name;
543                type Value = MockValue;
544                async fn process(
545                    &self,
546                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
547                ) -> anyhow::Result<Vec<Self::Value>> {
548                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
549                }
550            }
551
552            #[async_trait]
553            impl crate::pipeline::concurrent::Handler for $handler {
554                type Store = MockStore;
555                type Batch = Vec<Self::Value>;
556
557                fn batch(
558                    &self,
559                    batch: &mut Self::Batch,
560                    values: &mut std::vec::IntoIter<Self::Value>,
561                ) -> crate::pipeline::concurrent::BatchStatus {
562                    batch.extend(values);
563                    crate::pipeline::concurrent::BatchStatus::Pending
564                }
565
566                async fn commit<'a>(
567                    &self,
568                    batch: &Self::Batch,
569                    conn: &mut <Self::Store as Store>::Connection<'a>,
570                ) -> anyhow::Result<usize> {
571                    for value in batch {
572                        conn.0
573                            .commit_data(Self::NAME, value.0, vec![value.0])
574                            .await?;
575                    }
576                    Ok(batch.len())
577                }
578            }
579
580            #[async_trait]
581            impl crate::pipeline::sequential::Handler for $handler {
582                type Store = MockStore;
583                type Batch = Vec<Self::Value>;
584
585                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
586                    batch.extend(values);
587                }
588
589                async fn commit<'a>(
590                    &self,
591                    _batch: &Self::Batch,
592                    _conn: &mut <Self::Store as Store>::Connection<'a>,
593                ) -> anyhow::Result<usize> {
594                    Ok(1)
595                }
596            }
597        };
598    }
599
600    test_pipeline!(MockHandler, "test_processor");
601    test_pipeline!(SequentialHandler, "sequential_handler");
602    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
603
604    async fn test_init_watermark(
605        first_checkpoint: Option<u64>,
606        is_concurrent: bool,
607    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
608        let registry = Registry::new();
609        let store = MockStore::default();
610
611        test_pipeline!(A, "pipeline_name");
612
613        let mut conn = store.connect().await.unwrap();
614
615        let indexer_args = IndexerArgs {
616            first_checkpoint,
617            ..IndexerArgs::default()
618        };
619        let temp_dir = tempfile::tempdir().unwrap();
620        let client_args = ClientArgs {
621            ingestion: IngestionClientArgs {
622                local_ingestion_path: Some(temp_dir.path().to_owned()),
623                ..Default::default()
624            },
625            ..Default::default()
626        };
627        let ingestion_config = IngestionConfig::default();
628
629        let mut indexer = Indexer::new(
630            store.clone(),
631            indexer_args,
632            client_args,
633            ingestion_config,
634            None,
635            &registry,
636        )
637        .await
638        .unwrap();
639
640        if is_concurrent {
641            indexer
642                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
643                .await
644                .unwrap();
645        } else {
646            indexer
647                .sequential_pipeline::<A>(A, SequentialConfig::default())
648                .await
649                .unwrap();
650        }
651
652        (
653            conn.committer_watermark(A::NAME).await.unwrap(),
654            conn.pruner_watermark(A::NAME, Duration::ZERO)
655                .await
656                .unwrap(),
657        )
658    }
659
660    #[test]
661    fn test_arg_parsing() {
662        #[derive(Parser)]
663        struct Args {
664            #[clap(flatten)]
665            indexer: IndexerArgs,
666        }
667
668        let args = Args::try_parse_from([
669            "cmd",
670            "--first-checkpoint",
671            "10",
672            "--last-checkpoint",
673            "100",
674            "--pipeline",
675            "a",
676            "--pipeline",
677            "b",
678            "--task",
679            "t",
680            "--reader-interval-ms",
681            "5000",
682        ])
683        .unwrap();
684
685        assert_eq!(args.indexer.first_checkpoint, Some(10));
686        assert_eq!(args.indexer.last_checkpoint, Some(100));
687        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
688        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
689        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
690    }
691
692    /// first_ingestion_checkpoint is the smallest existing committer watermark + 1.
693    #[tokio::test]
694    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
695        let registry = Registry::new();
696        let store = MockStore::default();
697
698        test_pipeline!(A, "concurrent_a");
699        test_pipeline!(B, "concurrent_b");
700        test_pipeline!(C, "sequential_c");
701        test_pipeline!(D, "sequential_d");
702
703        let mut conn = store.connect().await.unwrap();
704        conn.set_committer_watermark(
705            A::NAME,
706            CommitterWatermark {
707                checkpoint_hi_inclusive: 100,
708                ..Default::default()
709            },
710        )
711        .await
712        .unwrap();
713        conn.set_committer_watermark(
714            B::NAME,
715            CommitterWatermark {
716                checkpoint_hi_inclusive: 10,
717                ..Default::default()
718            },
719        )
720        .await
721        .unwrap();
722        conn.set_committer_watermark(
723            C::NAME,
724            CommitterWatermark {
725                checkpoint_hi_inclusive: 1,
726                ..Default::default()
727            },
728        )
729        .await
730        .unwrap();
731        conn.set_committer_watermark(
732            D::NAME,
733            CommitterWatermark {
734                checkpoint_hi_inclusive: 50,
735                ..Default::default()
736            },
737        )
738        .await
739        .unwrap();
740
741        let indexer_args = IndexerArgs::default();
742        let temp_dir = tempfile::tempdir().unwrap();
743        let client_args = ClientArgs {
744            ingestion: IngestionClientArgs {
745                local_ingestion_path: Some(temp_dir.path().to_owned()),
746                ..Default::default()
747            },
748            ..Default::default()
749        };
750        let ingestion_config = IngestionConfig::default();
751
752        let mut indexer = Indexer::new(
753            store,
754            indexer_args,
755            client_args,
756            ingestion_config,
757            None,
758            &registry,
759        )
760        .await
761        .unwrap();
762
763        indexer
764            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
765            .await
766            .unwrap();
767        indexer
768            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
769            .await
770            .unwrap();
771        indexer
772            .sequential_pipeline::<C>(C, SequentialConfig::default())
773            .await
774            .unwrap();
775        indexer
776            .sequential_pipeline::<D>(D, SequentialConfig::default())
777            .await
778            .unwrap();
779
780        assert_eq!(indexer.first_ingestion_checkpoint, 2);
781    }
782
783    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
784    #[tokio::test]
785    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
786        let registry = Registry::new();
787        let store = MockStore::default();
788
789        test_pipeline!(A, "concurrent_a");
790        test_pipeline!(B, "concurrent_b");
791        test_pipeline!(C, "sequential_c");
792        test_pipeline!(D, "sequential_d");
793
794        let mut conn = store.connect().await.unwrap();
795        conn.set_committer_watermark(
796            B::NAME,
797            CommitterWatermark {
798                checkpoint_hi_inclusive: 10,
799                ..Default::default()
800            },
801        )
802        .await
803        .unwrap();
804        conn.set_committer_watermark(
805            C::NAME,
806            CommitterWatermark {
807                checkpoint_hi_inclusive: 1,
808                ..Default::default()
809            },
810        )
811        .await
812        .unwrap();
813
814        let indexer_args = IndexerArgs::default();
815        let temp_dir = tempfile::tempdir().unwrap();
816        let client_args = ClientArgs {
817            ingestion: IngestionClientArgs {
818                local_ingestion_path: Some(temp_dir.path().to_owned()),
819                ..Default::default()
820            },
821            ..Default::default()
822        };
823        let ingestion_config = IngestionConfig::default();
824
825        let mut indexer = Indexer::new(
826            store,
827            indexer_args,
828            client_args,
829            ingestion_config,
830            None,
831            &registry,
832        )
833        .await
834        .unwrap();
835
836        indexer
837            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
838            .await
839            .unwrap();
840        indexer
841            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
842            .await
843            .unwrap();
844        indexer
845            .sequential_pipeline::<C>(C, SequentialConfig::default())
846            .await
847            .unwrap();
848        indexer
849            .sequential_pipeline::<D>(D, SequentialConfig::default())
850            .await
851            .unwrap();
852
853        assert_eq!(indexer.first_ingestion_checkpoint, 0);
854    }
855
856    /// first_ingestion_checkpoint is 1 when the smallest committer watermark is 0.
857    #[tokio::test]
858    async fn test_first_ingestion_checkpoint_smallest_is_0() {
859        let registry = Registry::new();
860        let store = MockStore::default();
861
862        test_pipeline!(A, "concurrent_a");
863        test_pipeline!(B, "concurrent_b");
864        test_pipeline!(C, "sequential_c");
865        test_pipeline!(D, "sequential_d");
866
867        let mut conn = store.connect().await.unwrap();
868        conn.set_committer_watermark(
869            A::NAME,
870            CommitterWatermark {
871                checkpoint_hi_inclusive: 100,
872                ..Default::default()
873            },
874        )
875        .await
876        .unwrap();
877        conn.set_committer_watermark(
878            B::NAME,
879            CommitterWatermark {
880                checkpoint_hi_inclusive: 10,
881                ..Default::default()
882            },
883        )
884        .await
885        .unwrap();
886        conn.set_committer_watermark(
887            C::NAME,
888            CommitterWatermark {
889                checkpoint_hi_inclusive: 1,
890                ..Default::default()
891            },
892        )
893        .await
894        .unwrap();
895        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
896            .await
897            .unwrap();
898
899        let indexer_args = IndexerArgs::default();
900        let temp_dir = tempfile::tempdir().unwrap();
901        let client_args = ClientArgs {
902            ingestion: IngestionClientArgs {
903                local_ingestion_path: Some(temp_dir.path().to_owned()),
904                ..Default::default()
905            },
906            ..Default::default()
907        };
908        let ingestion_config = IngestionConfig::default();
909
910        let mut indexer = Indexer::new(
911            store,
912            indexer_args,
913            client_args,
914            ingestion_config,
915            None,
916            &registry,
917        )
918        .await
919        .unwrap();
920
921        indexer
922            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
923            .await
924            .unwrap();
925        indexer
926            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
927            .await
928            .unwrap();
929        indexer
930            .sequential_pipeline::<C>(C, SequentialConfig::default())
931            .await
932            .unwrap();
933        indexer
934            .sequential_pipeline::<D>(D, SequentialConfig::default())
935            .await
936            .unwrap();
937
938        assert_eq!(indexer.first_ingestion_checkpoint, 1);
939    }
940
941    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
942    /// watermark, and first_checkpoint is smallest.
943    #[tokio::test]
944    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
945        let registry = Registry::new();
946        let store = MockStore::default();
947
948        test_pipeline!(A, "concurrent_a");
949        test_pipeline!(B, "concurrent_b");
950        test_pipeline!(C, "sequential_c");
951        test_pipeline!(D, "sequential_d");
952
953        let mut conn = store.connect().await.unwrap();
954        conn.set_committer_watermark(
955            B::NAME,
956            CommitterWatermark {
957                checkpoint_hi_inclusive: 50,
958                ..Default::default()
959            },
960        )
961        .await
962        .unwrap();
963        conn.set_committer_watermark(
964            C::NAME,
965            CommitterWatermark {
966                checkpoint_hi_inclusive: 10,
967                ..Default::default()
968            },
969        )
970        .await
971        .unwrap();
972
973        let indexer_args = IndexerArgs {
974            first_checkpoint: Some(5),
975            ..Default::default()
976        };
977        let temp_dir = tempfile::tempdir().unwrap();
978        let client_args = ClientArgs {
979            ingestion: IngestionClientArgs {
980                local_ingestion_path: Some(temp_dir.path().to_owned()),
981                ..Default::default()
982            },
983            ..Default::default()
984        };
985        let ingestion_config = IngestionConfig::default();
986
987        let mut indexer = Indexer::new(
988            store,
989            indexer_args,
990            client_args,
991            ingestion_config,
992            None,
993            &registry,
994        )
995        .await
996        .unwrap();
997
998        indexer
999            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1000            .await
1001            .unwrap();
1002        indexer
1003            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1004            .await
1005            .unwrap();
1006        indexer
1007            .sequential_pipeline::<C>(C, SequentialConfig::default())
1008            .await
1009            .unwrap();
1010        indexer
1011            .sequential_pipeline::<D>(D, SequentialConfig::default())
1012            .await
1013            .unwrap();
1014
1015        assert_eq!(indexer.first_ingestion_checkpoint, 5);
1016    }
1017
1018    /// first_ingestion_checkpoint is the smallest existing watermark + 1 when first_checkpoint is
1019    /// provided but all pipelines have watermarks (first_checkpoint is ignored).
1020    #[tokio::test]
1021    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1022        let registry = Registry::new();
1023        let store = MockStore::default();
1024
1025        test_pipeline!(B, "concurrent_b");
1026        test_pipeline!(C, "sequential_c");
1027
1028        let mut conn = store.connect().await.unwrap();
1029        conn.set_committer_watermark(
1030            B::NAME,
1031            CommitterWatermark {
1032                checkpoint_hi_inclusive: 50,
1033                ..Default::default()
1034            },
1035        )
1036        .await
1037        .unwrap();
1038        conn.set_committer_watermark(
1039            C::NAME,
1040            CommitterWatermark {
1041                checkpoint_hi_inclusive: 10,
1042                ..Default::default()
1043            },
1044        )
1045        .await
1046        .unwrap();
1047
1048        let indexer_args = IndexerArgs {
1049            first_checkpoint: Some(5),
1050            ..Default::default()
1051        };
1052        let temp_dir = tempfile::tempdir().unwrap();
1053        let client_args = ClientArgs {
1054            ingestion: IngestionClientArgs {
1055                local_ingestion_path: Some(temp_dir.path().to_owned()),
1056                ..Default::default()
1057            },
1058            ..Default::default()
1059        };
1060        let ingestion_config = IngestionConfig::default();
1061
1062        let mut indexer = Indexer::new(
1063            store,
1064            indexer_args,
1065            client_args,
1066            ingestion_config,
1067            None,
1068            &registry,
1069        )
1070        .await
1071        .unwrap();
1072
1073        indexer
1074            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1075            .await
1076            .unwrap();
1077        indexer
1078            .sequential_pipeline::<C>(C, SequentialConfig::default())
1079            .await
1080            .unwrap();
1081
1082        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1083    }
1084
1085    /// Even when first_checkpoint is considered (because some pipelines are missing watermarks), it
1086    /// is not used as the starting point if another pipeline's committer watermark yields a smaller
1087    /// checkpoint to resume ingesting from.
1088    #[tokio::test]
1089    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1090        let registry = Registry::new();
1091        let store = MockStore::default();
1092
1093        test_pipeline!(A, "concurrent_a");
1094        test_pipeline!(B, "concurrent_b");
1095        test_pipeline!(C, "sequential_c");
1096
1097        let mut conn = store.connect().await.unwrap();
1098        conn.set_committer_watermark(
1099            B::NAME,
1100            CommitterWatermark {
1101                checkpoint_hi_inclusive: 50,
1102                ..Default::default()
1103            },
1104        )
1105        .await
1106        .unwrap();
1107        conn.set_committer_watermark(
1108            C::NAME,
1109            CommitterWatermark {
1110                checkpoint_hi_inclusive: 10,
1111                ..Default::default()
1112            },
1113        )
1114        .await
1115        .unwrap();
1116
1117        let indexer_args = IndexerArgs {
1118            first_checkpoint: Some(24),
1119            ..Default::default()
1120        };
1121        let temp_dir = tempfile::tempdir().unwrap();
1122        let client_args = ClientArgs {
1123            ingestion: IngestionClientArgs {
1124                local_ingestion_path: Some(temp_dir.path().to_owned()),
1125                ..Default::default()
1126            },
1127            ..Default::default()
1128        };
1129        let ingestion_config = IngestionConfig::default();
1130
1131        let mut indexer = Indexer::new(
1132            store,
1133            indexer_args,
1134            client_args,
1135            ingestion_config,
1136            None,
1137            &registry,
1138        )
1139        .await
1140        .unwrap();
1141
1142        indexer
1143            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1144            .await
1145            .unwrap();
1146
1147        indexer
1148            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1149            .await
1150            .unwrap();
1151
1152        indexer
1153            .sequential_pipeline::<C>(C, SequentialConfig::default())
1154            .await
1155            .unwrap();
1156
1157        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1158    }
1159
1160    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1161    #[tokio::test]
1162    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1163        let registry = Registry::new();
1164        let store = MockStore::default();
1165
1166        test_pipeline!(A, "concurrent_a");
1167        test_pipeline!(B, "concurrent_b");
1168        test_pipeline!(C, "sequential_c");
1169        test_pipeline!(D, "sequential_d");
1170
1171        let mut conn = store.connect().await.unwrap();
1172        conn.set_committer_watermark(
1173            A::NAME,
1174            CommitterWatermark {
1175                checkpoint_hi_inclusive: 5,
1176                ..Default::default()
1177            },
1178        )
1179        .await
1180        .unwrap();
1181        conn.set_committer_watermark(
1182            B::NAME,
1183            CommitterWatermark {
1184                checkpoint_hi_inclusive: 10,
1185                ..Default::default()
1186            },
1187        )
1188        .await
1189        .unwrap();
1190        conn.set_committer_watermark(
1191            C::NAME,
1192            CommitterWatermark {
1193                checkpoint_hi_inclusive: 15,
1194                ..Default::default()
1195            },
1196        )
1197        .await
1198        .unwrap();
1199        conn.set_committer_watermark(
1200            D::NAME,
1201            CommitterWatermark {
1202                checkpoint_hi_inclusive: 20,
1203                ..Default::default()
1204            },
1205        )
1206        .await
1207        .unwrap();
1208
1209        // Create synthetic ingestion data
1210        let temp_dir = tempfile::tempdir().unwrap();
1211        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1212            ingestion_dir: temp_dir.path().to_owned(),
1213            starting_checkpoint: 5,
1214            num_checkpoints: 25,
1215            checkpoint_size: 1,
1216        })
1217        .await;
1218
1219        let indexer_args = IndexerArgs {
1220            last_checkpoint: Some(29),
1221            ..Default::default()
1222        };
1223
1224        let client_args = ClientArgs {
1225            ingestion: IngestionClientArgs {
1226                local_ingestion_path: Some(temp_dir.path().to_owned()),
1227                ..Default::default()
1228            },
1229            ..Default::default()
1230        };
1231
1232        let ingestion_config = IngestionConfig::default();
1233
1234        let mut indexer = Indexer::new(
1235            store.clone(),
1236            indexer_args,
1237            client_args,
1238            ingestion_config,
1239            None,
1240            &registry,
1241        )
1242        .await
1243        .unwrap();
1244
1245        indexer
1246            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1247            .await
1248            .unwrap();
1249        indexer
1250            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1251            .await
1252            .unwrap();
1253        indexer
1254            .sequential_pipeline::<C>(C, SequentialConfig::default())
1255            .await
1256            .unwrap();
1257        indexer
1258            .sequential_pipeline::<D>(D, SequentialConfig::default())
1259            .await
1260            .unwrap();
1261
1262        let ingestion_metrics = indexer.ingestion_metrics().clone();
1263        let indexer_metrics = indexer.indexer_metrics().clone();
1264
1265        indexer.run().await.unwrap().join().await.unwrap();
1266
1267        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1268        assert_eq!(
1269            indexer_metrics
1270                .total_watermarks_out_of_order
1271                .get_metric_with_label_values(&[A::NAME])
1272                .unwrap()
1273                .get(),
1274            0
1275        );
1276        assert_eq!(
1277            indexer_metrics
1278                .total_watermarks_out_of_order
1279                .get_metric_with_label_values(&[B::NAME])
1280                .unwrap()
1281                .get(),
1282            5
1283        );
1284        assert_eq!(
1285            indexer_metrics
1286                .total_watermarks_out_of_order
1287                .get_metric_with_label_values(&[C::NAME])
1288                .unwrap()
1289                .get(),
1290            10
1291        );
1292        assert_eq!(
1293            indexer_metrics
1294                .total_watermarks_out_of_order
1295                .get_metric_with_label_values(&[D::NAME])
1296                .unwrap()
1297                .get(),
1298            15
1299        );
1300    }
1301
1302    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1303    #[tokio::test]
1304    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1305        let registry = Registry::new();
1306        let store = MockStore::default();
1307
1308        test_pipeline!(A, "concurrent_a");
1309        test_pipeline!(B, "concurrent_b");
1310        test_pipeline!(C, "sequential_c");
1311        test_pipeline!(D, "sequential_d");
1312
1313        let mut conn = store.connect().await.unwrap();
1314        conn.set_committer_watermark(
1315            A::NAME,
1316            CommitterWatermark {
1317                checkpoint_hi_inclusive: 5,
1318                ..Default::default()
1319            },
1320        )
1321        .await
1322        .unwrap();
1323        conn.set_committer_watermark(
1324            B::NAME,
1325            CommitterWatermark {
1326                checkpoint_hi_inclusive: 10,
1327                ..Default::default()
1328            },
1329        )
1330        .await
1331        .unwrap();
1332        conn.set_committer_watermark(
1333            C::NAME,
1334            CommitterWatermark {
1335                checkpoint_hi_inclusive: 15,
1336                ..Default::default()
1337            },
1338        )
1339        .await
1340        .unwrap();
1341        conn.set_committer_watermark(
1342            D::NAME,
1343            CommitterWatermark {
1344                checkpoint_hi_inclusive: 20,
1345                ..Default::default()
1346            },
1347        )
1348        .await
1349        .unwrap();
1350
1351        // Create synthetic ingestion data
1352        let temp_dir = tempfile::tempdir().unwrap();
1353        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1354            ingestion_dir: temp_dir.path().to_owned(),
1355            starting_checkpoint: 5,
1356            num_checkpoints: 25,
1357            checkpoint_size: 1,
1358        })
1359        .await;
1360
1361        let indexer_args = IndexerArgs {
1362            first_checkpoint: Some(3),
1363            last_checkpoint: Some(29),
1364            ..Default::default()
1365        };
1366
1367        let client_args = ClientArgs {
1368            ingestion: IngestionClientArgs {
1369                local_ingestion_path: Some(temp_dir.path().to_owned()),
1370                ..Default::default()
1371            },
1372            ..Default::default()
1373        };
1374
1375        let ingestion_config = IngestionConfig::default();
1376
1377        let mut indexer = Indexer::new(
1378            store.clone(),
1379            indexer_args,
1380            client_args,
1381            ingestion_config,
1382            None,
1383            &registry,
1384        )
1385        .await
1386        .unwrap();
1387
1388        indexer
1389            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1390            .await
1391            .unwrap();
1392        indexer
1393            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1394            .await
1395            .unwrap();
1396        indexer
1397            .sequential_pipeline::<C>(C, SequentialConfig::default())
1398            .await
1399            .unwrap();
1400        indexer
1401            .sequential_pipeline::<D>(D, SequentialConfig::default())
1402            .await
1403            .unwrap();
1404
1405        let ingestion_metrics = indexer.ingestion_metrics().clone();
1406        let metrics = indexer.indexer_metrics().clone();
1407        indexer.run().await.unwrap().join().await.unwrap();
1408
1409        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1410        assert_eq!(
1411            metrics
1412                .total_watermarks_out_of_order
1413                .get_metric_with_label_values(&[A::NAME])
1414                .unwrap()
1415                .get(),
1416            0
1417        );
1418        assert_eq!(
1419            metrics
1420                .total_watermarks_out_of_order
1421                .get_metric_with_label_values(&[B::NAME])
1422                .unwrap()
1423                .get(),
1424            5
1425        );
1426        assert_eq!(
1427            metrics
1428                .total_watermarks_out_of_order
1429                .get_metric_with_label_values(&[C::NAME])
1430                .unwrap()
1431                .get(),
1432            10
1433        );
1434        assert_eq!(
1435            metrics
1436                .total_watermarks_out_of_order
1437                .get_metric_with_label_values(&[D::NAME])
1438                .unwrap()
1439                .get(),
1440            15
1441        );
1442    }
1443
1444    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1445    #[tokio::test]
1446    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1447        let registry = Registry::new();
1448        let store = MockStore::default();
1449
1450        test_pipeline!(A, "concurrent_a");
1451        test_pipeline!(B, "concurrent_b");
1452        test_pipeline!(C, "sequential_c");
1453        test_pipeline!(D, "sequential_d");
1454
1455        let mut conn = store.connect().await.unwrap();
1456        conn.set_committer_watermark(
1457            B::NAME,
1458            CommitterWatermark {
1459                checkpoint_hi_inclusive: 10,
1460                ..Default::default()
1461            },
1462        )
1463        .await
1464        .unwrap();
1465        conn.set_committer_watermark(
1466            C::NAME,
1467            CommitterWatermark {
1468                checkpoint_hi_inclusive: 15,
1469                ..Default::default()
1470            },
1471        )
1472        .await
1473        .unwrap();
1474        conn.set_committer_watermark(
1475            D::NAME,
1476            CommitterWatermark {
1477                checkpoint_hi_inclusive: 20,
1478                ..Default::default()
1479            },
1480        )
1481        .await
1482        .unwrap();
1483
1484        // Create synthetic ingestion data
1485        let temp_dir = tempfile::tempdir().unwrap();
1486        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1487            ingestion_dir: temp_dir.path().to_owned(),
1488            starting_checkpoint: 0,
1489            num_checkpoints: 30,
1490            checkpoint_size: 1,
1491        })
1492        .await;
1493
1494        let indexer_args = IndexerArgs {
1495            last_checkpoint: Some(29),
1496            ..Default::default()
1497        };
1498
1499        let client_args = ClientArgs {
1500            ingestion: IngestionClientArgs {
1501                local_ingestion_path: Some(temp_dir.path().to_owned()),
1502                ..Default::default()
1503            },
1504            ..Default::default()
1505        };
1506
1507        let ingestion_config = IngestionConfig::default();
1508
1509        let mut indexer = Indexer::new(
1510            store.clone(),
1511            indexer_args,
1512            client_args,
1513            ingestion_config,
1514            None,
1515            &registry,
1516        )
1517        .await
1518        .unwrap();
1519
1520        indexer
1521            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1522            .await
1523            .unwrap();
1524        indexer
1525            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1526            .await
1527            .unwrap();
1528        indexer
1529            .sequential_pipeline::<C>(C, SequentialConfig::default())
1530            .await
1531            .unwrap();
1532        indexer
1533            .sequential_pipeline::<D>(D, SequentialConfig::default())
1534            .await
1535            .unwrap();
1536
1537        let ingestion_metrics = indexer.ingestion_metrics().clone();
1538        let metrics = indexer.indexer_metrics().clone();
1539        indexer.run().await.unwrap().join().await.unwrap();
1540
1541        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1542        assert_eq!(
1543            metrics
1544                .total_watermarks_out_of_order
1545                .get_metric_with_label_values(&[A::NAME])
1546                .unwrap()
1547                .get(),
1548            0
1549        );
1550        assert_eq!(
1551            metrics
1552                .total_watermarks_out_of_order
1553                .get_metric_with_label_values(&[B::NAME])
1554                .unwrap()
1555                .get(),
1556            11
1557        );
1558        assert_eq!(
1559            metrics
1560                .total_watermarks_out_of_order
1561                .get_metric_with_label_values(&[C::NAME])
1562                .unwrap()
1563                .get(),
1564            16
1565        );
1566        assert_eq!(
1567            metrics
1568                .total_watermarks_out_of_order
1569                .get_metric_with_label_values(&[D::NAME])
1570                .unwrap()
1571                .get(),
1572            21
1573        );
1574    }
1575
1576    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1577    #[tokio::test]
1578    async fn test_indexer_ingestion_use_first_checkpoint() {
1579        let registry = Registry::new();
1580        let store = MockStore::default();
1581
1582        test_pipeline!(A, "concurrent_a");
1583        test_pipeline!(B, "concurrent_b");
1584        test_pipeline!(C, "sequential_c");
1585        test_pipeline!(D, "sequential_d");
1586
1587        let mut conn = store.connect().await.unwrap();
1588        conn.set_committer_watermark(
1589            B::NAME,
1590            CommitterWatermark {
1591                checkpoint_hi_inclusive: 10,
1592                ..Default::default()
1593            },
1594        )
1595        .await
1596        .unwrap();
1597        conn.set_committer_watermark(
1598            C::NAME,
1599            CommitterWatermark {
1600                checkpoint_hi_inclusive: 15,
1601                ..Default::default()
1602            },
1603        )
1604        .await
1605        .unwrap();
1606        conn.set_committer_watermark(
1607            D::NAME,
1608            CommitterWatermark {
1609                checkpoint_hi_inclusive: 20,
1610                ..Default::default()
1611            },
1612        )
1613        .await
1614        .unwrap();
1615
1616        // Create synthetic ingestion data
1617        let temp_dir = tempfile::tempdir().unwrap();
1618        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1619            ingestion_dir: temp_dir.path().to_owned(),
1620            starting_checkpoint: 5,
1621            num_checkpoints: 25,
1622            checkpoint_size: 1,
1623        })
1624        .await;
1625
1626        let indexer_args = IndexerArgs {
1627            first_checkpoint: Some(10),
1628            last_checkpoint: Some(29),
1629            ..Default::default()
1630        };
1631
1632        let client_args = ClientArgs {
1633            ingestion: IngestionClientArgs {
1634                local_ingestion_path: Some(temp_dir.path().to_owned()),
1635                ..Default::default()
1636            },
1637            ..Default::default()
1638        };
1639
1640        let ingestion_config = IngestionConfig::default();
1641
1642        let mut indexer = Indexer::new(
1643            store.clone(),
1644            indexer_args,
1645            client_args,
1646            ingestion_config,
1647            None,
1648            &registry,
1649        )
1650        .await
1651        .unwrap();
1652
1653        indexer
1654            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1655            .await
1656            .unwrap();
1657        indexer
1658            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1659            .await
1660            .unwrap();
1661        indexer
1662            .sequential_pipeline::<C>(C, SequentialConfig::default())
1663            .await
1664            .unwrap();
1665        indexer
1666            .sequential_pipeline::<D>(D, SequentialConfig::default())
1667            .await
1668            .unwrap();
1669
1670        let ingestion_metrics = indexer.ingestion_metrics().clone();
1671        let metrics = indexer.indexer_metrics().clone();
1672        indexer.run().await.unwrap().join().await.unwrap();
1673
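            // Ingestion starts at the minimum next checkpoint across pipelines: B, C, and D resume
            // from their watermarks (10, 15, 20) and A starts at `first_checkpoint` (10), so 20
            // checkpoints ([10, 29]) are ingested. Each watermarked pipeline counts the re-ingested
            // checkpoints at or below its watermark as out-of-order: B sees 1 (checkpoint 10),
            // C sees 6 ([10, 15]), and D sees 11 ([10, 20]).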
1674        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1675        assert_eq!(
1676            metrics
1677                .total_watermarks_out_of_order
1678                .get_metric_with_label_values(&[A::NAME])
1679                .unwrap()
1680                .get(),
1681            0
1682        );
1683        assert_eq!(
1684            metrics
1685                .total_watermarks_out_of_order
1686                .get_metric_with_label_values(&[B::NAME])
1687                .unwrap()
1688                .get(),
1689            1
1690        );
1691        assert_eq!(
1692            metrics
1693                .total_watermarks_out_of_order
1694                .get_metric_with_label_values(&[C::NAME])
1695                .unwrap()
1696                .get(),
1697            6
1698        );
1699        assert_eq!(
1700            metrics
1701                .total_watermarks_out_of_order
1702                .get_metric_with_label_values(&[D::NAME])
1703                .unwrap()
1704                .get(),
1705            11
1706        );
1707    }
1708
1709    #[tokio::test]
1710    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1711        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1712        // The indexer will not initialize the watermark; pipeline tasks will write committer watermarks as normal.
1713        assert_eq!(committer_watermark, None);
1714        assert_eq!(pruner_watermark, None);
1715    }
1716
1717    #[tokio::test]
1718    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1719        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1720        // The indexer will not initialize the watermark; pipeline tasks will write committer watermarks as normal.
1721        assert_eq!(committer_watermark, None);
1722        assert_eq!(pruner_watermark, None);
1723    }
1724
1725    #[tokio::test]
1726    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1727        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1728
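            // With `first_checkpoint = 1`, the watermark is initialized to just below the configured
            // start: committer hi = 0, and reader_lo / pruner_hi = 1, so everything before the first
            // ingested checkpoint is treated as already pruned.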
1729        let committer_watermark = committer_watermark.unwrap();
1730        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1731
1732        let pruner_watermark = pruner_watermark.unwrap();
1733        assert_eq!(pruner_watermark.reader_lo, 1);
1734        assert_eq!(pruner_watermark.pruner_hi, 1);
1735    }
1736
1737    #[tokio::test]
1738    async fn test_init_watermark_sequential() {
1739        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1740
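            // Sequential pipelines get the same watermark initialization as concurrent ones.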
1741        let committer_watermark = committer_watermark.unwrap();
1742        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1743
1744        let pruner_watermark = pruner_watermark.unwrap();
1745        assert_eq!(pruner_watermark.reader_lo, 1);
1746        assert_eq!(pruner_watermark.pruner_hi, 1);
1747    }
1748
1749    #[tokio::test]
1750    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1751        let registry = Registry::new();
1752        let store = MockStore::default();
1753
1754        // Set up different watermarks for three different sequential pipelines
1755        let mut conn = store.connect().await.unwrap();
1756
1757        // First handler at checkpoint 10
1758        conn.set_committer_watermark(
1759            MockHandler::NAME,
1760            CommitterWatermark {
1761                epoch_hi_inclusive: 0,
1762                checkpoint_hi_inclusive: 10,
1763                tx_hi: 20,
1764                timestamp_ms_hi_inclusive: 10000,
1765            },
1766        )
1767        .await
1768        .unwrap();
1769
1770        // SequentialHandler at checkpoint 5
1771        conn.set_committer_watermark(
1772            SequentialHandler::NAME,
1773            CommitterWatermark {
1774                epoch_hi_inclusive: 0,
1775                checkpoint_hi_inclusive: 5,
1776                tx_hi: 10,
1777                timestamp_ms_hi_inclusive: 5000,
1778            },
1779        )
1780        .await
1781        .unwrap();
1782
1783        // Create synthetic ingestion data
1784        let temp_dir = tempfile::tempdir().unwrap();
1785        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1786            ingestion_dir: temp_dir.path().to_owned(),
1787            starting_checkpoint: 0,
1788            num_checkpoints: 20,
1789            checkpoint_size: 2,
1790        })
1791        .await;
1792
1793        let indexer_args = IndexerArgs {
1794            first_checkpoint: None,
1795            last_checkpoint: Some(19),
1796            pipeline: vec![],
1797            ..Default::default()
1798        };
1799
1800        let client_args = ClientArgs {
1801            ingestion: IngestionClientArgs {
1802                local_ingestion_path: Some(temp_dir.path().to_owned()),
1803                ..Default::default()
1804            },
1805            ..Default::default()
1806        };
1807
1808        let ingestion_config = IngestionConfig::default();
1809
1810        let mut indexer = Indexer::new(
1811            store.clone(),
1812            indexer_args,
1813            client_args,
1814            ingestion_config,
1815            None,
1816            &registry,
1817        )
1818        .await
1819        .unwrap();
1820
1821        // Add first sequential pipeline
1822        indexer
1823            .sequential_pipeline(
1824                MockHandler,
1825                pipeline::sequential::SequentialConfig::default(),
1826            )
1827            .await
1828            .unwrap();
1829
1830        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1831        assert_eq!(
1832            indexer.next_sequential_checkpoint(),
1833            Some(11),
1834            "next_sequential_checkpoint should be 11"
1835        );
1836
1837        // Add second sequential pipeline
1838        indexer
1839            .sequential_pipeline(
1840                SequentialHandler,
1841                pipeline::sequential::SequentialConfig::default(),
1842            )
1843            .await
1844            .unwrap();
1845
1846        // Adding the second pipeline lowers the value to 6 (the minimum of 6 and 11).
1847        assert_eq!(
1848            indexer.next_sequential_checkpoint(),
1849            Some(6),
1850            "next_sequential_checkpoint should still be 6"
1851        );
1852
1853        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1854        indexer.run().await.unwrap().join().await.unwrap();
1855
1856        // Verify both pipelines caught up to the last checkpoint, despite starting from different watermarks.
1857        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1858        let watermark2 = conn
1859            .committer_watermark(SequentialHandler::NAME)
1860            .await
1861            .unwrap();
1862
1863        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1864        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1865    }
1866
1867    /// When a tasked indexer runs a tasked pipeline whose `first_checkpoint` is below the main
1868    /// pipeline's reader_lo, the indexer skips committing checkpoints below the main pipeline's
1869    /// reader watermark.
1870    #[tokio::test]
1871    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1872        let registry = Registry::new();
1873        let store = MockStore::default();
1874
1875        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1876        // reader watermark at `7`.
1877        let mut conn = store.connect().await.unwrap();
1878        conn.set_committer_watermark(
1879            MockCheckpointSequenceNumberHandler::NAME,
1880            CommitterWatermark {
1881                checkpoint_hi_inclusive: 10,
1882                ..Default::default()
1883            },
1884        )
1885        .await
1886        .unwrap();
1887        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1888            .await
1889            .unwrap();
1890
1891        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1892        // be ignored by the tasked indexer.
1893        let indexer_args = IndexerArgs {
1894            first_checkpoint: Some(0),
1895            last_checkpoint: Some(15),
1896            pipeline: vec![],
1897            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
1898        };
1899        let temp_dir = tempfile::tempdir().unwrap();
1900        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1901            ingestion_dir: temp_dir.path().to_owned(),
1902            starting_checkpoint: 0,
1903            num_checkpoints: 16,
1904            checkpoint_size: 2,
1905        })
1906        .await;
1907
1908        let client_args = ClientArgs {
1909            ingestion: IngestionClientArgs {
1910                local_ingestion_path: Some(temp_dir.path().to_owned()),
1911                ..Default::default()
1912            },
1913            ..Default::default()
1914        };
1915
1916        let ingestion_config = IngestionConfig::default();
1917
1918        let mut tasked_indexer = Indexer::new(
1919            store.clone(),
1920            indexer_args,
1921            client_args,
1922            ingestion_config,
1923            None,
1924            &registry,
1925        )
1926        .await
1927        .unwrap();
1928
1929        let _ = tasked_indexer
1930            .concurrent_pipeline(
1931                MockCheckpointSequenceNumberHandler,
1932                ConcurrentConfig::default(),
1933            )
1934            .await;
1935
1936        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1937        let metrics = tasked_indexer.indexer_metrics().clone();
1938
1939        tasked_indexer.run().await.unwrap().join().await.unwrap();
1940
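            // All 16 checkpoints ([0, 15]) are ingested, but the collector skips the 7 below the
            // main pipeline's reader_lo ([0, 6]) instead of committing them.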
1941        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1942        assert_eq!(
1943            metrics
1944                .total_collector_skipped_checkpoints
1945                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1946                .unwrap()
1947                .get(),
1948            7
1949        );
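            // Only checkpoints at or above the main pipeline's reader_lo (7) are written: [7, 15],
            // i.e. 9 rows.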
1950        let data = store
1951            .data
1952            .get(MockCheckpointSequenceNumberHandler::NAME)
1953            .unwrap();
1954        assert_eq!(data.len(), 9);
1955        for i in 0..7 {
1956            assert!(data.get(&i).is_none());
1957        }
1958        for i in 7..16 {
1959            assert!(data.get(&i).is_some());
1960        }
1961    }
1962
1963    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1964    #[tokio::test]
1965    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1966        let registry = Registry::new();
1967        let store = MockStore::default();
1968
1969        let mut conn = store.connect().await.unwrap();
1970        conn.set_committer_watermark(
1971            "test",
1972            CommitterWatermark {
1973                checkpoint_hi_inclusive: 10,
1974                ..Default::default()
1975            },
1976        )
1977        .await
1978        .unwrap();
1979        conn.set_reader_watermark("test", 5).await.unwrap();
1980
1981        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1982        // watermarks.
1983        let indexer_args = IndexerArgs {
1984            first_checkpoint: Some(9),
1985            last_checkpoint: Some(25),
1986            pipeline: vec![],
1987            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
1988        };
1989        let temp_dir = tempfile::tempdir().unwrap();
1990        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1991            ingestion_dir: temp_dir.path().to_owned(),
1992            starting_checkpoint: 9,
1993            num_checkpoints: 17,
1994            checkpoint_size: 2,
1995        })
1996        .await;
1997
1998        let client_args = ClientArgs {
1999            ingestion: IngestionClientArgs {
2000                local_ingestion_path: Some(temp_dir.path().to_owned()),
2001                ..Default::default()
2002            },
2003            ..Default::default()
2004        };
2005
2006        let ingestion_config = IngestionConfig::default();
2007
2008        let mut tasked_indexer = Indexer::new(
2009            store.clone(),
2010            indexer_args,
2011            client_args,
2012            ingestion_config,
2013            None,
2014            &registry,
2015        )
2016        .await
2017        .unwrap();
2018
2019        let _ = tasked_indexer
2020            .concurrent_pipeline(
2021                MockCheckpointSequenceNumberHandler,
2022                ConcurrentConfig::default(),
2023            )
2024            .await;
2025
2026        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2027        let metrics = tasked_indexer.indexer_metrics().clone();
2028
2029        tasked_indexer.run().await.unwrap().join().await.unwrap();
2030
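            // All 17 checkpoints ([9, 25]) are ingested; none are reported out of order or skipped,
            // since the task starts at or above the main pipeline's reader_lo (5).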
2031        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2032        assert_eq!(
2033            metrics
2034                .total_watermarks_out_of_order
2035                .get_metric_with_label_values(&["test"])
2036                .unwrap()
2037                .get(),
2038            0
2039        );
2040        assert_eq!(
2041            metrics
2042                .total_collector_skipped_checkpoints
2043                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2044                .unwrap()
2045                .get(),
2046            0
2047        );
2048
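            // Every ingested checkpoint ([9, 25]) is committed, including those past the main
            // pipeline's committer watermark (10).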
2049        let data = store.data.get("test").unwrap();
2050        assert_eq!(data.len(), 17);
2051        for i in 0..9 {
2052            assert!(data.get(&i).is_none());
2053        }
2054        for i in 9..26 {
2055            assert!(data.get(&i).is_some());
2056        }
2057        let main_pipeline_watermark = store.watermark("test").unwrap();
2058        // Assert that the main pipeline's watermarks are not updated by the tasked run.
2059        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2060        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2061        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2062        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2063        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2064    }
2065
2066    /// Test that when the collector observes `reader_lo = X`, all checkpoints >= X are
2067    /// committed, and any in-flight checkpoints < X are skipped.
2068    #[tokio::test]
2069    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2070        let registry = Registry::new();
2071        let store = MockStore::default();
2072        let mut conn = store.connect().await.unwrap();
2073        // Set the main pipeline watermark.
2074        conn.set_committer_watermark(
2075            ControllableHandler::NAME,
2076            CommitterWatermark {
2077                checkpoint_hi_inclusive: 11,
2078                ..Default::default()
2079            },
2080        )
2081        .await
2082        .unwrap();
2083
2084        // Generate 501 checkpoints ([0, 500]) upfront, for the indexer to process all at once.
2085        let temp_dir = tempfile::tempdir().unwrap();
2086        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2087            ingestion_dir: temp_dir.path().to_owned(),
2088            starting_checkpoint: 0,
2089            num_checkpoints: 501,
2090            checkpoint_size: 2,
2091        })
2092        .await;
2093        let indexer_args = IndexerArgs {
2094            first_checkpoint: Some(0),
2095            last_checkpoint: Some(500),
2096            pipeline: vec![],
2097            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2098        };
2099        let client_args = ClientArgs {
2100            ingestion: IngestionClientArgs {
2101                local_ingestion_path: Some(temp_dir.path().to_owned()),
2102                ..Default::default()
2103            },
2104            ..Default::default()
2105        };
2106        let ingestion_config = IngestionConfig::default();
2107        let mut tasked_indexer = Indexer::new(
2108            store.clone(),
2109            indexer_args,
2110            client_args,
2111            ingestion_config,
2112            None,
2113            &registry,
2114        )
2115        .await
2116        .unwrap();
2117        let mut allow_process = 10;
2118        // Limit the pipeline to process only checkpoints `[0, 10]`.
2119        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
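            // Short collect and watermark intervals keep the committer responsive so the test can
            // observe watermark movement quickly.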
2120        let _ = tasked_indexer
2121            .concurrent_pipeline(
2122                controllable_handler,
2123                ConcurrentConfig {
2124                    committer: CommitterConfig {
2125                        collect_interval_ms: 10,
2126                        watermark_interval_ms: 10,
2127                        ..Default::default()
2128                    },
2129                    ..Default::default()
2130                },
2131            )
2132            .await;
2133        let metrics = tasked_indexer.indexer_metrics().clone();
2134
2135        let mut s_indexer = tasked_indexer.run().await.unwrap();
2136
2137        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
2138        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
2139        // committed.
2140        store
2141            .wait_for_watermark(
2142                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2143                10,
2144                std::time::Duration::from_secs(10),
2145            )
2146            .await;
2147
2148        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2149        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2150        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2151        // one at a time until the collector_reader_lo metric shows the new value.
2152        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2153            .await
2154            .unwrap();
2155
2156        let reader_lo = metrics
2157            .collector_reader_lo
2158            .with_label_values(&[ControllableHandler::NAME]);
2159
2160        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2161        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2162        // checkpoints have been processed.
2163        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2164        while reader_lo.get() != 250 {
2165            interval.tick().await;
2166            // allow_process starts at 10; bump it by one to release the next checkpoint.
2167            allow_process += 1;
2168            assert!(
2169                allow_process <= 500,
2170                "Released all checkpoints but collector never observed new reader_lo"
2171            );
2172            process_below.send(allow_process).ok();
2173        }
2174
2175        // At this point, the collector has observed reader_lo = 250. Release all remaining
2176        // checkpoints. Guarantees:
2177        // - [0, 10]: committed (before reader_lo was set)
2178        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2179        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2180        // - [250, 500]: committed (>= reader_lo)
2181        process_below.send(500).ok();
2182
2183        s_indexer.join().await.unwrap();
2184
2185        let data = store.data.get(ControllableHandler::NAME).unwrap();
2186
2187        // Checkpoints (allow_process, 250) must be skipped.
2188        for chkpt in (allow_process + 1)..250 {
2189            assert!(
2190                data.get(&chkpt).is_none(),
2191                "Checkpoint {chkpt} should have been skipped"
2192            );
2193        }
2194
2195        // Checkpoints >= reader_lo must be committed.
2196        for chkpt in 250..=500 {
2197            assert!(
2198                data.get(&chkpt).is_some(),
2199                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2200            );
2201        }
2202
2203        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2204        for chkpt in 0..=10 {
2205            assert!(
2206                data.get(&chkpt).is_some(),
2207                "Checkpoint {chkpt} should have been committed (baseline)"
2208            );
2209        }
2210    }
2211}