sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod ingestion;
42pub mod metrics;
43pub mod pipeline;
44#[cfg(feature = "postgres")]
45pub mod postgres;
46
47#[cfg(test)]
48pub mod mocks;
49
50/// Command-line arguments for the indexer
51#[derive(clap::Args, Default, Debug, Clone)]
52pub struct IndexerArgs {
53    /// Override the next checkpoint for all pipelines without a committer watermark to start
54    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
55    /// setting and always resume from their committer watermark + 1.
56    ///
57    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
58    /// is the minimum across all pipelines' next checkpoints.
59    #[arg(long)]
60    pub first_checkpoint: Option<u64>,
61
62    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
63    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
64    #[arg(long)]
65    pub last_checkpoint: Option<u64>,
66
67    /// Only run the following pipelines. If not provided, all pipelines found in the
68    /// configuration file will be run.
69    #[arg(long, action = clap::ArgAction::Append)]
70    pub pipeline: Vec<String>,
71
72    /// Additional configurations for running a tasked indexer.
73    #[clap(flatten)]
74    pub task: TaskArgs,
75}
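// Illustrative sketch (not part of the CLI surface): constructing `IndexerArgs` programmatically
// for a bounded backfill over checkpoints 1_000..=2_000 that runs only a hypothetical
// "tx_digests" pipeline. All fields referenced below are the public fields defined above.
//
//     let args = IndexerArgs {
//         first_checkpoint: Some(1_000),
//         last_checkpoint: Some(2_000),
//         pipeline: vec!["tx_digests".to_string()],
//         task: TaskArgs::default(),
//     };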
76
77/// Command-line arguments for configuring a tasked indexer.
78#[derive(clap::Parser, Default, Debug, Clone)]
79pub struct TaskArgs {
80    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
81    /// delimiter defined on the store. This allows the same pipelines to run under multiple
82    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
83    /// entries in the database.
84    ///
85    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
86    ///
87    /// Sequential pipelines cannot be attached to a tasked indexer.
88    ///
89    /// The framework ensures that tasked pipelines never commit checkpoints below the main
90    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
91    #[arg(long, requires = "reader_interval_ms")]
92    task: Option<String>,
93
94    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
95    /// refetch its main pipeline's reader watermark.
96    ///
97    /// This is required when `--task` is set and should ideally be set to a value that is
98    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
99    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
100    /// it.
101    ///
102    /// If the main pipeline does not have pruning enabled, this value can be set to some high
103    /// value, as the tasked pipeline will never see an updated reader watermark.
104    #[arg(long, requires = "task")]
105    reader_interval_ms: Option<u64>,
106}
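// Illustrative sketch: a tasked configuration equivalent to passing `--task backfill
// --reader-interval-ms 500` on the command line, built with the `TaskArgs::tasked` constructor
// defined below. The task name "backfill" is an arbitrary example.
//
//     let task_args = TaskArgs::tasked("backfill".to_string(), 500);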
107
108pub struct Indexer<S: Store> {
109    /// The storage backend that the indexer uses to write and query indexed data. This
110    /// generic implementation allows for plugging in different storage solutions that implement the
111    /// `Store` trait.
112    store: S,
113
114    /// Prometheus Metrics.
115    metrics: Arc<IndexerMetrics>,
116
117    /// Service for downloading and disseminating checkpoint data.
118    ingestion_service: IngestionService,
119
120    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
121    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
122    /// always resume from their committer watermark + 1.
123    ///
124    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
125    /// is the minimum across all pipelines' next checkpoints.
126    default_next_checkpoint: u64,
127
128    /// Optional override of the checkpoint upper bound. When set, the indexer will stop ingestion
129    /// at this checkpoint (inclusive).
130    last_checkpoint: Option<u64>,
131
132    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
133    /// delimiter defined on the store. This allows the same pipelines to run under multiple
134    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
135    /// entries in the database.
136    ///
137    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
138    ///
139    /// Sequential pipelines cannot be attached to a tasked indexer.
140    ///
141    /// The framework ensures that tasked pipelines never commit checkpoints below the main
142    /// pipeline’s pruner watermark.
143    task: Option<Task>,
144
145    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
146    /// run. Any pipelines that are present in this filter but not added to the indexer will cause
147    /// an error when the indexer is run.
148    enabled_pipelines: Option<BTreeSet<String>>,
149
150    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
151    /// with the same name isn't added twice.
152    added_pipelines: BTreeSet<&'static str>,
153
154    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
155    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
156    /// unless overridden by [Self::default_next_checkpoint].
157    first_ingestion_checkpoint: u64,
158
159    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
160    pipelines: Vec<Service>,
161}
162
163/// Configuration for a tasked indexer.
164#[derive(Clone)]
165pub(crate) struct Task {
166    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
167    /// record pipeline watermarks.
168    task: String,
169    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
170    /// pipeline's reader watermark.
171    reader_interval: Duration,
172}
173
174impl TaskArgs {
175    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
176        Self {
177            task: Some(task),
178            reader_interval_ms: Some(reader_interval_ms),
179        }
180    }
181
182    fn into_task(self) -> Option<Task> {
183        Some(Task {
184            task: self.task?,
185            reader_interval: Duration::from_millis(self.reader_interval_ms?),
186        })
187    }
188}
189
190impl<S: Store> Indexer<S> {
191    /// Create a new instance of the indexer framework from a store that implements the `Store`
192    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
193    /// arguments configure the following:
194    ///
195    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
196    ///   watermarks) and where to serve metrics from,
197    /// - Where to download checkpoints from,
198    /// - Concurrency and buffering parameters for downloading checkpoints.
199    ///
200    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
201    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
202    pub async fn new(
203        store: S,
204        indexer_args: IndexerArgs,
205        client_args: ClientArgs,
206        ingestion_config: IngestionConfig,
207        metrics_prefix: Option<&str>,
208        registry: &Registry,
209    ) -> Result<Self> {
210        let IndexerArgs {
211            first_checkpoint,
212            last_checkpoint,
213            pipeline,
214            task,
215        } = indexer_args;
216
217        let metrics = IndexerMetrics::new(metrics_prefix, registry);
218
219        let ingestion_service =
220            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
221
222        Ok(Self {
223            store,
224            metrics,
225            ingestion_service,
226            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
227            last_checkpoint,
228            task: task.into_task(),
229            enabled_pipelines: if pipeline.is_empty() {
230                None
231            } else {
232                Some(pipeline.into_iter().collect())
233            },
234            added_pipelines: BTreeSet::new(),
235            first_ingestion_checkpoint: u64::MAX,
236            pipelines: vec![],
237        })
238    }
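    // Illustrative sketch of the intended call sequence (assumes a store `store: S` implementing
    // `Store`, a handler `MyHandler` implementing `concurrent::Handler<Store = S>`, and
    // `client_args`/`registry` built elsewhere; none of these are defined in this crate):
    //
    //     let mut indexer = Indexer::new(
    //         store,
    //         IndexerArgs::default(),
    //         client_args,
    //         IngestionConfig::default(),
    //         None,
    //         &registry,
    //     )
    //     .await?;
    //     indexer
    //         .concurrent_pipeline(MyHandler, ConcurrentConfig::default())
    //         .await?;
    //     let service = indexer.run().await?;
    //     service.join().await?;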
239
240    /// The store used by the indexer.
241    pub fn store(&self) -> &S {
242        &self.store
243    }
244
245    /// The ingestion client used by the indexer to fetch checkpoints.
246    pub fn ingestion_client(&self) -> &IngestionClient {
247        self.ingestion_service.ingestion_client()
248    }
249
250    /// The indexer's metrics.
251    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
252        &self.metrics
253    }
254
255    /// The ingestion service's metrics.
256    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
257        self.ingestion_service.metrics()
258    }
259
260    /// The pipelines that this indexer will run.
261    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
262        self.added_pipelines.iter().copied().filter(|p| {
263            self.enabled_pipelines
264                .as_ref()
265                .is_none_or(|e| e.contains(*p))
266        })
267    }
268
269    /// The minimum next checkpoint across all pipelines, used to initialize the ingestion
270    /// regulator's high watermark to prevent ingestion from running too far ahead. Returns `None`
271    /// if no pipelines have been added yet.
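    ///
    /// For example (mirroring the tests below), with two pipelines whose committer watermarks are
    /// at checkpoints 10 and 50, their next checkpoints are 11 and 51, so this returns `Some(11)`;
    /// before any pipeline has been added it returns `None`.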
272    pub fn initial_commit_hi(&self) -> Option<u64> {
273        (self.first_ingestion_checkpoint != u64::MAX).then_some(self.first_ingestion_checkpoint)
274    }
275
276    /// Adds a new pipeline to this indexer and starts it up. Although its tasks have started,
277    /// they will remain idle until the ingestion service starts and serves them checkpoint data.
278    ///
279    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
280    /// keep the watermark table up-to-date with the highest checkpoint for which they can
281    /// guarantee that all of their data has been committed.
282    pub async fn concurrent_pipeline<H>(
283        &mut self,
284        handler: H,
285        config: ConcurrentConfig,
286    ) -> Result<()>
287    where
288        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
289    {
290        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
291            return Ok(());
292        };
293
294        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
295        self.pipelines.push(concurrent::pipeline::<H>(
296            handler,
297            next_checkpoint,
298            config,
299            self.store.clone(),
300            self.task.clone(),
301            checkpoint_rx,
302            commit_hi_tx,
303            self.metrics.clone(),
304        ));
305
306        Ok(())
307    }
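    // Illustrative sketch of the handler shape expected by [Self::concurrent_pipeline], modeled
    // on the test handlers at the bottom of this file. `MyStore` and the commit body are
    // assumptions; the trait surface (`Processor` plus `concurrent::Handler`) is the one
    // exercised by those tests.
    //
    //     #[derive(FieldCount)]
    //     struct SeqNum(u64);
    //
    //     struct TxSequenceNumbers;
    //
    //     #[async_trait]
    //     impl Processor for TxSequenceNumbers {
    //         const NAME: &'static str = "tx_sequence_numbers";
    //         type Value = SeqNum;
    //
    //         async fn process(
    //             &self,
    //             checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
    //         ) -> anyhow::Result<Vec<Self::Value>> {
    //             Ok(vec![SeqNum(checkpoint.summary.sequence_number)])
    //         }
    //     }
    //
    //     #[async_trait]
    //     impl concurrent::Handler for TxSequenceNumbers {
    //         type Store = MyStore;
    //         type Batch = Vec<SeqNum>;
    //
    //         fn batch(
    //             &self,
    //             batch: &mut Self::Batch,
    //             values: &mut std::vec::IntoIter<Self::Value>,
    //         ) -> concurrent::BatchStatus {
    //             batch.extend(values);
    //             concurrent::BatchStatus::Pending
    //         }
    //
    //         async fn commit<'a>(
    //             &self,
    //             batch: &Self::Batch,
    //             conn: &mut <Self::Store as Store>::Connection<'a>,
    //         ) -> anyhow::Result<usize> {
    //             // Write `batch` through `conn` here; return the number of rows affected.
    //             Ok(batch.len())
    //         }
    //     }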
308
309    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
310    /// will start processing and committing once the ingestion service has caught up to their
311    /// respective watermarks.
312    ///
313    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
314    pub async fn run(self) -> Result<Service> {
315        let initial_commit_hi = self.initial_commit_hi();
316
317        if let Some(enabled_pipelines) = self.enabled_pipelines {
318            ensure!(
319                enabled_pipelines.is_empty(),
320                "Tried to enable pipelines that this indexer does not know about: \
321                {enabled_pipelines:#?}",
322            );
323        }
324
325        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
326
327        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
328
329        let mut service = self
330            .ingestion_service
331            .run(
332                self.first_ingestion_checkpoint..=last_checkpoint,
333                initial_commit_hi,
334            )
335            .await
336            .context("Failed to start ingestion service")?;
337
338        for pipeline in self.pipelines {
339            service = service.merge(pipeline);
340        }
341
342        Ok(service)
343    }
344
345    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
346    /// checkpoint after its committer watermark, or if that doesn't exist, then
347    /// [Self::default_next_checkpoint], which is 0 (genesis) unless overridden.
348    ///
349    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
350    /// calculated above.
351    ///
352    /// Returns `Ok(None)` if the pipeline is disabled.
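    ///
    /// For example, a pipeline whose committer watermark is at checkpoint 10 resumes from 11,
    /// while a pipeline with no watermark and `--first-checkpoint 5` resumes from 5.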
353    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
354        ensure!(
355            self.added_pipelines.insert(P::NAME),
356            "Pipeline {:?} already added",
357            P::NAME,
358        );
359
360        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
361            && !enabled_pipelines.remove(P::NAME)
362        {
363            info!(pipeline = P::NAME, "Skipping");
364            return Ok(None);
365        }
366
367        let mut conn = self
368            .store
369            .connect()
370            .await
371            .context("Failed to establish connection to store")?;
372
373        let pipeline_task =
374            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
375
376        let checkpoint_hi_inclusive = conn
377            .init_watermark(&pipeline_task, self.default_next_checkpoint)
378            .await
379            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
380
381        let next_checkpoint =
382            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
383
384        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
385
386        Ok(Some(next_checkpoint))
387    }
388}
389
390impl<T: TransactionalStore> Indexer<T> {
391    /// Adds a new pipeline to this indexer and starts it up. Although its tasks have started,
392    /// they will remain idle until the ingestion service starts and serves them checkpoint data.
393    ///
394    /// Sequential pipelines commit checkpoint data in order, which sacrifices throughput but may
395    /// be required for pipelines that modify data in place (where each update may not be an
396    /// insert but a modification of an existing row, so the ordering between updates is
397    /// important).
398    ///
399    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
400    /// number of checkpoints (configured by `checkpoint_lag`).
401    pub async fn sequential_pipeline<H>(
402        &mut self,
403        handler: H,
404        config: SequentialConfig,
405    ) -> Result<()>
406    where
407        H: Handler<Store = T> + Send + Sync + 'static,
408    {
409        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
410            return Ok(());
411        };
412
413        if self.task.is_some() {
414            bail!(
415                "Sequential pipelines do not support pipeline tasks. \
416                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
417                Running the same pipeline under a different task would violate these guarantees."
418            );
419        }
420
421        let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
422
423        self.pipelines.push(sequential::pipeline::<H>(
424            handler,
425            next_checkpoint,
426            config,
427            self.store.clone(),
428            checkpoint_rx,
429            watermark_tx,
430            self.metrics.clone(),
431        ));
432
433        Ok(())
434    }
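    // Illustrative note, based on the sequential test handlers at the bottom of this file: a
    // sequential handler implements `Processor` just like the concurrent sketch above, but
    // `sequential::Handler::batch` takes the values iterator by value and returns nothing, and
    // `commit` is invoked strictly in checkpoint order:
    //
    //     fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
    //         batch.extend(values);
    //     }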
435}
436
437#[cfg(test)]
438mod tests {
439    use std::sync::Arc;
440
441    use async_trait::async_trait;
442    use clap::Parser;
443    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
444    use sui_synthetic_ingestion::synthetic_ingestion;
445    use tokio::sync::watch;
446
447    use crate::FieldCount;
448    use crate::ingestion::ingestion_client::IngestionClientArgs;
449    use crate::mocks::store::MockStore;
450    use crate::pipeline::CommitterConfig;
451    use crate::pipeline::Processor;
452    use crate::pipeline::concurrent::ConcurrentConfig;
453    use crate::store::CommitterWatermark;
454
455    use super::*;
456
457    #[allow(dead_code)]
458    #[derive(Clone, FieldCount)]
459    struct MockValue(u64);
460
461    /// A handler that can be controlled externally to block checkpoint processing.
462    struct ControllableHandler {
463        /// Process checkpoints less than or equal to this value.
464        process_below: watch::Receiver<u64>,
465    }
466
467    impl ControllableHandler {
468        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
469            let (tx, rx) = watch::channel(limit);
470            (Self { process_below: rx }, tx)
471        }
472    }
473
474    #[async_trait]
475    impl Processor for ControllableHandler {
476        const NAME: &'static str = "controllable";
477    /// The checkpoints to process may arrive out of order. To account for potential flakiness in test
478        /// `test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo`, we set the FANOUT to
479        /// the total number of checkpoints to process, so we can ensure the correct checkpoints are
480        /// processed.
481        const FANOUT: usize = 501;
482        type Value = MockValue;
483
484        async fn process(
485            &self,
486            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
487        ) -> anyhow::Result<Vec<Self::Value>> {
488            let cp_num = checkpoint.summary.sequence_number;
489
490            // Wait until the checkpoint is allowed to be processed
491            self.process_below
492                .clone()
493                .wait_for(|&limit| cp_num <= limit)
494                .await
495                .ok();
496
497            Ok(vec![MockValue(cp_num)])
498        }
499    }
500
501    #[async_trait]
502    impl crate::pipeline::concurrent::Handler for ControllableHandler {
503        type Store = MockStore;
504        type Batch = Vec<MockValue>;
505
506        fn batch(
507            &self,
508            batch: &mut Self::Batch,
509            values: &mut std::vec::IntoIter<Self::Value>,
510        ) -> crate::pipeline::concurrent::BatchStatus {
511            batch.extend(values);
512            crate::pipeline::concurrent::BatchStatus::Ready
513        }
514
515        async fn commit<'a>(
516            &self,
517            batch: &Self::Batch,
518            conn: &mut <Self::Store as Store>::Connection<'a>,
519        ) -> anyhow::Result<usize> {
520            for value in batch {
521                conn.0
522                    .commit_data(Self::NAME, value.0, vec![value.0])
523                    .await?;
524            }
525            Ok(batch.len())
526        }
527    }
528
529    macro_rules! test_pipeline {
530        ($handler:ident, $name:literal) => {
531            struct $handler;
532
533            #[async_trait]
534            impl Processor for $handler {
535                const NAME: &'static str = $name;
536                type Value = MockValue;
537                async fn process(
538                    &self,
539                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
540                ) -> anyhow::Result<Vec<Self::Value>> {
541                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
542                }
543            }
544
545            #[async_trait]
546            impl crate::pipeline::concurrent::Handler for $handler {
547                type Store = MockStore;
548                type Batch = Vec<Self::Value>;
549
550                fn batch(
551                    &self,
552                    batch: &mut Self::Batch,
553                    values: &mut std::vec::IntoIter<Self::Value>,
554                ) -> crate::pipeline::concurrent::BatchStatus {
555                    batch.extend(values);
556                    crate::pipeline::concurrent::BatchStatus::Pending
557                }
558
559                async fn commit<'a>(
560                    &self,
561                    batch: &Self::Batch,
562                    conn: &mut <Self::Store as Store>::Connection<'a>,
563                ) -> anyhow::Result<usize> {
564                    for value in batch {
565                        conn.0
566                            .commit_data(Self::NAME, value.0, vec![value.0])
567                            .await?;
568                    }
569                    Ok(batch.len())
570                }
571            }
572
573            #[async_trait]
574            impl crate::pipeline::sequential::Handler for $handler {
575                type Store = MockStore;
576                type Batch = Vec<Self::Value>;
577
578                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
579                    batch.extend(values);
580                }
581
582                async fn commit<'a>(
583                    &self,
584                    _batch: &Self::Batch,
585                    _conn: &mut <Self::Store as Store>::Connection<'a>,
586                ) -> anyhow::Result<usize> {
587                    Ok(1)
588                }
589            }
590        };
591    }
592
593    test_pipeline!(MockHandler, "test_processor");
594    test_pipeline!(SequentialHandler, "sequential_handler");
595    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
596
597    async fn test_init_watermark(
598        first_checkpoint: Option<u64>,
599        is_concurrent: bool,
600    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
601        let registry = Registry::new();
602        let store = MockStore::default();
603
604        test_pipeline!(A, "pipeline_name");
605
606        let mut conn = store.connect().await.unwrap();
607
608        let indexer_args = IndexerArgs {
609            first_checkpoint,
610            ..IndexerArgs::default()
611        };
612        let temp_dir = tempfile::tempdir().unwrap();
613        let client_args = ClientArgs {
614            ingestion: IngestionClientArgs {
615                local_ingestion_path: Some(temp_dir.path().to_owned()),
616                ..Default::default()
617            },
618            ..Default::default()
619        };
620        let ingestion_config = IngestionConfig::default();
621
622        let mut indexer = Indexer::new(
623            store.clone(),
624            indexer_args,
625            client_args,
626            ingestion_config,
627            None,
628            &registry,
629        )
630        .await
631        .unwrap();
632
633        if is_concurrent {
634            indexer
635                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
636                .await
637                .unwrap();
638        } else {
639            indexer
640                .sequential_pipeline::<A>(A, SequentialConfig::default())
641                .await
642                .unwrap();
643        }
644
645        (
646            conn.committer_watermark(A::NAME).await.unwrap(),
647            conn.pruner_watermark(A::NAME, Duration::ZERO)
648                .await
649                .unwrap(),
650        )
651    }
652
653    #[test]
654    fn test_arg_parsing() {
655        #[derive(Parser)]
656        struct Args {
657            #[clap(flatten)]
658            indexer: IndexerArgs,
659        }
660
661        let args = Args::try_parse_from([
662            "cmd",
663            "--first-checkpoint",
664            "10",
665            "--last-checkpoint",
666            "100",
667            "--pipeline",
668            "a",
669            "--pipeline",
670            "b",
671            "--task",
672            "t",
673            "--reader-interval-ms",
674            "5000",
675        ])
676        .unwrap();
677
678        assert_eq!(args.indexer.first_checkpoint, Some(10));
679        assert_eq!(args.indexer.last_checkpoint, Some(100));
680        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
681        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
682        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
683    }
684
685    /// first_ingestion_checkpoint is the smallest existing watermark + 1.
686    #[tokio::test]
687    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
688        let registry = Registry::new();
689        let store = MockStore::default();
690
691        test_pipeline!(A, "concurrent_a");
692        test_pipeline!(B, "concurrent_b");
693        test_pipeline!(C, "sequential_c");
694        test_pipeline!(D, "sequential_d");
695
696        let mut conn = store.connect().await.unwrap();
697        conn.set_committer_watermark(
698            A::NAME,
699            CommitterWatermark {
700                checkpoint_hi_inclusive: 100,
701                ..Default::default()
702            },
703        )
704        .await
705        .unwrap();
706        conn.set_committer_watermark(
707            B::NAME,
708            CommitterWatermark {
709                checkpoint_hi_inclusive: 10,
710                ..Default::default()
711            },
712        )
713        .await
714        .unwrap();
715        conn.set_committer_watermark(
716            C::NAME,
717            CommitterWatermark {
718                checkpoint_hi_inclusive: 1,
719                ..Default::default()
720            },
721        )
722        .await
723        .unwrap();
724        conn.set_committer_watermark(
725            D::NAME,
726            CommitterWatermark {
727                checkpoint_hi_inclusive: 50,
728                ..Default::default()
729            },
730        )
731        .await
732        .unwrap();
733
734        let indexer_args = IndexerArgs::default();
735        let temp_dir = tempfile::tempdir().unwrap();
736        let client_args = ClientArgs {
737            ingestion: IngestionClientArgs {
738                local_ingestion_path: Some(temp_dir.path().to_owned()),
739                ..Default::default()
740            },
741            ..Default::default()
742        };
743        let ingestion_config = IngestionConfig::default();
744
745        let mut indexer = Indexer::new(
746            store,
747            indexer_args,
748            client_args,
749            ingestion_config,
750            None,
751            &registry,
752        )
753        .await
754        .unwrap();
755
756        indexer
757            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
758            .await
759            .unwrap();
760        indexer
761            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
762            .await
763            .unwrap();
764        indexer
765            .sequential_pipeline::<C>(C, SequentialConfig::default())
766            .await
767            .unwrap();
768        indexer
769            .sequential_pipeline::<D>(D, SequentialConfig::default())
770            .await
771            .unwrap();
772
773        assert_eq!(indexer.first_ingestion_checkpoint, 2);
774    }
775
776    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
777    #[tokio::test]
778    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
779        let registry = Registry::new();
780        let store = MockStore::default();
781
782        test_pipeline!(A, "concurrent_a");
783        test_pipeline!(B, "concurrent_b");
784        test_pipeline!(C, "sequential_c");
785        test_pipeline!(D, "sequential_d");
786
787        let mut conn = store.connect().await.unwrap();
788        conn.set_committer_watermark(
789            B::NAME,
790            CommitterWatermark {
791                checkpoint_hi_inclusive: 10,
792                ..Default::default()
793            },
794        )
795        .await
796        .unwrap();
797        conn.set_committer_watermark(
798            C::NAME,
799            CommitterWatermark {
800                checkpoint_hi_inclusive: 1,
801                ..Default::default()
802            },
803        )
804        .await
805        .unwrap();
806
807        let indexer_args = IndexerArgs::default();
808        let temp_dir = tempfile::tempdir().unwrap();
809        let client_args = ClientArgs {
810            ingestion: IngestionClientArgs {
811                local_ingestion_path: Some(temp_dir.path().to_owned()),
812                ..Default::default()
813            },
814            ..Default::default()
815        };
816        let ingestion_config = IngestionConfig::default();
817
818        let mut indexer = Indexer::new(
819            store,
820            indexer_args,
821            client_args,
822            ingestion_config,
823            None,
824            &registry,
825        )
826        .await
827        .unwrap();
828
829        indexer
830            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
831            .await
832            .unwrap();
833        indexer
834            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
835            .await
836            .unwrap();
837        indexer
838            .sequential_pipeline::<C>(C, SequentialConfig::default())
839            .await
840            .unwrap();
841        indexer
842            .sequential_pipeline::<D>(D, SequentialConfig::default())
843            .await
844            .unwrap();
845
846        assert_eq!(indexer.first_ingestion_checkpoint, 0);
847    }
848
849    /// first_ingestion_checkpoint is 1 when smallest committer watermark is 0.
850    #[tokio::test]
851    async fn test_first_ingestion_checkpoint_smallest_is_0() {
852        let registry = Registry::new();
853        let store = MockStore::default();
854
855        test_pipeline!(A, "concurrent_a");
856        test_pipeline!(B, "concurrent_b");
857        test_pipeline!(C, "sequential_c");
858        test_pipeline!(D, "sequential_d");
859
860        let mut conn = store.connect().await.unwrap();
861        conn.set_committer_watermark(
862            A::NAME,
863            CommitterWatermark {
864                checkpoint_hi_inclusive: 100,
865                ..Default::default()
866            },
867        )
868        .await
869        .unwrap();
870        conn.set_committer_watermark(
871            B::NAME,
872            CommitterWatermark {
873                checkpoint_hi_inclusive: 10,
874                ..Default::default()
875            },
876        )
877        .await
878        .unwrap();
879        conn.set_committer_watermark(
880            C::NAME,
881            CommitterWatermark {
882                checkpoint_hi_inclusive: 1,
883                ..Default::default()
884            },
885        )
886        .await
887        .unwrap();
888        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
889            .await
890            .unwrap();
891
892        let indexer_args = IndexerArgs::default();
893        let temp_dir = tempfile::tempdir().unwrap();
894        let client_args = ClientArgs {
895            ingestion: IngestionClientArgs {
896                local_ingestion_path: Some(temp_dir.path().to_owned()),
897                ..Default::default()
898            },
899            ..Default::default()
900        };
901        let ingestion_config = IngestionConfig::default();
902
903        let mut indexer = Indexer::new(
904            store,
905            indexer_args,
906            client_args,
907            ingestion_config,
908            None,
909            &registry,
910        )
911        .await
912        .unwrap();
913
914        indexer
915            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
916            .await
917            .unwrap();
918        indexer
919            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
920            .await
921            .unwrap();
922        indexer
923            .sequential_pipeline::<C>(C, SequentialConfig::default())
924            .await
925            .unwrap();
926        indexer
927            .sequential_pipeline::<D>(D, SequentialConfig::default())
928            .await
929            .unwrap();
930
931        assert_eq!(indexer.first_ingestion_checkpoint, 1);
932    }
933
934    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
935    /// watermark, and first_checkpoint is smallest.
936    #[tokio::test]
937    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
938        let registry = Registry::new();
939        let store = MockStore::default();
940
941        test_pipeline!(A, "concurrent_a");
942        test_pipeline!(B, "concurrent_b");
943        test_pipeline!(C, "sequential_c");
944        test_pipeline!(D, "sequential_d");
945
946        let mut conn = store.connect().await.unwrap();
947        conn.set_committer_watermark(
948            B::NAME,
949            CommitterWatermark {
950                checkpoint_hi_inclusive: 50,
951                ..Default::default()
952            },
953        )
954        .await
955        .unwrap();
956        conn.set_committer_watermark(
957            C::NAME,
958            CommitterWatermark {
959                checkpoint_hi_inclusive: 10,
960                ..Default::default()
961            },
962        )
963        .await
964        .unwrap();
965
966        let indexer_args = IndexerArgs {
967            first_checkpoint: Some(5),
968            ..Default::default()
969        };
970        let temp_dir = tempfile::tempdir().unwrap();
971        let client_args = ClientArgs {
972            ingestion: IngestionClientArgs {
973                local_ingestion_path: Some(temp_dir.path().to_owned()),
974                ..Default::default()
975            },
976            ..Default::default()
977        };
978        let ingestion_config = IngestionConfig::default();
979
980        let mut indexer = Indexer::new(
981            store,
982            indexer_args,
983            client_args,
984            ingestion_config,
985            None,
986            &registry,
987        )
988        .await
989        .unwrap();
990
991        indexer
992            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
993            .await
994            .unwrap();
995        indexer
996            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
997            .await
998            .unwrap();
999        indexer
1000            .sequential_pipeline::<C>(C, SequentialConfig::default())
1001            .await
1002            .unwrap();
1003        indexer
1004            .sequential_pipeline::<D>(D, SequentialConfig::default())
1005            .await
1006            .unwrap();
1007
1008        assert_eq!(indexer.first_ingestion_checkpoint, 5);
1009    }
1010
1011    /// first_ingestion_checkpoint is the smallest existing watermark + 1 when first_checkpoint
1012    /// is provided but all pipelines have watermarks (first_checkpoint is ignored).
1013    #[tokio::test]
1014    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1015        let registry = Registry::new();
1016        let store = MockStore::default();
1017
1018        test_pipeline!(B, "concurrent_b");
1019        test_pipeline!(C, "sequential_c");
1020
1021        let mut conn = store.connect().await.unwrap();
1022        conn.set_committer_watermark(
1023            B::NAME,
1024            CommitterWatermark {
1025                checkpoint_hi_inclusive: 50,
1026                ..Default::default()
1027            },
1028        )
1029        .await
1030        .unwrap();
1031        conn.set_committer_watermark(
1032            C::NAME,
1033            CommitterWatermark {
1034                checkpoint_hi_inclusive: 10,
1035                ..Default::default()
1036            },
1037        )
1038        .await
1039        .unwrap();
1040
1041        let indexer_args = IndexerArgs {
1042            first_checkpoint: Some(5),
1043            ..Default::default()
1044        };
1045        let temp_dir = tempfile::tempdir().unwrap();
1046        let client_args = ClientArgs {
1047            ingestion: IngestionClientArgs {
1048                local_ingestion_path: Some(temp_dir.path().to_owned()),
1049                ..Default::default()
1050            },
1051            ..Default::default()
1052        };
1053        let ingestion_config = IngestionConfig::default();
1054
1055        let mut indexer = Indexer::new(
1056            store,
1057            indexer_args,
1058            client_args,
1059            ingestion_config,
1060            None,
1061            &registry,
1062        )
1063        .await
1064        .unwrap();
1065
1066        indexer
1067            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1068            .await
1069            .unwrap();
1070        indexer
1071            .sequential_pipeline::<C>(C, SequentialConfig::default())
1072            .await
1073            .unwrap();
1074
1075        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1076    }
1077
1078    /// Even when first_checkpoint is considered (because some pipelines are missing watermarks),
1079    /// it will not be used as the starting point if another pipeline's resume checkpoint
1080    /// (committer watermark + 1) is smaller.
1081    #[tokio::test]
1082    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1083        let registry = Registry::new();
1084        let store = MockStore::default();
1085
1086        test_pipeline!(A, "concurrent_a");
1087        test_pipeline!(B, "concurrent_b");
1088        test_pipeline!(C, "sequential_c");
1089
1090        let mut conn = store.connect().await.unwrap();
1091        conn.set_committer_watermark(
1092            B::NAME,
1093            CommitterWatermark {
1094                checkpoint_hi_inclusive: 50,
1095                ..Default::default()
1096            },
1097        )
1098        .await
1099        .unwrap();
1100        conn.set_committer_watermark(
1101            C::NAME,
1102            CommitterWatermark {
1103                checkpoint_hi_inclusive: 10,
1104                ..Default::default()
1105            },
1106        )
1107        .await
1108        .unwrap();
1109
1110        let indexer_args = IndexerArgs {
1111            first_checkpoint: Some(24),
1112            ..Default::default()
1113        };
1114        let temp_dir = tempfile::tempdir().unwrap();
1115        let client_args = ClientArgs {
1116            ingestion: IngestionClientArgs {
1117                local_ingestion_path: Some(temp_dir.path().to_owned()),
1118                ..Default::default()
1119            },
1120            ..Default::default()
1121        };
1122        let ingestion_config = IngestionConfig::default();
1123
1124        let mut indexer = Indexer::new(
1125            store,
1126            indexer_args,
1127            client_args,
1128            ingestion_config,
1129            None,
1130            &registry,
1131        )
1132        .await
1133        .unwrap();
1134
1135        indexer
1136            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1137            .await
1138            .unwrap();
1139
1140        indexer
1141            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1142            .await
1143            .unwrap();
1144
1145        indexer
1146            .sequential_pipeline::<C>(C, SequentialConfig::default())
1147            .await
1148            .unwrap();
1149
1150        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1151    }
1152
1153    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1154    #[tokio::test]
1155    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1156        let registry = Registry::new();
1157        let store = MockStore::default();
1158
1159        test_pipeline!(A, "concurrent_a");
1160        test_pipeline!(B, "concurrent_b");
1161        test_pipeline!(C, "sequential_c");
1162        test_pipeline!(D, "sequential_d");
1163
1164        let mut conn = store.connect().await.unwrap();
1165        conn.set_committer_watermark(
1166            A::NAME,
1167            CommitterWatermark {
1168                checkpoint_hi_inclusive: 5,
1169                ..Default::default()
1170            },
1171        )
1172        .await
1173        .unwrap();
1174        conn.set_committer_watermark(
1175            B::NAME,
1176            CommitterWatermark {
1177                checkpoint_hi_inclusive: 10,
1178                ..Default::default()
1179            },
1180        )
1181        .await
1182        .unwrap();
1183        conn.set_committer_watermark(
1184            C::NAME,
1185            CommitterWatermark {
1186                checkpoint_hi_inclusive: 15,
1187                ..Default::default()
1188            },
1189        )
1190        .await
1191        .unwrap();
1192        conn.set_committer_watermark(
1193            D::NAME,
1194            CommitterWatermark {
1195                checkpoint_hi_inclusive: 20,
1196                ..Default::default()
1197            },
1198        )
1199        .await
1200        .unwrap();
1201
1202        // Create synthetic ingestion data
1203        let temp_dir = tempfile::tempdir().unwrap();
1204        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1205            ingestion_dir: temp_dir.path().to_owned(),
1206            starting_checkpoint: 5,
1207            num_checkpoints: 25,
1208            checkpoint_size: 1,
1209        })
1210        .await;
1211
1212        let indexer_args = IndexerArgs {
1213            last_checkpoint: Some(29),
1214            ..Default::default()
1215        };
1216
1217        let client_args = ClientArgs {
1218            ingestion: IngestionClientArgs {
1219                local_ingestion_path: Some(temp_dir.path().to_owned()),
1220                ..Default::default()
1221            },
1222            ..Default::default()
1223        };
1224
1225        let ingestion_config = IngestionConfig::default();
1226
1227        let mut indexer = Indexer::new(
1228            store.clone(),
1229            indexer_args,
1230            client_args,
1231            ingestion_config,
1232            None,
1233            &registry,
1234        )
1235        .await
1236        .unwrap();
1237
1238        indexer
1239            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1240            .await
1241            .unwrap();
1242        indexer
1243            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1244            .await
1245            .unwrap();
1246        indexer
1247            .sequential_pipeline::<C>(C, SequentialConfig::default())
1248            .await
1249            .unwrap();
1250        indexer
1251            .sequential_pipeline::<D>(D, SequentialConfig::default())
1252            .await
1253            .unwrap();
1254
1255        let ingestion_metrics = indexer.ingestion_metrics().clone();
1256        let indexer_metrics = indexer.indexer_metrics().clone();
1257
1258        indexer.run().await.unwrap().join().await.unwrap();
1259
1260        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1261        assert_eq!(
1262            indexer_metrics
1263                .total_watermarks_out_of_order
1264                .get_metric_with_label_values(&[A::NAME])
1265                .unwrap()
1266                .get(),
1267            0
1268        );
1269        assert_eq!(
1270            indexer_metrics
1271                .total_watermarks_out_of_order
1272                .get_metric_with_label_values(&[B::NAME])
1273                .unwrap()
1274                .get(),
1275            5
1276        );
1277        assert_eq!(
1278            indexer_metrics
1279                .total_watermarks_out_of_order
1280                .get_metric_with_label_values(&[C::NAME])
1281                .unwrap()
1282                .get(),
1283            10
1284        );
1285        assert_eq!(
1286            indexer_metrics
1287                .total_watermarks_out_of_order
1288                .get_metric_with_label_values(&[D::NAME])
1289                .unwrap()
1290                .get(),
1291            15
1292        );
1293    }
1294
1295    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1296    #[tokio::test]
1297    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1298        let registry = Registry::new();
1299        let store = MockStore::default();
1300
1301        test_pipeline!(A, "concurrent_a");
1302        test_pipeline!(B, "concurrent_b");
1303        test_pipeline!(C, "sequential_c");
1304        test_pipeline!(D, "sequential_d");
1305
1306        let mut conn = store.connect().await.unwrap();
1307        conn.set_committer_watermark(
1308            A::NAME,
1309            CommitterWatermark {
1310                checkpoint_hi_inclusive: 5,
1311                ..Default::default()
1312            },
1313        )
1314        .await
1315        .unwrap();
1316        conn.set_committer_watermark(
1317            B::NAME,
1318            CommitterWatermark {
1319                checkpoint_hi_inclusive: 10,
1320                ..Default::default()
1321            },
1322        )
1323        .await
1324        .unwrap();
1325        conn.set_committer_watermark(
1326            C::NAME,
1327            CommitterWatermark {
1328                checkpoint_hi_inclusive: 15,
1329                ..Default::default()
1330            },
1331        )
1332        .await
1333        .unwrap();
1334        conn.set_committer_watermark(
1335            D::NAME,
1336            CommitterWatermark {
1337                checkpoint_hi_inclusive: 20,
1338                ..Default::default()
1339            },
1340        )
1341        .await
1342        .unwrap();
1343
1344        // Create synthetic ingestion data
1345        let temp_dir = tempfile::tempdir().unwrap();
1346        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1347            ingestion_dir: temp_dir.path().to_owned(),
1348            starting_checkpoint: 5,
1349            num_checkpoints: 25,
1350            checkpoint_size: 1,
1351        })
1352        .await;
1353
1354        let indexer_args = IndexerArgs {
1355            first_checkpoint: Some(3),
1356            last_checkpoint: Some(29),
1357            ..Default::default()
1358        };
1359
1360        let client_args = ClientArgs {
1361            ingestion: IngestionClientArgs {
1362                local_ingestion_path: Some(temp_dir.path().to_owned()),
1363                ..Default::default()
1364            },
1365            ..Default::default()
1366        };
1367
1368        let ingestion_config = IngestionConfig::default();
1369
1370        let mut indexer = Indexer::new(
1371            store.clone(),
1372            indexer_args,
1373            client_args,
1374            ingestion_config,
1375            None,
1376            &registry,
1377        )
1378        .await
1379        .unwrap();
1380
1381        indexer
1382            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1383            .await
1384            .unwrap();
1385        indexer
1386            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1387            .await
1388            .unwrap();
1389        indexer
1390            .sequential_pipeline::<C>(C, SequentialConfig::default())
1391            .await
1392            .unwrap();
1393        indexer
1394            .sequential_pipeline::<D>(D, SequentialConfig::default())
1395            .await
1396            .unwrap();
1397
1398        let ingestion_metrics = indexer.ingestion_metrics().clone();
1399        let metrics = indexer.indexer_metrics().clone();
1400        indexer.run().await.unwrap().join().await.unwrap();
1401
1402        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1403        assert_eq!(
1404            metrics
1405                .total_watermarks_out_of_order
1406                .get_metric_with_label_values(&[A::NAME])
1407                .unwrap()
1408                .get(),
1409            0
1410        );
1411        assert_eq!(
1412            metrics
1413                .total_watermarks_out_of_order
1414                .get_metric_with_label_values(&[B::NAME])
1415                .unwrap()
1416                .get(),
1417            5
1418        );
1419        assert_eq!(
1420            metrics
1421                .total_watermarks_out_of_order
1422                .get_metric_with_label_values(&[C::NAME])
1423                .unwrap()
1424                .get(),
1425            10
1426        );
1427        assert_eq!(
1428            metrics
1429                .total_watermarks_out_of_order
1430                .get_metric_with_label_values(&[D::NAME])
1431                .unwrap()
1432                .get(),
1433            15
1434        );
1435    }
1436
1437    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1438    #[tokio::test]
1439    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1440        let registry = Registry::new();
1441        let store = MockStore::default();
1442
1443        test_pipeline!(A, "concurrent_a");
1444        test_pipeline!(B, "concurrent_b");
1445        test_pipeline!(C, "sequential_c");
1446        test_pipeline!(D, "sequential_d");
1447
1448        let mut conn = store.connect().await.unwrap();
1449        conn.set_committer_watermark(
1450            B::NAME,
1451            CommitterWatermark {
1452                checkpoint_hi_inclusive: 10,
1453                ..Default::default()
1454            },
1455        )
1456        .await
1457        .unwrap();
1458        conn.set_committer_watermark(
1459            C::NAME,
1460            CommitterWatermark {
1461                checkpoint_hi_inclusive: 15,
1462                ..Default::default()
1463            },
1464        )
1465        .await
1466        .unwrap();
1467        conn.set_committer_watermark(
1468            D::NAME,
1469            CommitterWatermark {
1470                checkpoint_hi_inclusive: 20,
1471                ..Default::default()
1472            },
1473        )
1474        .await
1475        .unwrap();
1476
1477        // Create synthetic ingestion data
1478        let temp_dir = tempfile::tempdir().unwrap();
1479        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1480            ingestion_dir: temp_dir.path().to_owned(),
1481            starting_checkpoint: 0,
1482            num_checkpoints: 30,
1483            checkpoint_size: 1,
1484        })
1485        .await;
1486
1487        let indexer_args = IndexerArgs {
1488            last_checkpoint: Some(29),
1489            ..Default::default()
1490        };
1491
1492        let client_args = ClientArgs {
1493            ingestion: IngestionClientArgs {
1494                local_ingestion_path: Some(temp_dir.path().to_owned()),
1495                ..Default::default()
1496            },
1497            ..Default::default()
1498        };
1499
1500        let ingestion_config = IngestionConfig::default();
1501
1502        let mut indexer = Indexer::new(
1503            store.clone(),
1504            indexer_args,
1505            client_args,
1506            ingestion_config,
1507            None,
1508            &registry,
1509        )
1510        .await
1511        .unwrap();
1512
1513        indexer
1514            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1515            .await
1516            .unwrap();
1517        indexer
1518            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1519            .await
1520            .unwrap();
1521        indexer
1522            .sequential_pipeline::<C>(C, SequentialConfig::default())
1523            .await
1524            .unwrap();
1525        indexer
1526            .sequential_pipeline::<D>(D, SequentialConfig::default())
1527            .await
1528            .unwrap();
1529
1530        let ingestion_metrics = indexer.ingestion_metrics().clone();
1531        let metrics = indexer.indexer_metrics().clone();
1532        indexer.run().await.unwrap().join().await.unwrap();
1533
1534        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1535        assert_eq!(
1536            metrics
1537                .total_watermarks_out_of_order
1538                .get_metric_with_label_values(&[A::NAME])
1539                .unwrap()
1540                .get(),
1541            0
1542        );
1543        assert_eq!(
1544            metrics
1545                .total_watermarks_out_of_order
1546                .get_metric_with_label_values(&[B::NAME])
1547                .unwrap()
1548                .get(),
1549            11
1550        );
1551        assert_eq!(
1552            metrics
1553                .total_watermarks_out_of_order
1554                .get_metric_with_label_values(&[C::NAME])
1555                .unwrap()
1556                .get(),
1557            16
1558        );
1559        assert_eq!(
1560            metrics
1561                .total_watermarks_out_of_order
1562                .get_metric_with_label_values(&[D::NAME])
1563                .unwrap()
1564                .get(),
1565            21
1566        );
1567    }
1568
1569    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1570    #[tokio::test]
1571    async fn test_indexer_ingestion_use_first_checkpoint() {
1572        let registry = Registry::new();
1573        let store = MockStore::default();
1574
1575        test_pipeline!(A, "concurrent_a");
1576        test_pipeline!(B, "concurrent_b");
1577        test_pipeline!(C, "sequential_c");
1578        test_pipeline!(D, "sequential_d");
1579
1580        let mut conn = store.connect().await.unwrap();
1581        conn.set_committer_watermark(
1582            B::NAME,
1583            CommitterWatermark {
1584                checkpoint_hi_inclusive: 10,
1585                ..Default::default()
1586            },
1587        )
1588        .await
1589        .unwrap();
1590        conn.set_committer_watermark(
1591            C::NAME,
1592            CommitterWatermark {
1593                checkpoint_hi_inclusive: 15,
1594                ..Default::default()
1595            },
1596        )
1597        .await
1598        .unwrap();
1599        conn.set_committer_watermark(
1600            D::NAME,
1601            CommitterWatermark {
1602                checkpoint_hi_inclusive: 20,
1603                ..Default::default()
1604            },
1605        )
1606        .await
1607        .unwrap();
1608
1609        // Create synthetic ingestion data
1610        let temp_dir = tempfile::tempdir().unwrap();
1611        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1612            ingestion_dir: temp_dir.path().to_owned(),
1613            starting_checkpoint: 5,
1614            num_checkpoints: 25,
1615            checkpoint_size: 1,
1616        })
1617        .await;
1618
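        // A has no watermark, so `first_checkpoint = 10` determines where it starts; B, C, and D
        // resume from their watermarks + 1 (11, 16, 21). Ingestion starts from the minimum next
        // checkpoint (10) and stops at `last_checkpoint` (29), i.e. 20 checkpoints in total.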
1619        let indexer_args = IndexerArgs {
1620            first_checkpoint: Some(10),
1621            last_checkpoint: Some(29),
1622            ..Default::default()
1623        };
1624
1625        let client_args = ClientArgs {
1626            ingestion: IngestionClientArgs {
1627                local_ingestion_path: Some(temp_dir.path().to_owned()),
1628                ..Default::default()
1629            },
1630            ..Default::default()
1631        };
1632
1633        let ingestion_config = IngestionConfig::default();
1634
1635        let mut indexer = Indexer::new(
1636            store.clone(),
1637            indexer_args,
1638            client_args,
1639            ingestion_config,
1640            None,
1641            &registry,
1642        )
1643        .await
1644        .unwrap();
1645
1646        indexer
1647            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1648            .await
1649            .unwrap();
1650        indexer
1651            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1652            .await
1653            .unwrap();
1654        indexer
1655            .sequential_pipeline::<C>(C, SequentialConfig::default())
1656            .await
1657            .unwrap();
1658        indexer
1659            .sequential_pipeline::<D>(D, SequentialConfig::default())
1660            .await
1661            .unwrap();
1662
1663        let ingestion_metrics = indexer.ingestion_metrics().clone();
1664        let metrics = indexer.indexer_metrics().clone();
1665        indexer.run().await.unwrap().join().await.unwrap();
1666
1667        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
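        // Commits at or below a pipeline's pre-existing watermark count as out-of-order:
        // A has no prior watermark -> 0; B (watermark 10) re-commits only checkpoint 10 -> 1;
        // C (watermark 15) re-commits 10..=15 -> 6; D (watermark 20) re-commits 10..=20 -> 11.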
1668        assert_eq!(
1669            metrics
1670                .total_watermarks_out_of_order
1671                .get_metric_with_label_values(&[A::NAME])
1672                .unwrap()
1673                .get(),
1674            0
1675        );
1676        assert_eq!(
1677            metrics
1678                .total_watermarks_out_of_order
1679                .get_metric_with_label_values(&[B::NAME])
1680                .unwrap()
1681                .get(),
1682            1
1683        );
1684        assert_eq!(
1685            metrics
1686                .total_watermarks_out_of_order
1687                .get_metric_with_label_values(&[C::NAME])
1688                .unwrap()
1689                .get(),
1690            6
1691        );
1692        assert_eq!(
1693            metrics
1694                .total_watermarks_out_of_order
1695                .get_metric_with_label_values(&[D::NAME])
1696                .unwrap()
1697                .get(),
1698            11
1699        );
1700    }
1701
1702    #[tokio::test]
1703    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1704        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1705        // The indexer does not initialize the watermark; pipeline tasks write committer watermarks as normal.
1706        assert_eq!(committer_watermark, None);
1707        assert_eq!(pruner_watermark, None);
1708    }
1709
1710    #[tokio::test]
1711    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1712        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1713        // The indexer does not initialize the watermark; pipeline tasks write committer watermarks as normal.
1714        assert_eq!(committer_watermark, None);
1715        assert_eq!(pruner_watermark, None);
1716    }
1717
1718    #[tokio::test]
1719    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1720        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1721
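        // With `first_checkpoint = 1`, the indexer pre-initializes the watermarks: the committer
        // watermark sits at checkpoint 0 and the pruner watermark's reader_lo / pruner_hi at 1.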
1722        let committer_watermark = committer_watermark.unwrap();
1723        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1724
1725        let pruner_watermark = pruner_watermark.unwrap();
1726        assert_eq!(pruner_watermark.reader_lo, 1);
1727        assert_eq!(pruner_watermark.pruner_hi, 1);
1728    }
1729
1730    #[tokio::test]
1731    async fn test_init_watermark_sequential() {
1732        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1733
1734        let committer_watermark = committer_watermark.unwrap();
1735        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1736
1737        let pruner_watermark = pruner_watermark.unwrap();
1738        assert_eq!(pruner_watermark.reader_lo, 1);
1739        assert_eq!(pruner_watermark.pruner_hi, 1);
1740    }
1741
1742    #[tokio::test]
1743    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1744        let registry = Registry::new();
1745        let store = MockStore::default();
1746
1747        // Set up different watermarks for two sequential pipelines
1748        let mut conn = store.connect().await.unwrap();
1749
1750        // First handler at checkpoint 10
1751        conn.set_committer_watermark(
1752            MockHandler::NAME,
1753            CommitterWatermark {
1754                epoch_hi_inclusive: 0,
1755                checkpoint_hi_inclusive: 10,
1756                tx_hi: 20,
1757                timestamp_ms_hi_inclusive: 10000,
1758            },
1759        )
1760        .await
1761        .unwrap();
1762
1763        // SequentialHandler at checkpoint 5
1764        conn.set_committer_watermark(
1765            SequentialHandler::NAME,
1766            CommitterWatermark {
1767                epoch_hi_inclusive: 0,
1768                checkpoint_hi_inclusive: 5,
1769                tx_hi: 10,
1770                timestamp_ms_hi_inclusive: 5000,
1771            },
1772        )
1773        .await
1774        .unwrap();
1775
1776        // Create synthetic ingestion data
1777        let temp_dir = tempfile::tempdir().unwrap();
1778        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1779            ingestion_dir: temp_dir.path().to_owned(),
1780            starting_checkpoint: 0,
1781            num_checkpoints: 20,
1782            checkpoint_size: 2,
1783        })
1784        .await;
1785
1786        let indexer_args = IndexerArgs {
1787            first_checkpoint: None,
1788            last_checkpoint: Some(19),
1789            pipeline: vec![],
1790            ..Default::default()
1791        };
1792
1793        let client_args = ClientArgs {
1794            ingestion: IngestionClientArgs {
1795                local_ingestion_path: Some(temp_dir.path().to_owned()),
1796                ..Default::default()
1797            },
1798            ..Default::default()
1799        };
1800
1801        let ingestion_config = IngestionConfig::default();
1802
1803        let mut indexer = Indexer::new(
1804            store.clone(),
1805            indexer_args,
1806            client_args,
1807            ingestion_config,
1808            None,
1809            &registry,
1810        )
1811        .await
1812        .unwrap();
1813
1814        // Add first sequential pipeline
1815        indexer
1816            .sequential_pipeline(
1817                MockHandler,
1818                pipeline::sequential::SequentialConfig::default(),
1819            )
1820            .await
1821            .unwrap();
1822
1823        // Verify initial_commit_hi is set correctly (10 + 1 = 11)
1824        assert_eq!(
1825            indexer.initial_commit_hi(),
1826            Some(11),
1827            "initial_commit_hi should be 11"
1828        );
1829
1830        // Add second sequential pipeline
1831        indexer
1832            .sequential_pipeline(
1833                SequentialHandler,
1834                pipeline::sequential::SequentialConfig::default(),
1835            )
1836            .await
1837            .unwrap();
1838
1839        // Should change to 6 (minimum of 6 and 11)
1840        assert_eq!(
1841            indexer.initial_commit_hi(),
1842            Some(6),
1843            "initial_commit_hi should still be 6"
1844        );
1845
1846        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1847        indexer.run().await.unwrap().join().await.unwrap();
1848
1849        // Verify each pipeline made some progress independently
1850        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1851        let watermark2 = conn
1852            .committer_watermark(SequentialHandler::NAME)
1853            .await
1854            .unwrap();
1855
1856        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1857        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1858    }
1859
1860    /// When a tasked pipeline is started with a `first_checkpoint` below the main
1861    /// pipeline's reader_lo, the indexer skips committing checkpoints below that
1862    /// reader watermark.
1863    #[tokio::test]
1864    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1865        let registry = Registry::new();
1866        let store = MockStore::default();
1867
1868        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1869        // reader watermark at `7`.
1870        let mut conn = store.connect().await.unwrap();
1871        conn.set_committer_watermark(
1872            MockCheckpointSequenceNumberHandler::NAME,
1873            CommitterWatermark {
1874                checkpoint_hi_inclusive: 10,
1875                ..Default::default()
1876            },
1877        )
1878        .await
1879        .unwrap();
1880        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1881            .await
1882            .unwrap();
1883
1884        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1885        // be ignored by the tasked indexer.
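        // `TaskArgs::tasked("task", 10)` below runs this indexer under the task name "task" and
        // refreshes the main pipeline's reader watermark every 10ms.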
1886        let indexer_args = IndexerArgs {
1887            first_checkpoint: Some(0),
1888            last_checkpoint: Some(15),
1889            pipeline: vec![],
1890            task: TaskArgs::tasked("task".to_string(), 10),
1891        };
1892        let temp_dir = tempfile::tempdir().unwrap();
1893        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1894            ingestion_dir: temp_dir.path().to_owned(),
1895            starting_checkpoint: 0,
1896            num_checkpoints: 16,
1897            checkpoint_size: 2,
1898        })
1899        .await;
1900
1901        let client_args = ClientArgs {
1902            ingestion: IngestionClientArgs {
1903                local_ingestion_path: Some(temp_dir.path().to_owned()),
1904                ..Default::default()
1905            },
1906            ..Default::default()
1907        };
1908
1909        let ingestion_config = IngestionConfig::default();
1910
1911        let mut tasked_indexer = Indexer::new(
1912            store.clone(),
1913            indexer_args,
1914            client_args,
1915            ingestion_config,
1916            None,
1917            &registry,
1918        )
1919        .await
1920        .unwrap();
1921
1922        let _ = tasked_indexer
1923            .concurrent_pipeline(
1924                MockCheckpointSequenceNumberHandler,
1925                ConcurrentConfig::default(),
1926            )
1927            .await;
1928
1929        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1930        let metrics = tasked_indexer.indexer_metrics().clone();
1931
1932        tasked_indexer.run().await.unwrap().join().await.unwrap();
1933
1934        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1935        assert_eq!(
1936            metrics
1937                .total_collector_skipped_checkpoints
1938                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1939                .unwrap()
1940                .get(),
1941            7
1942        );
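        // Checkpoints 0..=6 were skipped (below the main pipeline's reader_lo of 7), while
        // checkpoints 7..=15 were committed.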
1943        let data = store
1944            .data
1945            .get(MockCheckpointSequenceNumberHandler::NAME)
1946            .unwrap();
1947        assert_eq!(data.len(), 9);
1948        for i in 0..7 {
1949            assert!(data.get(&i).is_none());
1950        }
1951        for i in 7..16 {
1952            assert!(data.get(&i).is_some());
1953        }
1954    }
1955
1956    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1957    #[tokio::test]
1958    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1959        let registry = Registry::new();
1960        let store = MockStore::default();
1961
1962        let mut conn = store.connect().await.unwrap();
1963        conn.set_committer_watermark(
1964            "test",
1965            CommitterWatermark {
1966                checkpoint_hi_inclusive: 10,
1967                ..Default::default()
1968            },
1969        )
1970        .await
1971        .unwrap();
1972        conn.set_reader_watermark("test", 5).await.unwrap();
1973
1974        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1975        // watermarks.
1976        let indexer_args = IndexerArgs {
1977            first_checkpoint: Some(9),
1978            last_checkpoint: Some(25),
1979            pipeline: vec![],
1980            task: TaskArgs::tasked("task".to_string(), 10),
1981        };
1982        let temp_dir = tempfile::tempdir().unwrap();
1983        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1984            ingestion_dir: temp_dir.path().to_owned(),
1985            starting_checkpoint: 9,
1986            num_checkpoints: 17,
1987            checkpoint_size: 2,
1988        })
1989        .await;
1990
1991        let client_args = ClientArgs {
1992            ingestion: IngestionClientArgs {
1993                local_ingestion_path: Some(temp_dir.path().to_owned()),
1994                ..Default::default()
1995            },
1996            ..Default::default()
1997        };
1998
1999        let ingestion_config = IngestionConfig::default();
2000
2001        let mut tasked_indexer = Indexer::new(
2002            store.clone(),
2003            indexer_args,
2004            client_args,
2005            ingestion_config,
2006            None,
2007            &registry,
2008        )
2009        .await
2010        .unwrap();
2011
2012        let _ = tasked_indexer
2013            .concurrent_pipeline(
2014                MockCheckpointSequenceNumberHandler,
2015                ConcurrentConfig::default(),
2016            )
2017            .await;
2018
2019        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2020        let metrics = tasked_indexer.indexer_metrics().clone();
2021
2022        tasked_indexer.run().await.unwrap().join().await.unwrap();
2023
2024        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2025        assert_eq!(
2026            metrics
2027                .total_watermarks_out_of_order
2028                .get_metric_with_label_values(&["test"])
2029                .unwrap()
2030                .get(),
2031            0
2032        );
2033        assert_eq!(
2034            metrics
2035                .total_collector_skipped_checkpoints
2036                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2037                .unwrap()
2038                .get(),
2039            0
2040        );
2041
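        // Every ingested checkpoint (9..=25) is at or above the main pipeline's reader_lo (5),
        // so the tasked pipeline commits all 17 of them.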
2042        let data = store.data.get("test").unwrap();
2043        assert_eq!(data.len(), 17);
2044        for i in 0..9 {
2045            assert!(data.get(&i).is_none());
2046        }
2047        for i in 9..26 {
2048            assert!(data.get(&i).is_some());
2049        }
2050        let main_pipeline_watermark = store.watermark("test").unwrap();
2051        // The main pipeline's watermarks must not have been advanced by the tasked run.
2052        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2053        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2054        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2055        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2056        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2057    }
2058
2059    /// Test that once the collector observes `reader_lo = X`, all checkpoints >= X are
2060    /// committed and any in-flight checkpoints < X are skipped.
2061    #[tokio::test]
2062    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2063        let registry = Registry::new();
2064        let store = MockStore::default();
2065        let mut conn = store.connect().await.unwrap();
2066        // Set the main pipeline watermark.
2067        conn.set_committer_watermark(
2068            ControllableHandler::NAME,
2069            CommitterWatermark {
2070                checkpoint_hi_inclusive: 11,
2071                ..Default::default()
2072            },
2073        )
2074        .await
2075        .unwrap();
2076
2077        // Generate 501 checkpoints (0..=500) upfront, for the indexer to process all at once.
2078        let temp_dir = tempfile::tempdir().unwrap();
2079        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2080            ingestion_dir: temp_dir.path().to_owned(),
2081            starting_checkpoint: 0,
2082            num_checkpoints: 501,
2083            checkpoint_size: 2,
2084        })
2085        .await;
2086        let indexer_args = IndexerArgs {
2087            first_checkpoint: Some(0),
2088            last_checkpoint: Some(500),
2089            pipeline: vec![],
2090            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2091        };
2092        let client_args = ClientArgs {
2093            ingestion: IngestionClientArgs {
2094                local_ingestion_path: Some(temp_dir.path().to_owned()),
2095                ..Default::default()
2096            },
2097            ..Default::default()
2098        };
2099        let ingestion_config = IngestionConfig::default();
2100        let mut tasked_indexer = Indexer::new(
2101            store.clone(),
2102            indexer_args,
2103            client_args,
2104            ingestion_config,
2105            None,
2106            &registry,
2107        )
2108        .await
2109        .unwrap();
2110        let mut allow_process = 10;
2111        // Limit the pipeline to process only checkpoints `[0, 10]`.
2112        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
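        // `process_below` lets the test raise that limit at runtime, releasing further
        // checkpoints to the handler.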
2113        let _ = tasked_indexer
2114            .concurrent_pipeline(
2115                controllable_handler,
2116                ConcurrentConfig {
2117                    committer: CommitterConfig {
2118                        collect_interval_ms: 10,
2119                        watermark_interval_ms: 10,
2120                        ..Default::default()
2121                    },
2122                    ..Default::default()
2123                },
2124            )
2125            .await;
2126        let metrics = tasked_indexer.indexer_metrics().clone();
2127
2128        let mut s_indexer = tasked_indexer.run().await.unwrap();
2129
2130        // Wait for the pipeline to commit up to checkpoint 10 (inclusive), the configured limit.
2131        // With the main pipeline's `reader_lo` currently unset, all checkpoints are allowed and
2132        // should be committed.
2133        store
2134            .wait_for_watermark(
2135                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2136                10,
2137                std::time::Duration::from_secs(10),
2138            )
2139            .await;
2140
2141        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2142        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2143        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2144        // one at a time until the collector_reader_lo metric shows the new value.
2145        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2146            .await
2147            .unwrap();
2148
2149        let reader_lo = metrics
2150            .collector_reader_lo
2151            .with_label_values(&[ControllableHandler::NAME]);
2152
2153        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2154        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2155        // checkpoints have been processed.
2156        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2157        while reader_lo.get() != 250 {
2158            interval.tick().await;
2159            // allow_process started at 10; bump it so the next checkpoint (11, then 12, ...) is released.
2160            allow_process += 1;
2161            assert!(
2162                allow_process <= 500,
2163                "Released all checkpoints but collector never observed new reader_lo"
2164            );
2165            process_below.send(allow_process).ok();
2166        }
2167
2168        // At this point, the collector has observed reader_lo = 250. Release all remaining
2169        // checkpoints. Guarantees:
2170        // - [0, 10]: committed (before reader_lo was set)
2171        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2172        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2173        // - [250, 500]: committed (>= reader_lo)
2174        process_below.send(500).ok();
2175
2176        s_indexer.join().await.unwrap();
2177
2178        let data = store.data.get(ControllableHandler::NAME).unwrap();
2179
2180        // Checkpoints (allow_process, 250) must be skipped.
2181        for chkpt in (allow_process + 1)..250 {
2182            assert!(
2183                data.get(&chkpt).is_none(),
2184                "Checkpoint {chkpt} should have been skipped"
2185            );
2186        }
2187
2188        // Checkpoints >= reader_lo must be committed.
2189        for chkpt in 250..=500 {
2190            assert!(
2191                data.get(&chkpt).is_some(),
2192                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2193            );
2194        }
2195
2196        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2197        for chkpt in 0..=10 {
2198            assert!(
2199                data.get(&chkpt).is_some(),
2200                "Checkpoint {chkpt} should have been committed (baseline)"
2201            );
2202        }
2203    }
2204}