sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod ingestion;
42pub mod metrics;
43pub mod pipeline;
44#[cfg(feature = "postgres")]
45pub mod postgres;
46
47#[cfg(test)]
48pub mod mocks;
49
/// Command-line arguments for the indexer
// Consumed (destructured) by `Indexer::new`.
// NOTE(review): with clap's derive, the `///` field docs below presumably double as `--help`
// text, which is why the explanatory notes here use plain `//` comments.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    // Stored on `Indexer` as `default_next_checkpoint`; leaving it unset is treated as 0.
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    // When unset, `Indexer::run` uses `u64::MAX` as the inclusive upper bound of the ingestion range.
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    #[arg(long, action = clap::ArgAction::Append)]
    // An empty list disables filtering (every added pipeline runs). Any name here that does not
    // match an added pipeline makes `Indexer::run` return an error.
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    // Turned into an optional `Task` by `TaskArgs::into_task` when the indexer is constructed.
    pub task: TaskArgs,
}
76
/// Command-line arguments for configuring a tasked indexer.
// Both fields are private: outside of argument parsing they can only be set together (through
// `TaskArgs::tasked`) or both left unset (through `Default`). `TaskArgs::into_task` yields a
// `Task` only when both are present.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    // The two `requires` attributes reference each other, so CLI parsing only accepts these
    // flags as a pair.
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark. This is required when `--task` is set.
    #[arg(long, requires = "task")]
    // Converted to a `Duration` (milliseconds) by `TaskArgs::into_task`.
    reader_interval_ms: Option<u64>,
}
99
100pub struct Indexer<S: Store> {
101    /// The storage backend that the indexer uses to write and query indexed data. This
102    /// generic implementation allows for plugging in different storage solutions that implement the
103    /// `Store` trait.
104    store: S,
105
106    /// Prometheus Metrics.
107    metrics: Arc<IndexerMetrics>,
108
109    /// Service for downloading and disseminating checkpoint data.
110    ingestion_service: IngestionService,
111
112    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
113    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
114    /// always resume from their committer watermark + 1.
115    ///
116    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
117    /// is the minimum across all pipelines' next checkpoints.
118    default_next_checkpoint: u64,
119
120    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
121    /// this checkpoint.
122    last_checkpoint: Option<u64>,
123
124    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
125    /// delimiter defined on the store. This allows the same pipelines to run under multiple
126    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
127    /// entries in the database.
128    ///
129    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
130    ///
131    /// Sequential pipelines cannot be attached to a tasked indexer.
132    ///
133    /// The framework ensures that tasked pipelines never commit checkpoints below the main
134    /// pipeline’s pruner watermark.
135    task: Option<Task>,
136
137    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
138    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
139    /// a warning when the indexer is run.
140    enabled_pipelines: Option<BTreeSet<String>>,
141
142    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
143    /// with the same name isn't added twice.
144    added_pipelines: BTreeSet<&'static str>,
145
146    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
147    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
148    /// unless overridden by [Self::default_next_checkpoint].
149    first_ingestion_checkpoint: u64,
150
151    /// The minimum next_checkpoint across all sequential pipelines. This is used to initialize
152    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
153    next_sequential_checkpoint: Option<u64>,
154
155    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
156    pipelines: Vec<Service>,
157}
158
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer. It is combined with the delimiter defined on the indexer's
    /// store to record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
169
170impl TaskArgs {
171    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
172        Self {
173            task: Some(task),
174            reader_interval_ms: Some(reader_interval_ms),
175        }
176    }
177
178    fn into_task(self) -> Option<Task> {
179        Some(Task {
180            task: self.task?,
181            reader_interval: Duration::from_millis(self.reader_interval_ms?),
182        })
183    }
184}
185
186impl<S: Store> Indexer<S> {
187    /// Create a new instance of the indexer framework from a store that implements the `Store`
188    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
189    /// arguments configure the following:
190    ///
191    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
192    ///   watermarks) and where to serve metrics from,
193    /// - Where to download checkpoints from,
194    /// - Concurrency and buffering parameters for downloading checkpoints.
195    ///
196    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
197    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
198    pub async fn new(
199        store: S,
200        indexer_args: IndexerArgs,
201        client_args: ClientArgs,
202        ingestion_config: IngestionConfig,
203        metrics_prefix: Option<&str>,
204        registry: &Registry,
205    ) -> Result<Self> {
206        let IndexerArgs {
207            first_checkpoint,
208            last_checkpoint,
209            pipeline,
210            task,
211        } = indexer_args;
212
213        let metrics = IndexerMetrics::new(metrics_prefix, registry);
214
215        let ingestion_service =
216            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
217
218        Ok(Self {
219            store,
220            metrics,
221            ingestion_service,
222            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
223            last_checkpoint,
224            task: task.into_task(),
225            enabled_pipelines: if pipeline.is_empty() {
226                None
227            } else {
228                Some(pipeline.into_iter().collect())
229            },
230            added_pipelines: BTreeSet::new(),
231            first_ingestion_checkpoint: u64::MAX,
232            next_sequential_checkpoint: None,
233            pipelines: vec![],
234        })
235    }
236
237    /// The store used by the indexer.
238    pub fn store(&self) -> &S {
239        &self.store
240    }
241
242    /// The ingestion client used by the indexer to fetch checkpoints.
243    pub fn ingestion_client(&self) -> &IngestionClient {
244        self.ingestion_service.ingestion_client()
245    }
246
247    /// The indexer's metrics.
248    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
249        &self.metrics
250    }
251
252    /// The ingestion service's metrics.
253    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
254        self.ingestion_service.metrics()
255    }
256
257    /// The pipelines that this indexer will run.
258    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
259        self.added_pipelines.iter().copied().filter(|p| {
260            self.enabled_pipelines
261                .as_ref()
262                .is_none_or(|e| e.contains(*p))
263        })
264    }
265
266    /// The minimum next checkpoint across all sequential pipelines. This value is used to
267    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
268    /// too far ahead of sequential pipelines.
269    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
270        self.next_sequential_checkpoint
271    }
272
273    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
274    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
275    ///
276    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
277    /// keep the watermark table up-to-date with the highest point they can guarantee all data
278    /// exists for, for their pipeline.
279    pub async fn concurrent_pipeline<H>(
280        &mut self,
281        handler: H,
282        config: ConcurrentConfig,
283    ) -> Result<()>
284    where
285        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
286    {
287        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
288            return Ok(());
289        };
290
291        self.pipelines.push(concurrent::pipeline::<H>(
292            handler,
293            next_checkpoint,
294            config,
295            self.store.clone(),
296            self.task.clone(),
297            self.ingestion_service.subscribe().0,
298            self.metrics.clone(),
299        ));
300
301        Ok(())
302    }
303
304    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
305    /// will start processing and committing once the ingestion service has caught up to their
306    /// respective watermarks.
307    ///
308    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
309    pub async fn run(self) -> Result<Service> {
310        if let Some(enabled_pipelines) = self.enabled_pipelines {
311            ensure!(
312                enabled_pipelines.is_empty(),
313                "Tried to enable pipelines that this indexer does not know about: \
314                {enabled_pipelines:#?}",
315            );
316        }
317
318        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
319
320        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
321
322        let mut service = self
323            .ingestion_service
324            .run(
325                self.first_ingestion_checkpoint..=last_checkpoint,
326                self.next_sequential_checkpoint,
327            )
328            .await
329            .context("Failed to start ingestion service")?;
330
331        for pipeline in self.pipelines {
332            service = service.merge(pipeline);
333        }
334
335        Ok(service)
336    }
337
338    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
339    /// checkpoint after its watermark, or if that doesn't exist, then the provided
340    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
341    ///
342    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
343    /// calculated above.
344    ///
345    /// Returns `Ok(None)` if the pipeline is disabled.
346    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
347        ensure!(
348            self.added_pipelines.insert(P::NAME),
349            "Pipeline {:?} already added",
350            P::NAME,
351        );
352
353        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
354            && !enabled_pipelines.remove(P::NAME)
355        {
356            info!(pipeline = P::NAME, "Skipping");
357            return Ok(None);
358        }
359
360        let mut conn = self
361            .store
362            .connect()
363            .await
364            .context("Failed to establish connection to store")?;
365
366        let pipeline_task =
367            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
368
369        let checkpoint_hi_inclusive = conn
370            .init_watermark(&pipeline_task, self.default_next_checkpoint)
371            .await
372            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
373
374        let next_checkpoint =
375            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
376
377        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
378
379        Ok(Some(next_checkpoint))
380    }
381}
382
383impl<T: TransactionalStore> Indexer<T> {
384    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
385    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
386    ///
387    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
388    /// required to handle pipelines that modify data in-place (where each update is not an insert,
389    /// but could be a modification of an existing row, where ordering between updates is
390    /// important).
391    ///
392    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
393    /// number of checkpoints (configured by `checkpoint_lag`).
394    pub async fn sequential_pipeline<H>(
395        &mut self,
396        handler: H,
397        config: SequentialConfig,
398    ) -> Result<()>
399    where
400        H: Handler<Store = T> + Send + Sync + 'static,
401    {
402        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
403            return Ok(());
404        };
405
406        if self.task.is_some() {
407            bail!(
408                "Sequential pipelines do not support pipeline tasks. \
409                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
410                Running the same pipeline under a different task would violate these guarantees."
411            );
412        }
413
414        // Track the minimum next_checkpoint across all sequential pipelines
415        self.next_sequential_checkpoint = Some(
416            self.next_sequential_checkpoint
417                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
418        );
419
420        let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
421
422        self.pipelines.push(sequential::pipeline::<H>(
423            handler,
424            next_checkpoint,
425            config,
426            self.store.clone(),
427            checkpoint_rx,
428            watermark_tx,
429            self.metrics.clone(),
430        ));
431
432        Ok(())
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use std::sync::Arc;
439
440    use async_trait::async_trait;
441    use clap::Parser;
442    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
443    use sui_synthetic_ingestion::synthetic_ingestion;
444    use tokio::sync::watch;
445
446    use crate::FieldCount;
447    use crate::ingestion::ingestion_client::IngestionClientArgs;
448    use crate::mocks::store::MockStore;
449    use crate::pipeline::CommitterConfig;
450    use crate::pipeline::Processor;
451    use crate::pipeline::concurrent::ConcurrentConfig;
452    use crate::store::CommitterWatermark;
453
454    use super::*;
455
    /// Value emitted by the test handlers in this module: the sequence number of the checkpoint
    /// it was produced from.
    // NOTE(review): the tuple field is read by the `commit` impls in this module, so this
    // `allow(dead_code)` may no longer be needed. Confirm before removing it.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
459
460    /// A handler that can be controlled externally to block checkpoint processing.
461    struct ControllableHandler {
462        /// Process checkpoints less than or equal to this value.
463        process_below: watch::Receiver<u64>,
464    }
465
466    impl ControllableHandler {
467        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
468            let (tx, rx) = watch::channel(limit);
469            (Self { process_below: rx }, tx)
470        }
471    }
472
473    #[async_trait]
474    impl Processor for ControllableHandler {
475        const NAME: &'static str = "controllable";
476        /// The checkpoints to process come out of order. To account for potential flakiness in test
477        /// `test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo`, we set the FANOUT to
478        /// the total number of checkpoints to process, so we can ensure the correct checkpoints are
479        /// processed.
480        const FANOUT: usize = 501;
481        type Value = MockValue;
482
483        async fn process(
484            &self,
485            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
486        ) -> anyhow::Result<Vec<Self::Value>> {
487            let cp_num = checkpoint.summary.sequence_number;
488
489            // Wait until the checkpoint is allowed to be processed
490            self.process_below
491                .clone()
492                .wait_for(|&limit| cp_num <= limit)
493                .await
494                .ok();
495
496            Ok(vec![MockValue(cp_num)])
497        }
498    }
499
500    #[async_trait]
501    impl crate::pipeline::concurrent::Handler for ControllableHandler {
502        type Store = MockStore;
503        type Batch = Vec<MockValue>;
504
505        fn batch(
506            &self,
507            batch: &mut Self::Batch,
508            values: &mut std::vec::IntoIter<Self::Value>,
509        ) -> crate::pipeline::concurrent::BatchStatus {
510            batch.extend(values);
511            crate::pipeline::concurrent::BatchStatus::Ready
512        }
513
514        async fn commit<'a>(
515            &self,
516            batch: &Self::Batch,
517            conn: &mut <Self::Store as Store>::Connection<'a>,
518        ) -> anyhow::Result<usize> {
519            for value in batch {
520                conn.0
521                    .commit_data(Self::NAME, value.0, vec![value.0])
522                    .await?;
523            }
524            Ok(batch.len())
525        }
526    }
527
    /// Defines a unit struct `$handler` that acts as a pipeline named `$name`. It implements
    /// `Processor`, the concurrent `Handler` and the sequential `Handler`, with both handlers
    /// backed by `MockStore`. Each checkpoint produces a single `MockValue` holding its sequence
    /// number.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    // Returns `Pending` here, whereas `ControllableHandler::batch` returns
                    // `Ready`.
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    // Record every value in the mock store under this pipeline's name.
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    // The sequential variant writes nothing and always reports one row.
                    Ok(1)
                }
            }
        };
    }

    // Module-level pipeline types. Tests that need their own pipeline names define them locally
    // with `test_pipeline!` instead.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
595
596    async fn test_init_watermark(
597        first_checkpoint: Option<u64>,
598        is_concurrent: bool,
599    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
600        let registry = Registry::new();
601        let store = MockStore::default();
602
603        test_pipeline!(A, "pipeline_name");
604
605        let mut conn = store.connect().await.unwrap();
606
607        let indexer_args = IndexerArgs {
608            first_checkpoint,
609            ..IndexerArgs::default()
610        };
611        let temp_dir = tempfile::tempdir().unwrap();
612        let client_args = ClientArgs {
613            ingestion: IngestionClientArgs {
614                local_ingestion_path: Some(temp_dir.path().to_owned()),
615                ..Default::default()
616            },
617            ..Default::default()
618        };
619        let ingestion_config = IngestionConfig::default();
620
621        let mut indexer = Indexer::new(
622            store.clone(),
623            indexer_args,
624            client_args,
625            ingestion_config,
626            None,
627            &registry,
628        )
629        .await
630        .unwrap();
631
632        if is_concurrent {
633            indexer
634                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
635                .await
636                .unwrap();
637        } else {
638            indexer
639                .sequential_pipeline::<A>(A, SequentialConfig::default())
640                .await
641                .unwrap();
642        }
643
644        (
645            conn.committer_watermark(A::NAME).await.unwrap(),
646            conn.pruner_watermark(A::NAME, Duration::ZERO)
647                .await
648                .unwrap(),
649        )
650    }
651
652    #[test]
653    fn test_arg_parsing() {
654        #[derive(Parser)]
655        struct Args {
656            #[clap(flatten)]
657            indexer: IndexerArgs,
658        }
659
660        let args = Args::try_parse_from([
661            "cmd",
662            "--first-checkpoint",
663            "10",
664            "--last-checkpoint",
665            "100",
666            "--pipeline",
667            "a",
668            "--pipeline",
669            "b",
670            "--task",
671            "t",
672            "--reader-interval-ms",
673            "5000",
674        ])
675        .unwrap();
676
677        assert_eq!(args.indexer.first_checkpoint, Some(10));
678        assert_eq!(args.indexer.last_checkpoint, Some(100));
679        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
680        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
681        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
682    }
683
684    /// first_ingestion_checkpoint is smallest among existing watermarks + 1.
685    #[tokio::test]
686    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
687        let registry = Registry::new();
688        let store = MockStore::default();
689
690        test_pipeline!(A, "concurrent_a");
691        test_pipeline!(B, "concurrent_b");
692        test_pipeline!(C, "sequential_c");
693        test_pipeline!(D, "sequential_d");
694
695        let mut conn = store.connect().await.unwrap();
696        conn.set_committer_watermark(
697            A::NAME,
698            CommitterWatermark {
699                checkpoint_hi_inclusive: 100,
700                ..Default::default()
701            },
702        )
703        .await
704        .unwrap();
705        conn.set_committer_watermark(
706            B::NAME,
707            CommitterWatermark {
708                checkpoint_hi_inclusive: 10,
709                ..Default::default()
710            },
711        )
712        .await
713        .unwrap();
714        conn.set_committer_watermark(
715            C::NAME,
716            CommitterWatermark {
717                checkpoint_hi_inclusive: 1,
718                ..Default::default()
719            },
720        )
721        .await
722        .unwrap();
723        conn.set_committer_watermark(
724            D::NAME,
725            CommitterWatermark {
726                checkpoint_hi_inclusive: 50,
727                ..Default::default()
728            },
729        )
730        .await
731        .unwrap();
732
733        let indexer_args = IndexerArgs::default();
734        let temp_dir = tempfile::tempdir().unwrap();
735        let client_args = ClientArgs {
736            ingestion: IngestionClientArgs {
737                local_ingestion_path: Some(temp_dir.path().to_owned()),
738                ..Default::default()
739            },
740            ..Default::default()
741        };
742        let ingestion_config = IngestionConfig::default();
743
744        let mut indexer = Indexer::new(
745            store,
746            indexer_args,
747            client_args,
748            ingestion_config,
749            None,
750            &registry,
751        )
752        .await
753        .unwrap();
754
755        indexer
756            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
757            .await
758            .unwrap();
759        indexer
760            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
761            .await
762            .unwrap();
763        indexer
764            .sequential_pipeline::<C>(C, SequentialConfig::default())
765            .await
766            .unwrap();
767        indexer
768            .sequential_pipeline::<D>(D, SequentialConfig::default())
769            .await
770            .unwrap();
771
772        assert_eq!(indexer.first_ingestion_checkpoint, 2);
773    }
774
775    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
776    #[tokio::test]
777    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
778        let registry = Registry::new();
779        let store = MockStore::default();
780
781        test_pipeline!(A, "concurrent_a");
782        test_pipeline!(B, "concurrent_b");
783        test_pipeline!(C, "sequential_c");
784        test_pipeline!(D, "sequential_d");
785
786        let mut conn = store.connect().await.unwrap();
787        conn.set_committer_watermark(
788            B::NAME,
789            CommitterWatermark {
790                checkpoint_hi_inclusive: 10,
791                ..Default::default()
792            },
793        )
794        .await
795        .unwrap();
796        conn.set_committer_watermark(
797            C::NAME,
798            CommitterWatermark {
799                checkpoint_hi_inclusive: 1,
800                ..Default::default()
801            },
802        )
803        .await
804        .unwrap();
805
806        let indexer_args = IndexerArgs::default();
807        let temp_dir = tempfile::tempdir().unwrap();
808        let client_args = ClientArgs {
809            ingestion: IngestionClientArgs {
810                local_ingestion_path: Some(temp_dir.path().to_owned()),
811                ..Default::default()
812            },
813            ..Default::default()
814        };
815        let ingestion_config = IngestionConfig::default();
816
817        let mut indexer = Indexer::new(
818            store,
819            indexer_args,
820            client_args,
821            ingestion_config,
822            None,
823            &registry,
824        )
825        .await
826        .unwrap();
827
828        indexer
829            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
830            .await
831            .unwrap();
832        indexer
833            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
834            .await
835            .unwrap();
836        indexer
837            .sequential_pipeline::<C>(C, SequentialConfig::default())
838            .await
839            .unwrap();
840        indexer
841            .sequential_pipeline::<D>(D, SequentialConfig::default())
842            .await
843            .unwrap();
844
845        assert_eq!(indexer.first_ingestion_checkpoint, 0);
846    }
847
848    /// first_ingestion_checkpoint is 1 when smallest committer watermark is 0.
849    #[tokio::test]
850    async fn test_first_ingestion_checkpoint_smallest_is_0() {
851        let registry = Registry::new();
852        let store = MockStore::default();
853
854        test_pipeline!(A, "concurrent_a");
855        test_pipeline!(B, "concurrent_b");
856        test_pipeline!(C, "sequential_c");
857        test_pipeline!(D, "sequential_d");
858
859        let mut conn = store.connect().await.unwrap();
860        conn.set_committer_watermark(
861            A::NAME,
862            CommitterWatermark {
863                checkpoint_hi_inclusive: 100,
864                ..Default::default()
865            },
866        )
867        .await
868        .unwrap();
869        conn.set_committer_watermark(
870            B::NAME,
871            CommitterWatermark {
872                checkpoint_hi_inclusive: 10,
873                ..Default::default()
874            },
875        )
876        .await
877        .unwrap();
878        conn.set_committer_watermark(
879            C::NAME,
880            CommitterWatermark {
881                checkpoint_hi_inclusive: 1,
882                ..Default::default()
883            },
884        )
885        .await
886        .unwrap();
887        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
888            .await
889            .unwrap();
890
891        let indexer_args = IndexerArgs::default();
892        let temp_dir = tempfile::tempdir().unwrap();
893        let client_args = ClientArgs {
894            ingestion: IngestionClientArgs {
895                local_ingestion_path: Some(temp_dir.path().to_owned()),
896                ..Default::default()
897            },
898            ..Default::default()
899        };
900        let ingestion_config = IngestionConfig::default();
901
902        let mut indexer = Indexer::new(
903            store,
904            indexer_args,
905            client_args,
906            ingestion_config,
907            None,
908            &registry,
909        )
910        .await
911        .unwrap();
912
913        indexer
914            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
915            .await
916            .unwrap();
917        indexer
918            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
919            .await
920            .unwrap();
921        indexer
922            .sequential_pipeline::<C>(C, SequentialConfig::default())
923            .await
924            .unwrap();
925        indexer
926            .sequential_pipeline::<D>(D, SequentialConfig::default())
927            .await
928            .unwrap();
929
930        assert_eq!(indexer.first_ingestion_checkpoint, 1);
931    }
932
933    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
934    /// watermark, and first_checkpoint is smallest.
935    #[tokio::test]
936    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
937        let registry = Registry::new();
938        let store = MockStore::default();
939
940        test_pipeline!(A, "concurrent_a");
941        test_pipeline!(B, "concurrent_b");
942        test_pipeline!(C, "sequential_c");
943        test_pipeline!(D, "sequential_d");
944
945        let mut conn = store.connect().await.unwrap();
946        conn.set_committer_watermark(
947            B::NAME,
948            CommitterWatermark {
949                checkpoint_hi_inclusive: 50,
950                ..Default::default()
951            },
952        )
953        .await
954        .unwrap();
955        conn.set_committer_watermark(
956            C::NAME,
957            CommitterWatermark {
958                checkpoint_hi_inclusive: 10,
959                ..Default::default()
960            },
961        )
962        .await
963        .unwrap();
964
965        let indexer_args = IndexerArgs {
966            first_checkpoint: Some(5),
967            ..Default::default()
968        };
969        let temp_dir = tempfile::tempdir().unwrap();
970        let client_args = ClientArgs {
971            ingestion: IngestionClientArgs {
972                local_ingestion_path: Some(temp_dir.path().to_owned()),
973                ..Default::default()
974            },
975            ..Default::default()
976        };
977        let ingestion_config = IngestionConfig::default();
978
979        let mut indexer = Indexer::new(
980            store,
981            indexer_args,
982            client_args,
983            ingestion_config,
984            None,
985            &registry,
986        )
987        .await
988        .unwrap();
989
990        indexer
991            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
992            .await
993            .unwrap();
994        indexer
995            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
996            .await
997            .unwrap();
998        indexer
999            .sequential_pipeline::<C>(C, SequentialConfig::default())
1000            .await
1001            .unwrap();
1002        indexer
1003            .sequential_pipeline::<D>(D, SequentialConfig::default())
1004            .await
1005            .unwrap();
1006
1007        assert_eq!(indexer.first_ingestion_checkpoint, 5);
1008    }
1009
1010    /// first_ingestion_checkpoint is smallest among existing watermarks + 1 if
1011    /// first_checkpoint but all pipelines have watermarks (ignores first_checkpoint).
1012    #[tokio::test]
1013    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1014        let registry = Registry::new();
1015        let store = MockStore::default();
1016
1017        test_pipeline!(B, "concurrent_b");
1018        test_pipeline!(C, "sequential_c");
1019
1020        let mut conn = store.connect().await.unwrap();
1021        conn.set_committer_watermark(
1022            B::NAME,
1023            CommitterWatermark {
1024                checkpoint_hi_inclusive: 50,
1025                ..Default::default()
1026            },
1027        )
1028        .await
1029        .unwrap();
1030        conn.set_committer_watermark(
1031            C::NAME,
1032            CommitterWatermark {
1033                checkpoint_hi_inclusive: 10,
1034                ..Default::default()
1035            },
1036        )
1037        .await
1038        .unwrap();
1039
1040        let indexer_args = IndexerArgs {
1041            first_checkpoint: Some(5),
1042            ..Default::default()
1043        };
1044        let temp_dir = tempfile::tempdir().unwrap();
1045        let client_args = ClientArgs {
1046            ingestion: IngestionClientArgs {
1047                local_ingestion_path: Some(temp_dir.path().to_owned()),
1048                ..Default::default()
1049            },
1050            ..Default::default()
1051        };
1052        let ingestion_config = IngestionConfig::default();
1053
1054        let mut indexer = Indexer::new(
1055            store,
1056            indexer_args,
1057            client_args,
1058            ingestion_config,
1059            None,
1060            &registry,
1061        )
1062        .await
1063        .unwrap();
1064
1065        indexer
1066            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1067            .await
1068            .unwrap();
1069        indexer
1070            .sequential_pipeline::<C>(C, SequentialConfig::default())
1071            .await
1072            .unwrap();
1073
1074        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1075    }
1076
1077    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1078    /// will not be used as the starting point if it is not the smallest valid committer watermark
1079    /// to resume ingesting from.
1080    #[tokio::test]
1081    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1082        let registry = Registry::new();
1083        let store = MockStore::default();
1084
1085        test_pipeline!(A, "concurrent_a");
1086        test_pipeline!(B, "concurrent_b");
1087        test_pipeline!(C, "sequential_c");
1088
1089        let mut conn = store.connect().await.unwrap();
1090        conn.set_committer_watermark(
1091            B::NAME,
1092            CommitterWatermark {
1093                checkpoint_hi_inclusive: 50,
1094                ..Default::default()
1095            },
1096        )
1097        .await
1098        .unwrap();
1099        conn.set_committer_watermark(
1100            C::NAME,
1101            CommitterWatermark {
1102                checkpoint_hi_inclusive: 10,
1103                ..Default::default()
1104            },
1105        )
1106        .await
1107        .unwrap();
1108
1109        let indexer_args = IndexerArgs {
1110            first_checkpoint: Some(24),
1111            ..Default::default()
1112        };
1113        let temp_dir = tempfile::tempdir().unwrap();
1114        let client_args = ClientArgs {
1115            ingestion: IngestionClientArgs {
1116                local_ingestion_path: Some(temp_dir.path().to_owned()),
1117                ..Default::default()
1118            },
1119            ..Default::default()
1120        };
1121        let ingestion_config = IngestionConfig::default();
1122
1123        let mut indexer = Indexer::new(
1124            store,
1125            indexer_args,
1126            client_args,
1127            ingestion_config,
1128            None,
1129            &registry,
1130        )
1131        .await
1132        .unwrap();
1133
1134        indexer
1135            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1136            .await
1137            .unwrap();
1138
1139        indexer
1140            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1141            .await
1142            .unwrap();
1143
1144        indexer
1145            .sequential_pipeline::<C>(C, SequentialConfig::default())
1146            .await
1147            .unwrap();
1148
1149        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1150    }
1151
1152    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1153    #[tokio::test]
1154    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1155        let registry = Registry::new();
1156        let store = MockStore::default();
1157
1158        test_pipeline!(A, "concurrent_a");
1159        test_pipeline!(B, "concurrent_b");
1160        test_pipeline!(C, "sequential_c");
1161        test_pipeline!(D, "sequential_d");
1162
1163        let mut conn = store.connect().await.unwrap();
1164        conn.set_committer_watermark(
1165            A::NAME,
1166            CommitterWatermark {
1167                checkpoint_hi_inclusive: 5,
1168                ..Default::default()
1169            },
1170        )
1171        .await
1172        .unwrap();
1173        conn.set_committer_watermark(
1174            B::NAME,
1175            CommitterWatermark {
1176                checkpoint_hi_inclusive: 10,
1177                ..Default::default()
1178            },
1179        )
1180        .await
1181        .unwrap();
1182        conn.set_committer_watermark(
1183            C::NAME,
1184            CommitterWatermark {
1185                checkpoint_hi_inclusive: 15,
1186                ..Default::default()
1187            },
1188        )
1189        .await
1190        .unwrap();
1191        conn.set_committer_watermark(
1192            D::NAME,
1193            CommitterWatermark {
1194                checkpoint_hi_inclusive: 20,
1195                ..Default::default()
1196            },
1197        )
1198        .await
1199        .unwrap();
1200
1201        // Create synthetic ingestion data
1202        let temp_dir = tempfile::tempdir().unwrap();
1203        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1204            ingestion_dir: temp_dir.path().to_owned(),
1205            starting_checkpoint: 5,
1206            num_checkpoints: 25,
1207            checkpoint_size: 1,
1208        })
1209        .await;
1210
1211        let indexer_args = IndexerArgs {
1212            last_checkpoint: Some(29),
1213            ..Default::default()
1214        };
1215
1216        let client_args = ClientArgs {
1217            ingestion: IngestionClientArgs {
1218                local_ingestion_path: Some(temp_dir.path().to_owned()),
1219                ..Default::default()
1220            },
1221            ..Default::default()
1222        };
1223
1224        let ingestion_config = IngestionConfig::default();
1225
1226        let mut indexer = Indexer::new(
1227            store.clone(),
1228            indexer_args,
1229            client_args,
1230            ingestion_config,
1231            None,
1232            &registry,
1233        )
1234        .await
1235        .unwrap();
1236
1237        indexer
1238            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1239            .await
1240            .unwrap();
1241        indexer
1242            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1243            .await
1244            .unwrap();
1245        indexer
1246            .sequential_pipeline::<C>(C, SequentialConfig::default())
1247            .await
1248            .unwrap();
1249        indexer
1250            .sequential_pipeline::<D>(D, SequentialConfig::default())
1251            .await
1252            .unwrap();
1253
1254        let ingestion_metrics = indexer.ingestion_metrics().clone();
1255        let indexer_metrics = indexer.indexer_metrics().clone();
1256
1257        indexer.run().await.unwrap().join().await.unwrap();
1258
1259        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1260        assert_eq!(
1261            indexer_metrics
1262                .total_watermarks_out_of_order
1263                .get_metric_with_label_values(&[A::NAME])
1264                .unwrap()
1265                .get(),
1266            0
1267        );
1268        assert_eq!(
1269            indexer_metrics
1270                .total_watermarks_out_of_order
1271                .get_metric_with_label_values(&[B::NAME])
1272                .unwrap()
1273                .get(),
1274            5
1275        );
1276        assert_eq!(
1277            indexer_metrics
1278                .total_watermarks_out_of_order
1279                .get_metric_with_label_values(&[C::NAME])
1280                .unwrap()
1281                .get(),
1282            10
1283        );
1284        assert_eq!(
1285            indexer_metrics
1286                .total_watermarks_out_of_order
1287                .get_metric_with_label_values(&[D::NAME])
1288                .unwrap()
1289                .get(),
1290            15
1291        );
1292    }
1293
1294    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1295    #[tokio::test]
1296    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1297        let registry = Registry::new();
1298        let store = MockStore::default();
1299
1300        test_pipeline!(A, "concurrent_a");
1301        test_pipeline!(B, "concurrent_b");
1302        test_pipeline!(C, "sequential_c");
1303        test_pipeline!(D, "sequential_d");
1304
1305        let mut conn = store.connect().await.unwrap();
1306        conn.set_committer_watermark(
1307            A::NAME,
1308            CommitterWatermark {
1309                checkpoint_hi_inclusive: 5,
1310                ..Default::default()
1311            },
1312        )
1313        .await
1314        .unwrap();
1315        conn.set_committer_watermark(
1316            B::NAME,
1317            CommitterWatermark {
1318                checkpoint_hi_inclusive: 10,
1319                ..Default::default()
1320            },
1321        )
1322        .await
1323        .unwrap();
1324        conn.set_committer_watermark(
1325            C::NAME,
1326            CommitterWatermark {
1327                checkpoint_hi_inclusive: 15,
1328                ..Default::default()
1329            },
1330        )
1331        .await
1332        .unwrap();
1333        conn.set_committer_watermark(
1334            D::NAME,
1335            CommitterWatermark {
1336                checkpoint_hi_inclusive: 20,
1337                ..Default::default()
1338            },
1339        )
1340        .await
1341        .unwrap();
1342
1343        // Create synthetic ingestion data
1344        let temp_dir = tempfile::tempdir().unwrap();
1345        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1346            ingestion_dir: temp_dir.path().to_owned(),
1347            starting_checkpoint: 5,
1348            num_checkpoints: 25,
1349            checkpoint_size: 1,
1350        })
1351        .await;
1352
1353        let indexer_args = IndexerArgs {
1354            first_checkpoint: Some(3),
1355            last_checkpoint: Some(29),
1356            ..Default::default()
1357        };
1358
1359        let client_args = ClientArgs {
1360            ingestion: IngestionClientArgs {
1361                local_ingestion_path: Some(temp_dir.path().to_owned()),
1362                ..Default::default()
1363            },
1364            ..Default::default()
1365        };
1366
1367        let ingestion_config = IngestionConfig::default();
1368
1369        let mut indexer = Indexer::new(
1370            store.clone(),
1371            indexer_args,
1372            client_args,
1373            ingestion_config,
1374            None,
1375            &registry,
1376        )
1377        .await
1378        .unwrap();
1379
1380        indexer
1381            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1382            .await
1383            .unwrap();
1384        indexer
1385            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1386            .await
1387            .unwrap();
1388        indexer
1389            .sequential_pipeline::<C>(C, SequentialConfig::default())
1390            .await
1391            .unwrap();
1392        indexer
1393            .sequential_pipeline::<D>(D, SequentialConfig::default())
1394            .await
1395            .unwrap();
1396
1397        let ingestion_metrics = indexer.ingestion_metrics().clone();
1398        let metrics = indexer.indexer_metrics().clone();
1399        indexer.run().await.unwrap().join().await.unwrap();
1400
1401        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1402        assert_eq!(
1403            metrics
1404                .total_watermarks_out_of_order
1405                .get_metric_with_label_values(&[A::NAME])
1406                .unwrap()
1407                .get(),
1408            0
1409        );
1410        assert_eq!(
1411            metrics
1412                .total_watermarks_out_of_order
1413                .get_metric_with_label_values(&[B::NAME])
1414                .unwrap()
1415                .get(),
1416            5
1417        );
1418        assert_eq!(
1419            metrics
1420                .total_watermarks_out_of_order
1421                .get_metric_with_label_values(&[C::NAME])
1422                .unwrap()
1423                .get(),
1424            10
1425        );
1426        assert_eq!(
1427            metrics
1428                .total_watermarks_out_of_order
1429                .get_metric_with_label_values(&[D::NAME])
1430                .unwrap()
1431                .get(),
1432            15
1433        );
1434    }
1435
1436    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1437    #[tokio::test]
1438    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1439        let registry = Registry::new();
1440        let store = MockStore::default();
1441
1442        test_pipeline!(A, "concurrent_a");
1443        test_pipeline!(B, "concurrent_b");
1444        test_pipeline!(C, "sequential_c");
1445        test_pipeline!(D, "sequential_d");
1446
1447        let mut conn = store.connect().await.unwrap();
1448        conn.set_committer_watermark(
1449            B::NAME,
1450            CommitterWatermark {
1451                checkpoint_hi_inclusive: 10,
1452                ..Default::default()
1453            },
1454        )
1455        .await
1456        .unwrap();
1457        conn.set_committer_watermark(
1458            C::NAME,
1459            CommitterWatermark {
1460                checkpoint_hi_inclusive: 15,
1461                ..Default::default()
1462            },
1463        )
1464        .await
1465        .unwrap();
1466        conn.set_committer_watermark(
1467            D::NAME,
1468            CommitterWatermark {
1469                checkpoint_hi_inclusive: 20,
1470                ..Default::default()
1471            },
1472        )
1473        .await
1474        .unwrap();
1475
1476        // Create synthetic ingestion data
1477        let temp_dir = tempfile::tempdir().unwrap();
1478        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1479            ingestion_dir: temp_dir.path().to_owned(),
1480            starting_checkpoint: 0,
1481            num_checkpoints: 30,
1482            checkpoint_size: 1,
1483        })
1484        .await;
1485
1486        let indexer_args = IndexerArgs {
1487            last_checkpoint: Some(29),
1488            ..Default::default()
1489        };
1490
1491        let client_args = ClientArgs {
1492            ingestion: IngestionClientArgs {
1493                local_ingestion_path: Some(temp_dir.path().to_owned()),
1494                ..Default::default()
1495            },
1496            ..Default::default()
1497        };
1498
1499        let ingestion_config = IngestionConfig::default();
1500
1501        let mut indexer = Indexer::new(
1502            store.clone(),
1503            indexer_args,
1504            client_args,
1505            ingestion_config,
1506            None,
1507            &registry,
1508        )
1509        .await
1510        .unwrap();
1511
1512        indexer
1513            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1514            .await
1515            .unwrap();
1516        indexer
1517            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1518            .await
1519            .unwrap();
1520        indexer
1521            .sequential_pipeline::<C>(C, SequentialConfig::default())
1522            .await
1523            .unwrap();
1524        indexer
1525            .sequential_pipeline::<D>(D, SequentialConfig::default())
1526            .await
1527            .unwrap();
1528
1529        let ingestion_metrics = indexer.ingestion_metrics().clone();
1530        let metrics = indexer.indexer_metrics().clone();
1531        indexer.run().await.unwrap().join().await.unwrap();
1532
1533        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1534        assert_eq!(
1535            metrics
1536                .total_watermarks_out_of_order
1537                .get_metric_with_label_values(&[A::NAME])
1538                .unwrap()
1539                .get(),
1540            0
1541        );
1542        assert_eq!(
1543            metrics
1544                .total_watermarks_out_of_order
1545                .get_metric_with_label_values(&[B::NAME])
1546                .unwrap()
1547                .get(),
1548            11
1549        );
1550        assert_eq!(
1551            metrics
1552                .total_watermarks_out_of_order
1553                .get_metric_with_label_values(&[C::NAME])
1554                .unwrap()
1555                .get(),
1556            16
1557        );
1558        assert_eq!(
1559            metrics
1560                .total_watermarks_out_of_order
1561                .get_metric_with_label_values(&[D::NAME])
1562                .unwrap()
1563                .get(),
1564            21
1565        );
1566    }
1567
1568    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1569    #[tokio::test]
1570    async fn test_indexer_ingestion_use_first_checkpoint() {
1571        let registry = Registry::new();
1572        let store = MockStore::default();
1573
1574        test_pipeline!(A, "concurrent_a");
1575        test_pipeline!(B, "concurrent_b");
1576        test_pipeline!(C, "sequential_c");
1577        test_pipeline!(D, "sequential_d");
1578
1579        let mut conn = store.connect().await.unwrap();
1580        conn.set_committer_watermark(
1581            B::NAME,
1582            CommitterWatermark {
1583                checkpoint_hi_inclusive: 10,
1584                ..Default::default()
1585            },
1586        )
1587        .await
1588        .unwrap();
1589        conn.set_committer_watermark(
1590            C::NAME,
1591            CommitterWatermark {
1592                checkpoint_hi_inclusive: 15,
1593                ..Default::default()
1594            },
1595        )
1596        .await
1597        .unwrap();
1598        conn.set_committer_watermark(
1599            D::NAME,
1600            CommitterWatermark {
1601                checkpoint_hi_inclusive: 20,
1602                ..Default::default()
1603            },
1604        )
1605        .await
1606        .unwrap();
1607
1608        // Create synthetic ingestion data
1609        let temp_dir = tempfile::tempdir().unwrap();
1610        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1611            ingestion_dir: temp_dir.path().to_owned(),
1612            starting_checkpoint: 5,
1613            num_checkpoints: 25,
1614            checkpoint_size: 1,
1615        })
1616        .await;
1617
1618        let indexer_args = IndexerArgs {
1619            first_checkpoint: Some(10),
1620            last_checkpoint: Some(29),
1621            ..Default::default()
1622        };
1623
1624        let client_args = ClientArgs {
1625            ingestion: IngestionClientArgs {
1626                local_ingestion_path: Some(temp_dir.path().to_owned()),
1627                ..Default::default()
1628            },
1629            ..Default::default()
1630        };
1631
1632        let ingestion_config = IngestionConfig::default();
1633
1634        let mut indexer = Indexer::new(
1635            store.clone(),
1636            indexer_args,
1637            client_args,
1638            ingestion_config,
1639            None,
1640            &registry,
1641        )
1642        .await
1643        .unwrap();
1644
1645        indexer
1646            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1647            .await
1648            .unwrap();
1649        indexer
1650            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1651            .await
1652            .unwrap();
1653        indexer
1654            .sequential_pipeline::<C>(C, SequentialConfig::default())
1655            .await
1656            .unwrap();
1657        indexer
1658            .sequential_pipeline::<D>(D, SequentialConfig::default())
1659            .await
1660            .unwrap();
1661
1662        let ingestion_metrics = indexer.ingestion_metrics().clone();
1663        let metrics = indexer.indexer_metrics().clone();
1664        indexer.run().await.unwrap().join().await.unwrap();
1665
1666        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1667        assert_eq!(
1668            metrics
1669                .total_watermarks_out_of_order
1670                .get_metric_with_label_values(&[A::NAME])
1671                .unwrap()
1672                .get(),
1673            0
1674        );
1675        assert_eq!(
1676            metrics
1677                .total_watermarks_out_of_order
1678                .get_metric_with_label_values(&[B::NAME])
1679                .unwrap()
1680                .get(),
1681            1
1682        );
1683        assert_eq!(
1684            metrics
1685                .total_watermarks_out_of_order
1686                .get_metric_with_label_values(&[C::NAME])
1687                .unwrap()
1688                .get(),
1689            6
1690        );
1691        assert_eq!(
1692            metrics
1693                .total_watermarks_out_of_order
1694                .get_metric_with_label_values(&[D::NAME])
1695                .unwrap()
1696                .get(),
1697            11
1698        );
1699    }
1700
1701    #[tokio::test]
1702    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1703        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1704        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1705        assert_eq!(committer_watermark, None);
1706        assert_eq!(pruner_watermark, None);
1707    }
1708
1709    #[tokio::test]
1710    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1711        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1712        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1713        assert_eq!(committer_watermark, None);
1714        assert_eq!(pruner_watermark, None);
1715    }
1716
1717    #[tokio::test]
1718    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1719        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1720
1721        let committer_watermark = committer_watermark.unwrap();
1722        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1723
1724        let pruner_watermark = pruner_watermark.unwrap();
1725        assert_eq!(pruner_watermark.reader_lo, 1);
1726        assert_eq!(pruner_watermark.pruner_hi, 1);
1727    }
1728
1729    #[tokio::test]
1730    async fn test_init_watermark_sequential() {
1731        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1732
1733        let committer_watermark = committer_watermark.unwrap();
1734        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1735
1736        let pruner_watermark = pruner_watermark.unwrap();
1737        assert_eq!(pruner_watermark.reader_lo, 1);
1738        assert_eq!(pruner_watermark.pruner_hi, 1);
1739    }
1740
1741    #[tokio::test]
1742    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1743        let registry = Registry::new();
1744        let store = MockStore::default();
1745
1746        // Set up different watermarks for three different sequential pipelines
1747        let mut conn = store.connect().await.unwrap();
1748
1749        // First handler at checkpoint 10
1750        conn.set_committer_watermark(
1751            MockHandler::NAME,
1752            CommitterWatermark {
1753                epoch_hi_inclusive: 0,
1754                checkpoint_hi_inclusive: 10,
1755                tx_hi: 20,
1756                timestamp_ms_hi_inclusive: 10000,
1757            },
1758        )
1759        .await
1760        .unwrap();
1761
1762        // SequentialHandler at checkpoint 5
1763        conn.set_committer_watermark(
1764            SequentialHandler::NAME,
1765            CommitterWatermark {
1766                epoch_hi_inclusive: 0,
1767                checkpoint_hi_inclusive: 5,
1768                tx_hi: 10,
1769                timestamp_ms_hi_inclusive: 5000,
1770            },
1771        )
1772        .await
1773        .unwrap();
1774
1775        // Create synthetic ingestion data
1776        let temp_dir = tempfile::tempdir().unwrap();
1777        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1778            ingestion_dir: temp_dir.path().to_owned(),
1779            starting_checkpoint: 0,
1780            num_checkpoints: 20,
1781            checkpoint_size: 2,
1782        })
1783        .await;
1784
1785        let indexer_args = IndexerArgs {
1786            first_checkpoint: None,
1787            last_checkpoint: Some(19),
1788            pipeline: vec![],
1789            ..Default::default()
1790        };
1791
1792        let client_args = ClientArgs {
1793            ingestion: IngestionClientArgs {
1794                local_ingestion_path: Some(temp_dir.path().to_owned()),
1795                ..Default::default()
1796            },
1797            ..Default::default()
1798        };
1799
1800        let ingestion_config = IngestionConfig::default();
1801
1802        let mut indexer = Indexer::new(
1803            store.clone(),
1804            indexer_args,
1805            client_args,
1806            ingestion_config,
1807            None,
1808            &registry,
1809        )
1810        .await
1811        .unwrap();
1812
1813        // Add first sequential pipeline
1814        indexer
1815            .sequential_pipeline(
1816                MockHandler,
1817                pipeline::sequential::SequentialConfig::default(),
1818            )
1819            .await
1820            .unwrap();
1821
1822        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1823        assert_eq!(
1824            indexer.next_sequential_checkpoint(),
1825            Some(11),
1826            "next_sequential_checkpoint should be 11"
1827        );
1828
1829        // Add second sequential pipeline
1830        indexer
1831            .sequential_pipeline(
1832                SequentialHandler,
1833                pipeline::sequential::SequentialConfig::default(),
1834            )
1835            .await
1836            .unwrap();
1837
1838        // Should change to 6 (minimum of 6 and 11)
1839        assert_eq!(
1840            indexer.next_sequential_checkpoint(),
1841            Some(6),
1842            "next_sequential_checkpoint should still be 6"
1843        );
1844
1845        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1846        indexer.run().await.unwrap().join().await.unwrap();
1847
1848        // Verify each pipeline made some progress independently
1849        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1850        let watermark2 = conn
1851            .committer_watermark(SequentialHandler::NAME)
1852            .await
1853            .unwrap();
1854
1855        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1856        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1857    }
1858
1859    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1860    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1861    /// committing checkpoints less than the main pipeline's reader watermark.
1862    #[tokio::test]
1863    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1864        let registry = Registry::new();
1865        let store = MockStore::default();
1866
1867        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1868        // reader watermark at `7`.
1869        let mut conn = store.connect().await.unwrap();
1870        conn.set_committer_watermark(
1871            MockCheckpointSequenceNumberHandler::NAME,
1872            CommitterWatermark {
1873                checkpoint_hi_inclusive: 10,
1874                ..Default::default()
1875            },
1876        )
1877        .await
1878        .unwrap();
1879        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1880            .await
1881            .unwrap();
1882
1883        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1884        // be ignored by the tasked indexer.
1885        let indexer_args = IndexerArgs {
1886            first_checkpoint: Some(0),
1887            last_checkpoint: Some(15),
1888            pipeline: vec![],
1889            task: TaskArgs::tasked("task".to_string(), 10),
1890        };
1891        let temp_dir = tempfile::tempdir().unwrap();
1892        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1893            ingestion_dir: temp_dir.path().to_owned(),
1894            starting_checkpoint: 0,
1895            num_checkpoints: 16,
1896            checkpoint_size: 2,
1897        })
1898        .await;
1899
1900        let client_args = ClientArgs {
1901            ingestion: IngestionClientArgs {
1902                local_ingestion_path: Some(temp_dir.path().to_owned()),
1903                ..Default::default()
1904            },
1905            ..Default::default()
1906        };
1907
1908        let ingestion_config = IngestionConfig::default();
1909
1910        let mut tasked_indexer = Indexer::new(
1911            store.clone(),
1912            indexer_args,
1913            client_args,
1914            ingestion_config,
1915            None,
1916            &registry,
1917        )
1918        .await
1919        .unwrap();
1920
1921        let _ = tasked_indexer
1922            .concurrent_pipeline(
1923                MockCheckpointSequenceNumberHandler,
1924                ConcurrentConfig::default(),
1925            )
1926            .await;
1927
1928        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1929        let metrics = tasked_indexer.indexer_metrics().clone();
1930
1931        tasked_indexer.run().await.unwrap().join().await.unwrap();
1932
1933        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1934        assert_eq!(
1935            metrics
1936                .total_collector_skipped_checkpoints
1937                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1938                .unwrap()
1939                .get(),
1940            7
1941        );
1942        let data = store
1943            .data
1944            .get(MockCheckpointSequenceNumberHandler::NAME)
1945            .unwrap();
1946        assert_eq!(data.len(), 9);
1947        for i in 0..7 {
1948            assert!(data.get(&i).is_none());
1949        }
1950        for i in 7..16 {
1951            assert!(data.get(&i).is_some());
1952        }
1953    }
1954
1955    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1956    #[tokio::test]
1957    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1958        let registry = Registry::new();
1959        let store = MockStore::default();
1960
1961        let mut conn = store.connect().await.unwrap();
1962        conn.set_committer_watermark(
1963            "test",
1964            CommitterWatermark {
1965                checkpoint_hi_inclusive: 10,
1966                ..Default::default()
1967            },
1968        )
1969        .await
1970        .unwrap();
1971        conn.set_reader_watermark("test", 5).await.unwrap();
1972
1973        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1974        // watermarks.
1975        let indexer_args = IndexerArgs {
1976            first_checkpoint: Some(9),
1977            last_checkpoint: Some(25),
1978            pipeline: vec![],
1979            task: TaskArgs::tasked("task".to_string(), 10),
1980        };
1981        let temp_dir = tempfile::tempdir().unwrap();
1982        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1983            ingestion_dir: temp_dir.path().to_owned(),
1984            starting_checkpoint: 9,
1985            num_checkpoints: 17,
1986            checkpoint_size: 2,
1987        })
1988        .await;
1989
1990        let client_args = ClientArgs {
1991            ingestion: IngestionClientArgs {
1992                local_ingestion_path: Some(temp_dir.path().to_owned()),
1993                ..Default::default()
1994            },
1995            ..Default::default()
1996        };
1997
1998        let ingestion_config = IngestionConfig::default();
1999
2000        let mut tasked_indexer = Indexer::new(
2001            store.clone(),
2002            indexer_args,
2003            client_args,
2004            ingestion_config,
2005            None,
2006            &registry,
2007        )
2008        .await
2009        .unwrap();
2010
2011        let _ = tasked_indexer
2012            .concurrent_pipeline(
2013                MockCheckpointSequenceNumberHandler,
2014                ConcurrentConfig::default(),
2015            )
2016            .await;
2017
2018        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2019        let metrics = tasked_indexer.indexer_metrics().clone();
2020
2021        tasked_indexer.run().await.unwrap().join().await.unwrap();
2022
2023        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2024        assert_eq!(
2025            metrics
2026                .total_watermarks_out_of_order
2027                .get_metric_with_label_values(&["test"])
2028                .unwrap()
2029                .get(),
2030            0
2031        );
2032        assert_eq!(
2033            metrics
2034                .total_collector_skipped_checkpoints
2035                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2036                .unwrap()
2037                .get(),
2038            0
2039        );
2040
2041        let data = store.data.get("test").unwrap();
2042        assert!(data.len() == 17);
2043        for i in 0..9 {
2044            assert!(data.get(&i).is_none());
2045        }
2046        for i in 9..26 {
2047            assert!(data.get(&i).is_some());
2048        }
2049        let main_pipeline_watermark = store.watermark("test").unwrap();
2050        // assert that the main pipeline's watermarks are not updated
2051        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2052        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2053        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2054        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2055        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2056    }
2057
2058    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
2059    /// committed, and any checkpoints inflight < X will be skipped.
2060    #[tokio::test]
2061    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2062        let registry = Registry::new();
2063        let store = MockStore::default();
2064        let mut conn = store.connect().await.unwrap();
2065        // Set the main pipeline watermark.
2066        conn.set_committer_watermark(
2067            ControllableHandler::NAME,
2068            CommitterWatermark {
2069                checkpoint_hi_inclusive: 11,
2070                ..Default::default()
2071            },
2072        )
2073        .await
2074        .unwrap();
2075
2076        // Generate 500 checkpoints upfront, for the indexer to process all at once.
2077        let temp_dir = tempfile::tempdir().unwrap();
2078        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2079            ingestion_dir: temp_dir.path().to_owned(),
2080            starting_checkpoint: 0,
2081            num_checkpoints: 501,
2082            checkpoint_size: 2,
2083        })
2084        .await;
2085        let indexer_args = IndexerArgs {
2086            first_checkpoint: Some(0),
2087            last_checkpoint: Some(500),
2088            pipeline: vec![],
2089            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2090        };
2091        let client_args = ClientArgs {
2092            ingestion: IngestionClientArgs {
2093                local_ingestion_path: Some(temp_dir.path().to_owned()),
2094                ..Default::default()
2095            },
2096            ..Default::default()
2097        };
2098        let ingestion_config = IngestionConfig::default();
2099        let mut tasked_indexer = Indexer::new(
2100            store.clone(),
2101            indexer_args,
2102            client_args,
2103            ingestion_config,
2104            None,
2105            &registry,
2106        )
2107        .await
2108        .unwrap();
2109        let mut allow_process = 10;
2110        // Limit the pipeline to process only checkpoints `[0, 10]`.
2111        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2112        let _ = tasked_indexer
2113            .concurrent_pipeline(
2114                controllable_handler,
2115                ConcurrentConfig {
2116                    committer: CommitterConfig {
2117                        collect_interval_ms: 10,
2118                        watermark_interval_ms: 10,
2119                        ..Default::default()
2120                    },
2121                    ..Default::default()
2122                },
2123            )
2124            .await;
2125        let metrics = tasked_indexer.indexer_metrics().clone();
2126
2127        let mut s_indexer = tasked_indexer.run().await.unwrap();
2128
2129        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
2130        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
2131        // committed.
2132        store
2133            .wait_for_watermark(
2134                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2135                10,
2136                std::time::Duration::from_secs(10),
2137            )
2138            .await;
2139
2140        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2141        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2142        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2143        // one at a time until the collector_reader_lo metric shows the new value.
2144        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2145            .await
2146            .unwrap();
2147
2148        let reader_lo = metrics
2149            .collector_reader_lo
2150            .with_label_values(&[ControllableHandler::NAME]);
2151
2152        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2153        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2154        // checkpoints have been processed.
2155        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2156        while reader_lo.get() != 250 {
2157            interval.tick().await;
2158            // allow_process is initialized to 11, bump to 11 for the next checkpoint
2159            allow_process += 1;
2160            assert!(
2161                allow_process <= 500,
2162                "Released all checkpoints but collector never observed new reader_lo"
2163            );
2164            process_below.send(allow_process).ok();
2165        }
2166
2167        // At this point, the collector has observed reader_lo = 250. Release all remaining
2168        // checkpoints. Guarantees:
2169        // - [0, 10]: committed (before reader_lo was set)
2170        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2171        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2172        // - [250, 500]: committed (>= reader_lo)
2173        process_below.send(500).ok();
2174
2175        s_indexer.join().await.unwrap();
2176
2177        let data = store.data.get(ControllableHandler::NAME).unwrap();
2178
2179        // Checkpoints (allow_process, 250) must be skipped.
2180        for chkpt in (allow_process + 1)..250 {
2181            assert!(
2182                data.get(&chkpt).is_none(),
2183                "Checkpoint {chkpt} should have been skipped"
2184            );
2185        }
2186
2187        // Checkpoints >= reader_lo must be committed.
2188        for chkpt in 250..=500 {
2189            assert!(
2190                data.get(&chkpt).is_some(),
2191                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2192            );
2193        }
2194
2195        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2196        for chkpt in 0..=10 {
2197            assert!(
2198                data.get(&chkpt).is_some(),
2199                "Checkpoint {chkpt} should have been committed (baseline)"
2200            );
2201        }
2202    }
2203}