sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::InitWatermark;
19use sui_indexer_alt_framework_store_traits::Store;
20use sui_indexer_alt_framework_store_traits::TransactionalStore;
21use sui_indexer_alt_framework_store_traits::pipeline_task;
22use tracing::info;
23
24use crate::metrics::IngestionMetrics;
25use crate::pipeline::Processor;
26use crate::pipeline::concurrent::ConcurrentConfig;
27use crate::pipeline::concurrent::{self};
28use crate::pipeline::sequential::Handler;
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36/// External users access the store trait through framework::store
37pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
// Only compiled when the `cluster` feature is enabled.
#[cfg(feature = "cluster")]
pub mod cluster;
pub mod config;
pub mod ingestion;
pub mod metrics;
pub mod pipeline;
// Only compiled when the `postgres` feature is enabled.
#[cfg(feature = "postgres")]
pub mod postgres;

// Test-only mocks (e.g. `MockStore`) used by the unit tests below.
#[cfg(test)]
pub mod mocks;
51
/// Command-line arguments for the indexer
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    ///
    /// May be passed multiple times (`--pipeline a --pipeline b`) to enable several pipelines.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Command-line arguments for configuring a tasked indexer.
// NOTE(review): derived as `clap::Parser` but only ever consumed via `#[clap(flatten)]` in
// `IndexerArgs`; `clap::Args` would suffice — confirm nothing parses `TaskArgs` standalone
// before changing the derive.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
/// The core indexer: owns the storage backend, the checkpoint ingestion service, and the set of
/// registered pipelines. Construct it with [Indexer::new], register pipelines, then start
/// everything with [Indexer::run].
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
    /// always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    default_next_checkpoint: u64,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// a warning when the indexer is run.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
    /// unless overridden by [Self::default_next_checkpoint].
    ///
    /// Initialized to `u64::MAX` as a sentinel; each added pipeline lowers it via `min`.
    first_ingestion_checkpoint: u64,

    /// The minimum next_checkpoint across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    next_sequential_checkpoint: Option<u64>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
168
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
179
180impl TaskArgs {
181    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
182        Self {
183            task: Some(task),
184            reader_interval_ms: Some(reader_interval_ms),
185        }
186    }
187
188    fn into_task(self) -> Option<Task> {
189        Some(Task {
190            task: self.task?,
191            reader_interval: Duration::from_millis(self.reader_interval_ms?),
192        })
193    }
194}
195
impl<S: Store> Indexer<S> {
    /// Create a new instance of the indexer framework from a store that implements the `Store`
    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
    /// arguments configure the following:
    ///
    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
    ///   watermarks) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
            last_checkpoint,
            task: task.into_task(),
            // An empty `--pipeline` list means "run everything", represented here as `None`. A
            // non-empty list acts as an allow-list that entries are removed from as pipelines
            // are added (see [Self::add_pipeline] and the leftover check in [Self::run]).
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            // Sentinel start value: every pipeline added lowers this via `min`, so the first
            // registered pipeline always replaces it.
            first_ingestion_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            pipelines: vec![],
        })
    }

    /// The store used by the indexer.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The ingestion client used by the indexer to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// The indexer's metrics.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// The ingestion service's metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The pipelines that this indexer will run: every added pipeline, restricted to the enabled
    /// set when a `--pipeline` filter was supplied.
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
    ///
    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
    /// keep the watermark table up-to-date with the highest point they can guarantee all data
    /// exists for, for their pipeline.
    pub async fn concurrent_pipeline<H>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()>
    where
        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
    {
        // `None` means the pipeline was filtered out by `--pipeline`; register nothing.
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            self.ingestion_service.subscribe().0,
            self.metrics.clone(),
        ));

        Ok(())
    }

    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
    /// will start processing and committing once the ingestion service has caught up to their
    /// respective watermarks.
    ///
    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
    pub async fn run(self) -> Result<Service> {
        // Any names left in the filter at this point were requested on the command line but
        // never added to the indexer -- surface that as an error rather than silently running
        // without them.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);

        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                self.first_ingestion_checkpoint..=last_checkpoint,
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Tie every pipeline's lifetime to the ingestion service so they all shut down together.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
    /// checkpoint after its watermark, or if that doesn't exist, then the provided
    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
    ///
    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
    /// calculated above.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled.
    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Removing the name from the filter lets [Self::run] detect filter entries that never
        // matched an added pipeline.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let mut conn = self
            .store
            .connect()
            .await
            .context("Failed to establish connection to store")?;

        let pipeline_task =
            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;

        let InitWatermark {
            checkpoint_hi_inclusive,
            reader_lo,
        } = conn
            .init_watermark(
                &pipeline_task,
                InitWatermark {
                    // `checked_sub` keeps this `None` when the default next checkpoint is 0
                    // (genesis), i.e. there is no previously-committed checkpoint to record.
                    checkpoint_hi_inclusive: self.default_next_checkpoint.checked_sub(1),
                    reader_lo: self.default_next_checkpoint,
                },
            )
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        // Resume after the recorded watermark if there is one, otherwise start at `reader_lo`.
        let next_checkpoint = checkpoint_hi_inclusive.map_or(reader_lo, |c| c + 1);

        // Ingestion must begin at the earliest checkpoint any pipeline still needs.
        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
400
401impl<T: TransactionalStore> Indexer<T> {
402    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
403    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
404    ///
405    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
406    /// required to handle pipelines that modify data in-place (where each update is not an insert,
407    /// but could be a modification of an existing row, where ordering between updates is
408    /// important).
409    ///
410    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
411    /// number of checkpoints (configured by `checkpoint_lag`).
412    pub async fn sequential_pipeline<H>(
413        &mut self,
414        handler: H,
415        config: SequentialConfig,
416    ) -> Result<()>
417    where
418        H: Handler<Store = T> + Send + Sync + 'static,
419    {
420        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
421            return Ok(());
422        };
423
424        if self.task.is_some() {
425            bail!(
426                "Sequential pipelines do not support pipeline tasks. \
427                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
428                Running the same pipeline under a different task would violate these guarantees."
429            );
430        }
431
432        // Track the minimum next_checkpoint across all sequential pipelines
433        self.next_sequential_checkpoint = Some(
434            self.next_sequential_checkpoint
435                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
436        );
437
438        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
439
440        self.pipelines.push(sequential::pipeline::<H>(
441            handler,
442            next_checkpoint,
443            config,
444            self.store.clone(),
445            checkpoint_rx,
446            commit_hi_tx,
447            self.metrics.clone(),
448        ));
449
450        Ok(())
451    }
452}
453
454#[cfg(test)]
455mod tests {
456    use std::sync::Arc;
457
458    use async_trait::async_trait;
459    use clap::Parser;
460    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
461    use sui_synthetic_ingestion::synthetic_ingestion;
462    use tokio::sync::watch;
463
464    use crate::FieldCount;
465    use crate::config::ConcurrencyConfig;
466    use crate::ingestion::ingestion_client::IngestionClientArgs;
467    use crate::mocks::store::MockStore;
468    use crate::pipeline::CommitterConfig;
469    use crate::pipeline::Processor;
470    use crate::pipeline::concurrent::ConcurrentConfig;
471    use crate::store::CommitterWatermark;
472
473    use super::*;
474
    /// Minimal pipeline value for tests: wraps a checkpoint sequence number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
478
    /// A handler that can be controlled externally to block checkpoint processing.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value. Raising the value through the
        /// paired `watch::Sender` (see `with_limit`) releases any blocked checkpoints.
        process_below: watch::Receiver<u64>,
    }
484
485    impl ControllableHandler {
486        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
487            let (tx, rx) = watch::channel(limit);
488            (Self { process_below: rx }, tx)
489        }
490    }
491
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Blocks until the external limit reaches this checkpoint's sequence number, then
        /// emits a single value carrying that sequence number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. The receiver is cloned so
            // each in-flight checkpoint waits on the channel independently.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                // If the sender side is gone, `.ok()` discards the error and we proceed anyway.
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
513
514    #[async_trait]
515    impl crate::pipeline::concurrent::Handler for ControllableHandler {
516        type Store = MockStore;
517        type Batch = Vec<MockValue>;
518
519        fn batch(
520            &self,
521            batch: &mut Self::Batch,
522            values: &mut std::vec::IntoIter<Self::Value>,
523        ) -> crate::pipeline::concurrent::BatchStatus {
524            batch.extend(values);
525            crate::pipeline::concurrent::BatchStatus::Ready
526        }
527
528        async fn commit<'a>(
529            &self,
530            batch: &Self::Batch,
531            conn: &mut <Self::Store as Store>::Connection<'a>,
532        ) -> anyhow::Result<usize> {
533            for value in batch {
534                conn.0
535                    .commit_data(Self::NAME, value.0, vec![value.0])
536                    .await?;
537            }
538            Ok(batch.len())
539        }
540    }
541
    /// Defines a unit struct `$handler` named `$name` that implements [`Processor`] plus both
    /// the concurrent and sequential `Handler` traits against [`MockStore`], for use as a test
    /// fixture pipeline.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            // Processor: emits one value per checkpoint, tagged with its sequence number.
            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            // Concurrent handler: accumulates values (batch stays `Pending`) and commits each
            // one to the mock store keyed by the pipeline name.
            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            // Sequential handler: a no-op commit that just reports one row written.
            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
605
    // Shared pipeline fixtures used by tests in this module.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
609
    /// Shared helper for watermark-initialization tests: builds an indexer over a fresh
    /// [`MockStore`], optionally overriding `--first-checkpoint`, registers a single pipeline
    /// (concurrent when `is_concurrent`, sequential otherwise), and returns the committer and
    /// pruner watermarks recorded for that pipeline.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        // Pipeline type local to this helper; its watermark starts out absent in a fresh store.
        test_pipeline!(A, "pipeline_name");

        // NOTE(review): this connection is opened before the indexer (built from
        // `store.clone()`) initializes any watermarks; assumes `MockStore` clones share state
        // so those writes are visible through `conn` — confirm against the mock.
        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        // Point ingestion at an empty temporary directory.
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        if is_concurrent {
            indexer
                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
                .await
                .unwrap();
        } else {
            indexer
                .sequential_pipeline::<A>(A, SequentialConfig::default())
                .await
                .unwrap();
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
665
666    #[test]
667    fn test_arg_parsing() {
668        #[derive(Parser)]
669        struct Args {
670            #[clap(flatten)]
671            indexer: IndexerArgs,
672        }
673
674        let args = Args::try_parse_from([
675            "cmd",
676            "--first-checkpoint",
677            "10",
678            "--last-checkpoint",
679            "100",
680            "--pipeline",
681            "a",
682            "--pipeline",
683            "b",
684            "--task",
685            "t",
686            "--reader-interval-ms",
687            "5000",
688        ])
689        .unwrap();
690
691        assert_eq!(args.indexer.first_checkpoint, Some(10));
692        assert_eq!(args.indexer.last_checkpoint, Some(100));
693        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
694        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
695        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
696    }
697
698    /// first_ingestion_checkpoint is smallest among existing watermarks + 1.
699    #[tokio::test]
700    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
701        let registry = Registry::new();
702        let store = MockStore::default();
703
704        test_pipeline!(A, "concurrent_a");
705        test_pipeline!(B, "concurrent_b");
706        test_pipeline!(C, "sequential_c");
707        test_pipeline!(D, "sequential_d");
708
709        let mut conn = store.connect().await.unwrap();
710
711        conn.init_watermark(A::NAME, InitWatermark::default())
712            .await
713            .unwrap();
714        conn.set_committer_watermark(
715            A::NAME,
716            CommitterWatermark {
717                checkpoint_hi_inclusive: 100,
718                ..Default::default()
719            },
720        )
721        .await
722        .unwrap();
723
724        conn.init_watermark(B::NAME, InitWatermark::default())
725            .await
726            .unwrap();
727        conn.set_committer_watermark(
728            B::NAME,
729            CommitterWatermark {
730                checkpoint_hi_inclusive: 10,
731                ..Default::default()
732            },
733        )
734        .await
735        .unwrap();
736
737        conn.init_watermark(C::NAME, InitWatermark::default())
738            .await
739            .unwrap();
740        conn.set_committer_watermark(
741            C::NAME,
742            CommitterWatermark {
743                checkpoint_hi_inclusive: 1,
744                ..Default::default()
745            },
746        )
747        .await
748        .unwrap();
749
750        conn.init_watermark(D::NAME, InitWatermark::default())
751            .await
752            .unwrap();
753        conn.set_committer_watermark(
754            D::NAME,
755            CommitterWatermark {
756                checkpoint_hi_inclusive: 50,
757                ..Default::default()
758            },
759        )
760        .await
761        .unwrap();
762
763        let indexer_args = IndexerArgs::default();
764        let temp_dir = tempfile::tempdir().unwrap();
765        let client_args = ClientArgs {
766            ingestion: IngestionClientArgs {
767                local_ingestion_path: Some(temp_dir.path().to_owned()),
768                ..Default::default()
769            },
770            ..Default::default()
771        };
772        let ingestion_config = IngestionConfig::default();
773
774        let mut indexer = Indexer::new(
775            store,
776            indexer_args,
777            client_args,
778            ingestion_config,
779            None,
780            &registry,
781        )
782        .await
783        .unwrap();
784
785        indexer
786            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
787            .await
788            .unwrap();
789        indexer
790            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
791            .await
792            .unwrap();
793        indexer
794            .sequential_pipeline::<C>(C, SequentialConfig::default())
795            .await
796            .unwrap();
797        indexer
798            .sequential_pipeline::<D>(D, SequentialConfig::default())
799            .await
800            .unwrap();
801
802        assert_eq!(indexer.first_ingestion_checkpoint, 2);
803    }
804
805    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
806    #[tokio::test]
807    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
808        let registry = Registry::new();
809        let store = MockStore::default();
810
811        test_pipeline!(A, "concurrent_a");
812        test_pipeline!(B, "concurrent_b");
813        test_pipeline!(C, "sequential_c");
814        test_pipeline!(D, "sequential_d");
815
816        let mut conn = store.connect().await.unwrap();
817        conn.set_committer_watermark(
818            B::NAME,
819            CommitterWatermark {
820                checkpoint_hi_inclusive: 10,
821                ..Default::default()
822            },
823        )
824        .await
825        .unwrap();
826        conn.set_committer_watermark(
827            C::NAME,
828            CommitterWatermark {
829                checkpoint_hi_inclusive: 1,
830                ..Default::default()
831            },
832        )
833        .await
834        .unwrap();
835
836        let indexer_args = IndexerArgs::default();
837        let temp_dir = tempfile::tempdir().unwrap();
838        let client_args = ClientArgs {
839            ingestion: IngestionClientArgs {
840                local_ingestion_path: Some(temp_dir.path().to_owned()),
841                ..Default::default()
842            },
843            ..Default::default()
844        };
845        let ingestion_config = IngestionConfig::default();
846
847        let mut indexer = Indexer::new(
848            store,
849            indexer_args,
850            client_args,
851            ingestion_config,
852            None,
853            &registry,
854        )
855        .await
856        .unwrap();
857
858        indexer
859            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
860            .await
861            .unwrap();
862        indexer
863            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
864            .await
865            .unwrap();
866        indexer
867            .sequential_pipeline::<C>(C, SequentialConfig::default())
868            .await
869            .unwrap();
870        indexer
871            .sequential_pipeline::<D>(D, SequentialConfig::default())
872            .await
873            .unwrap();
874
875        assert_eq!(indexer.first_ingestion_checkpoint, 0);
876    }
877
    /// first_ingestion_checkpoint is 1 when smallest committer watermark is 0.
    ///
    /// A watermark of 0 still counts as an existing watermark: the pipeline resumes
    /// from 0 + 1 = 1 rather than forcing ingestion back to checkpoint 0.
    #[tokio::test]
    async fn test_first_ingestion_checkpoint_smallest_is_0() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Every pipeline gets a watermark; D is set to the default watermark, whose
        // checkpoint (0) is the smallest of the four.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 100,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
            .await
            .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        // The smallest watermark is D's 0, so ingestion resumes at 0 + 1 = 1.
        assert_eq!(indexer.first_ingestion_checkpoint, 1);
    }
962
    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
    /// watermark, and first_checkpoint is smallest.
    #[tokio::test]
    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Only B and C have committer watermarks; A and D have none, so the
        // first_checkpoint override below applies to them.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // first_checkpoint (5) is below both resume points derived from the existing
        // watermarks (B: 51, C: 11).
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(5),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        // A and D start from first_checkpoint (5), which is the smallest candidate.
        assert_eq!(indexer.first_ingestion_checkpoint, 5);
    }
1039
1040    /// first_ingestion_checkpoint is smallest among existing watermarks + 1 if
1041    /// first_checkpoint but all pipelines have watermarks (ignores first_checkpoint).
1042    #[tokio::test]
1043    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1044        let registry = Registry::new();
1045        let store = MockStore::default();
1046
1047        test_pipeline!(B, "concurrent_b");
1048        test_pipeline!(C, "sequential_c");
1049
1050        let mut conn = store.connect().await.unwrap();
1051        conn.set_committer_watermark(
1052            B::NAME,
1053            CommitterWatermark {
1054                checkpoint_hi_inclusive: 50,
1055                ..Default::default()
1056            },
1057        )
1058        .await
1059        .unwrap();
1060        conn.set_committer_watermark(
1061            C::NAME,
1062            CommitterWatermark {
1063                checkpoint_hi_inclusive: 10,
1064                ..Default::default()
1065            },
1066        )
1067        .await
1068        .unwrap();
1069
1070        let indexer_args = IndexerArgs {
1071            first_checkpoint: Some(5),
1072            ..Default::default()
1073        };
1074        let temp_dir = tempfile::tempdir().unwrap();
1075        let client_args = ClientArgs {
1076            ingestion: IngestionClientArgs {
1077                local_ingestion_path: Some(temp_dir.path().to_owned()),
1078                ..Default::default()
1079            },
1080            ..Default::default()
1081        };
1082        let ingestion_config = IngestionConfig::default();
1083
1084        let mut indexer = Indexer::new(
1085            store,
1086            indexer_args,
1087            client_args,
1088            ingestion_config,
1089            None,
1090            &registry,
1091        )
1092        .await
1093        .unwrap();
1094
1095        indexer
1096            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1097            .await
1098            .unwrap();
1099        indexer
1100            .sequential_pipeline::<C>(C, SequentialConfig::default())
1101            .await
1102            .unwrap();
1103
1104        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1105    }
1106
    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
    /// will not be used as the starting point if it is not the smallest valid committer watermark
    /// to resume ingesting from.
    #[tokio::test]
    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");

        // Only B and C have watermarks; A has none, so first_checkpoint is in play.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // first_checkpoint (24) is larger than C's resume point (10 + 1 = 11).
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(24),
            ..Default::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();

        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();

        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();

        // Ingestion must still serve C from checkpoint 11, so the smaller of
        // first_checkpoint (24) and the pipelines' resume points wins.
        assert_eq!(indexer.first_ingestion_checkpoint, 11);
    }
1181
1182    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1183    #[tokio::test]
1184    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1185        let registry = Registry::new();
1186        let store = MockStore::default();
1187
1188        test_pipeline!(A, "concurrent_a");
1189        test_pipeline!(B, "concurrent_b");
1190        test_pipeline!(C, "sequential_c");
1191        test_pipeline!(D, "sequential_d");
1192
1193        let mut conn = store.connect().await.unwrap();
1194        conn.set_committer_watermark(
1195            A::NAME,
1196            CommitterWatermark {
1197                checkpoint_hi_inclusive: 5,
1198                ..Default::default()
1199            },
1200        )
1201        .await
1202        .unwrap();
1203        conn.set_committer_watermark(
1204            B::NAME,
1205            CommitterWatermark {
1206                checkpoint_hi_inclusive: 10,
1207                ..Default::default()
1208            },
1209        )
1210        .await
1211        .unwrap();
1212        conn.set_committer_watermark(
1213            C::NAME,
1214            CommitterWatermark {
1215                checkpoint_hi_inclusive: 15,
1216                ..Default::default()
1217            },
1218        )
1219        .await
1220        .unwrap();
1221        conn.set_committer_watermark(
1222            D::NAME,
1223            CommitterWatermark {
1224                checkpoint_hi_inclusive: 20,
1225                ..Default::default()
1226            },
1227        )
1228        .await
1229        .unwrap();
1230
1231        // Create synthetic ingestion data
1232        let temp_dir = tempfile::tempdir().unwrap();
1233        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1234            ingestion_dir: temp_dir.path().to_owned(),
1235            starting_checkpoint: 5,
1236            num_checkpoints: 25,
1237            checkpoint_size: 1,
1238        })
1239        .await;
1240
1241        let indexer_args = IndexerArgs {
1242            last_checkpoint: Some(29),
1243            ..Default::default()
1244        };
1245
1246        let client_args = ClientArgs {
1247            ingestion: IngestionClientArgs {
1248                local_ingestion_path: Some(temp_dir.path().to_owned()),
1249                ..Default::default()
1250            },
1251            ..Default::default()
1252        };
1253
1254        let ingestion_config = IngestionConfig::default();
1255
1256        let mut indexer = Indexer::new(
1257            store.clone(),
1258            indexer_args,
1259            client_args,
1260            ingestion_config,
1261            None,
1262            &registry,
1263        )
1264        .await
1265        .unwrap();
1266
1267        indexer
1268            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1269            .await
1270            .unwrap();
1271        indexer
1272            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1273            .await
1274            .unwrap();
1275        indexer
1276            .sequential_pipeline::<C>(C, SequentialConfig::default())
1277            .await
1278            .unwrap();
1279        indexer
1280            .sequential_pipeline::<D>(D, SequentialConfig::default())
1281            .await
1282            .unwrap();
1283
1284        let ingestion_metrics = indexer.ingestion_metrics().clone();
1285        let indexer_metrics = indexer.indexer_metrics().clone();
1286
1287        indexer.run().await.unwrap().join().await.unwrap();
1288
1289        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1290        assert_eq!(
1291            indexer_metrics
1292                .total_watermarks_out_of_order
1293                .get_metric_with_label_values(&[A::NAME])
1294                .unwrap()
1295                .get(),
1296            0
1297        );
1298        assert_eq!(
1299            indexer_metrics
1300                .total_watermarks_out_of_order
1301                .get_metric_with_label_values(&[B::NAME])
1302                .unwrap()
1303                .get(),
1304            5
1305        );
1306        assert_eq!(
1307            indexer_metrics
1308                .total_watermarks_out_of_order
1309                .get_metric_with_label_values(&[C::NAME])
1310                .unwrap()
1311                .get(),
1312            10
1313        );
1314        assert_eq!(
1315            indexer_metrics
1316                .total_watermarks_out_of_order
1317                .get_metric_with_label_values(&[D::NAME])
1318                .unwrap()
1319                .get(),
1320            15
1321        );
1322    }
1323
1324    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1325    #[tokio::test]
1326    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1327        let registry = Registry::new();
1328        let store = MockStore::default();
1329
1330        test_pipeline!(A, "concurrent_a");
1331        test_pipeline!(B, "concurrent_b");
1332        test_pipeline!(C, "sequential_c");
1333        test_pipeline!(D, "sequential_d");
1334
1335        let mut conn = store.connect().await.unwrap();
1336        conn.set_committer_watermark(
1337            A::NAME,
1338            CommitterWatermark {
1339                checkpoint_hi_inclusive: 5,
1340                ..Default::default()
1341            },
1342        )
1343        .await
1344        .unwrap();
1345        conn.set_committer_watermark(
1346            B::NAME,
1347            CommitterWatermark {
1348                checkpoint_hi_inclusive: 10,
1349                ..Default::default()
1350            },
1351        )
1352        .await
1353        .unwrap();
1354        conn.set_committer_watermark(
1355            C::NAME,
1356            CommitterWatermark {
1357                checkpoint_hi_inclusive: 15,
1358                ..Default::default()
1359            },
1360        )
1361        .await
1362        .unwrap();
1363        conn.set_committer_watermark(
1364            D::NAME,
1365            CommitterWatermark {
1366                checkpoint_hi_inclusive: 20,
1367                ..Default::default()
1368            },
1369        )
1370        .await
1371        .unwrap();
1372
1373        // Create synthetic ingestion data
1374        let temp_dir = tempfile::tempdir().unwrap();
1375        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1376            ingestion_dir: temp_dir.path().to_owned(),
1377            starting_checkpoint: 5,
1378            num_checkpoints: 25,
1379            checkpoint_size: 1,
1380        })
1381        .await;
1382
1383        let indexer_args = IndexerArgs {
1384            first_checkpoint: Some(3),
1385            last_checkpoint: Some(29),
1386            ..Default::default()
1387        };
1388
1389        let client_args = ClientArgs {
1390            ingestion: IngestionClientArgs {
1391                local_ingestion_path: Some(temp_dir.path().to_owned()),
1392                ..Default::default()
1393            },
1394            ..Default::default()
1395        };
1396
1397        let ingestion_config = IngestionConfig::default();
1398
1399        let mut indexer = Indexer::new(
1400            store.clone(),
1401            indexer_args,
1402            client_args,
1403            ingestion_config,
1404            None,
1405            &registry,
1406        )
1407        .await
1408        .unwrap();
1409
1410        indexer
1411            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1412            .await
1413            .unwrap();
1414        indexer
1415            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1416            .await
1417            .unwrap();
1418        indexer
1419            .sequential_pipeline::<C>(C, SequentialConfig::default())
1420            .await
1421            .unwrap();
1422        indexer
1423            .sequential_pipeline::<D>(D, SequentialConfig::default())
1424            .await
1425            .unwrap();
1426
1427        let ingestion_metrics = indexer.ingestion_metrics().clone();
1428        let metrics = indexer.indexer_metrics().clone();
1429        indexer.run().await.unwrap().join().await.unwrap();
1430
1431        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1432        assert_eq!(
1433            metrics
1434                .total_watermarks_out_of_order
1435                .get_metric_with_label_values(&[A::NAME])
1436                .unwrap()
1437                .get(),
1438            0
1439        );
1440        assert_eq!(
1441            metrics
1442                .total_watermarks_out_of_order
1443                .get_metric_with_label_values(&[B::NAME])
1444                .unwrap()
1445                .get(),
1446            5
1447        );
1448        assert_eq!(
1449            metrics
1450                .total_watermarks_out_of_order
1451                .get_metric_with_label_values(&[C::NAME])
1452                .unwrap()
1453                .get(),
1454            10
1455        );
1456        assert_eq!(
1457            metrics
1458                .total_watermarks_out_of_order
1459                .get_metric_with_label_values(&[D::NAME])
1460                .unwrap()
1461                .get(),
1462            15
1463        );
1464    }
1465
1466    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1467    #[tokio::test]
1468    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1469        let registry = Registry::new();
1470        let store = MockStore::default();
1471
1472        test_pipeline!(A, "concurrent_a");
1473        test_pipeline!(B, "concurrent_b");
1474        test_pipeline!(C, "sequential_c");
1475        test_pipeline!(D, "sequential_d");
1476
1477        let mut conn = store.connect().await.unwrap();
1478        conn.set_committer_watermark(
1479            B::NAME,
1480            CommitterWatermark {
1481                checkpoint_hi_inclusive: 10,
1482                ..Default::default()
1483            },
1484        )
1485        .await
1486        .unwrap();
1487        conn.set_committer_watermark(
1488            C::NAME,
1489            CommitterWatermark {
1490                checkpoint_hi_inclusive: 15,
1491                ..Default::default()
1492            },
1493        )
1494        .await
1495        .unwrap();
1496        conn.set_committer_watermark(
1497            D::NAME,
1498            CommitterWatermark {
1499                checkpoint_hi_inclusive: 20,
1500                ..Default::default()
1501            },
1502        )
1503        .await
1504        .unwrap();
1505
1506        // Create synthetic ingestion data
1507        let temp_dir = tempfile::tempdir().unwrap();
1508        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1509            ingestion_dir: temp_dir.path().to_owned(),
1510            starting_checkpoint: 0,
1511            num_checkpoints: 30,
1512            checkpoint_size: 1,
1513        })
1514        .await;
1515
1516        let indexer_args = IndexerArgs {
1517            last_checkpoint: Some(29),
1518            ..Default::default()
1519        };
1520
1521        let client_args = ClientArgs {
1522            ingestion: IngestionClientArgs {
1523                local_ingestion_path: Some(temp_dir.path().to_owned()),
1524                ..Default::default()
1525            },
1526            ..Default::default()
1527        };
1528
1529        let ingestion_config = IngestionConfig::default();
1530
1531        let mut indexer = Indexer::new(
1532            store.clone(),
1533            indexer_args,
1534            client_args,
1535            ingestion_config,
1536            None,
1537            &registry,
1538        )
1539        .await
1540        .unwrap();
1541
1542        indexer
1543            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1544            .await
1545            .unwrap();
1546        indexer
1547            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1548            .await
1549            .unwrap();
1550        indexer
1551            .sequential_pipeline::<C>(C, SequentialConfig::default())
1552            .await
1553            .unwrap();
1554        indexer
1555            .sequential_pipeline::<D>(D, SequentialConfig::default())
1556            .await
1557            .unwrap();
1558
1559        let ingestion_metrics = indexer.ingestion_metrics().clone();
1560        let metrics = indexer.indexer_metrics().clone();
1561        indexer.run().await.unwrap().join().await.unwrap();
1562
1563        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1564        assert_eq!(
1565            metrics
1566                .total_watermarks_out_of_order
1567                .get_metric_with_label_values(&[A::NAME])
1568                .unwrap()
1569                .get(),
1570            0
1571        );
1572        assert_eq!(
1573            metrics
1574                .total_watermarks_out_of_order
1575                .get_metric_with_label_values(&[B::NAME])
1576                .unwrap()
1577                .get(),
1578            11
1579        );
1580        assert_eq!(
1581            metrics
1582                .total_watermarks_out_of_order
1583                .get_metric_with_label_values(&[C::NAME])
1584                .unwrap()
1585                .get(),
1586            16
1587        );
1588        assert_eq!(
1589            metrics
1590                .total_watermarks_out_of_order
1591                .get_metric_with_label_values(&[D::NAME])
1592                .unwrap()
1593                .get(),
1594            21
1595        );
1596    }
1597
1598    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1599    #[tokio::test]
1600    async fn test_indexer_ingestion_use_first_checkpoint() {
1601        let registry = Registry::new();
1602        let store = MockStore::default();
1603
1604        test_pipeline!(A, "concurrent_a");
1605        test_pipeline!(B, "concurrent_b");
1606        test_pipeline!(C, "sequential_c");
1607        test_pipeline!(D, "sequential_d");
1608
1609        let mut conn = store.connect().await.unwrap();
1610        conn.set_committer_watermark(
1611            B::NAME,
1612            CommitterWatermark {
1613                checkpoint_hi_inclusive: 10,
1614                ..Default::default()
1615            },
1616        )
1617        .await
1618        .unwrap();
1619        conn.set_committer_watermark(
1620            C::NAME,
1621            CommitterWatermark {
1622                checkpoint_hi_inclusive: 15,
1623                ..Default::default()
1624            },
1625        )
1626        .await
1627        .unwrap();
1628        conn.set_committer_watermark(
1629            D::NAME,
1630            CommitterWatermark {
1631                checkpoint_hi_inclusive: 20,
1632                ..Default::default()
1633            },
1634        )
1635        .await
1636        .unwrap();
1637
1638        // Create synthetic ingestion data
1639        let temp_dir = tempfile::tempdir().unwrap();
1640        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1641            ingestion_dir: temp_dir.path().to_owned(),
1642            starting_checkpoint: 5,
1643            num_checkpoints: 25,
1644            checkpoint_size: 1,
1645        })
1646        .await;
1647
1648        let indexer_args = IndexerArgs {
1649            first_checkpoint: Some(10),
1650            last_checkpoint: Some(29),
1651            ..Default::default()
1652        };
1653
1654        let client_args = ClientArgs {
1655            ingestion: IngestionClientArgs {
1656                local_ingestion_path: Some(temp_dir.path().to_owned()),
1657                ..Default::default()
1658            },
1659            ..Default::default()
1660        };
1661
1662        let ingestion_config = IngestionConfig::default();
1663
1664        let mut indexer = Indexer::new(
1665            store.clone(),
1666            indexer_args,
1667            client_args,
1668            ingestion_config,
1669            None,
1670            &registry,
1671        )
1672        .await
1673        .unwrap();
1674
1675        indexer
1676            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1677            .await
1678            .unwrap();
1679        indexer
1680            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1681            .await
1682            .unwrap();
1683        indexer
1684            .sequential_pipeline::<C>(C, SequentialConfig::default())
1685            .await
1686            .unwrap();
1687        indexer
1688            .sequential_pipeline::<D>(D, SequentialConfig::default())
1689            .await
1690            .unwrap();
1691
1692        let ingestion_metrics = indexer.ingestion_metrics().clone();
1693        let metrics = indexer.indexer_metrics().clone();
1694        indexer.run().await.unwrap().join().await.unwrap();
1695
1696        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1697        assert_eq!(
1698            metrics
1699                .total_watermarks_out_of_order
1700                .get_metric_with_label_values(&[A::NAME])
1701                .unwrap()
1702                .get(),
1703            0
1704        );
1705        assert_eq!(
1706            metrics
1707                .total_watermarks_out_of_order
1708                .get_metric_with_label_values(&[B::NAME])
1709                .unwrap()
1710                .get(),
1711            1
1712        );
1713        assert_eq!(
1714            metrics
1715                .total_watermarks_out_of_order
1716                .get_metric_with_label_values(&[C::NAME])
1717                .unwrap()
1718                .get(),
1719            6
1720        );
1721        assert_eq!(
1722            metrics
1723                .total_watermarks_out_of_order
1724                .get_metric_with_label_values(&[D::NAME])
1725                .unwrap()
1726                .get(),
1727            11
1728        );
1729    }
1730
1731    #[tokio::test]
1732    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1733        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1734        assert_eq!(committer_watermark, None);
1735        assert_eq!(pruner_watermark, None);
1736    }
1737
1738    #[tokio::test]
1739    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1740        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1741        assert_eq!(committer_watermark, None);
1742        assert_eq!(pruner_watermark, None);
1743    }
1744
1745    #[tokio::test]
1746    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1747        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1748
1749        let committer_watermark = committer_watermark.unwrap();
1750        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1751
1752        let pruner_watermark = pruner_watermark.unwrap();
1753        assert_eq!(pruner_watermark.reader_lo, 1);
1754        assert_eq!(pruner_watermark.pruner_hi, 1);
1755    }
1756
1757    #[tokio::test]
1758    async fn test_init_watermark_sequential() {
1759        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1760
1761        let committer_watermark = committer_watermark.unwrap();
1762        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1763
1764        let pruner_watermark = pruner_watermark.unwrap();
1765        assert_eq!(pruner_watermark.reader_lo, 1);
1766        assert_eq!(pruner_watermark.pruner_hi, 1);
1767    }
1768
1769    #[tokio::test]
1770    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1771        let registry = Registry::new();
1772        let store = MockStore::default();
1773
1774        // Set up different watermarks for three different sequential pipelines
1775        let mut conn = store.connect().await.unwrap();
1776
1777        // First handler at checkpoint 10
1778        conn.set_committer_watermark(
1779            MockHandler::NAME,
1780            CommitterWatermark {
1781                epoch_hi_inclusive: 0,
1782                checkpoint_hi_inclusive: 10,
1783                tx_hi: 20,
1784                timestamp_ms_hi_inclusive: 10000,
1785            },
1786        )
1787        .await
1788        .unwrap();
1789
1790        // SequentialHandler at checkpoint 5
1791        conn.set_committer_watermark(
1792            SequentialHandler::NAME,
1793            CommitterWatermark {
1794                epoch_hi_inclusive: 0,
1795                checkpoint_hi_inclusive: 5,
1796                tx_hi: 10,
1797                timestamp_ms_hi_inclusive: 5000,
1798            },
1799        )
1800        .await
1801        .unwrap();
1802
1803        // Create synthetic ingestion data
1804        let temp_dir = tempfile::tempdir().unwrap();
1805        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1806            ingestion_dir: temp_dir.path().to_owned(),
1807            starting_checkpoint: 0,
1808            num_checkpoints: 20,
1809            checkpoint_size: 2,
1810        })
1811        .await;
1812
1813        let indexer_args = IndexerArgs {
1814            first_checkpoint: None,
1815            last_checkpoint: Some(19),
1816            pipeline: vec![],
1817            ..Default::default()
1818        };
1819
1820        let client_args = ClientArgs {
1821            ingestion: IngestionClientArgs {
1822                local_ingestion_path: Some(temp_dir.path().to_owned()),
1823                ..Default::default()
1824            },
1825            ..Default::default()
1826        };
1827
1828        let ingestion_config = IngestionConfig::default();
1829
1830        let mut indexer = Indexer::new(
1831            store.clone(),
1832            indexer_args,
1833            client_args,
1834            ingestion_config,
1835            None,
1836            &registry,
1837        )
1838        .await
1839        .unwrap();
1840
1841        // Add first sequential pipeline
1842        indexer
1843            .sequential_pipeline(
1844                MockHandler,
1845                pipeline::sequential::SequentialConfig::default(),
1846            )
1847            .await
1848            .unwrap();
1849
1850        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1851        assert_eq!(
1852            indexer.next_sequential_checkpoint(),
1853            Some(11),
1854            "next_sequential_checkpoint should be 11"
1855        );
1856
1857        // Add second sequential pipeline
1858        indexer
1859            .sequential_pipeline(
1860                SequentialHandler,
1861                pipeline::sequential::SequentialConfig::default(),
1862            )
1863            .await
1864            .unwrap();
1865
1866        // Should change to 6 (minimum of 6 and 11)
1867        assert_eq!(
1868            indexer.next_sequential_checkpoint(),
1869            Some(6),
1870            "next_sequential_checkpoint should still be 6"
1871        );
1872
1873        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1874        indexer.run().await.unwrap().join().await.unwrap();
1875
1876        // Verify each pipeline made some progress independently
1877        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1878        let watermark2 = conn
1879            .committer_watermark(SequentialHandler::NAME)
1880            .await
1881            .unwrap();
1882
1883        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1884        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1885    }
1886
1887    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1888    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1889    /// committing checkpoints less than the main pipeline's reader watermark.
1890    #[tokio::test]
1891    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1892        let registry = Registry::new();
1893        let store = MockStore::default();
1894
1895        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1896        // reader watermark at `7`.
1897        let mut conn = store.connect().await.unwrap();
1898        conn.set_committer_watermark(
1899            MockCheckpointSequenceNumberHandler::NAME,
1900            CommitterWatermark {
1901                checkpoint_hi_inclusive: 10,
1902                ..Default::default()
1903            },
1904        )
1905        .await
1906        .unwrap();
1907        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1908            .await
1909            .unwrap();
1910
1911        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1912        // be ignored by the tasked indexer.
1913        let indexer_args = IndexerArgs {
1914            first_checkpoint: Some(0),
1915            last_checkpoint: Some(15),
1916            task: TaskArgs::tasked("task".to_string(), 10),
1917            ..Default::default()
1918        };
1919        let temp_dir = tempfile::tempdir().unwrap();
1920        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1921            ingestion_dir: temp_dir.path().to_owned(),
1922            starting_checkpoint: 0,
1923            num_checkpoints: 16,
1924            checkpoint_size: 2,
1925        })
1926        .await;
1927
1928        let client_args = ClientArgs {
1929            ingestion: IngestionClientArgs {
1930                local_ingestion_path: Some(temp_dir.path().to_owned()),
1931                ..Default::default()
1932            },
1933            ..Default::default()
1934        };
1935
1936        let ingestion_config = IngestionConfig::default();
1937
1938        let mut tasked_indexer = Indexer::new(
1939            store.clone(),
1940            indexer_args,
1941            client_args,
1942            ingestion_config,
1943            None,
1944            &registry,
1945        )
1946        .await
1947        .unwrap();
1948
1949        let _ = tasked_indexer
1950            .concurrent_pipeline(
1951                MockCheckpointSequenceNumberHandler,
1952                ConcurrentConfig::default(),
1953            )
1954            .await;
1955
1956        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1957        let metrics = tasked_indexer.indexer_metrics().clone();
1958
1959        tasked_indexer.run().await.unwrap().join().await.unwrap();
1960
1961        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1962        assert_eq!(
1963            metrics
1964                .total_collector_skipped_checkpoints
1965                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1966                .unwrap()
1967                .get(),
1968            7
1969        );
1970        let data = store
1971            .data
1972            .get(MockCheckpointSequenceNumberHandler::NAME)
1973            .unwrap();
1974        assert_eq!(data.len(), 9);
1975        for i in 0..7 {
1976            assert!(data.get(&i).is_none());
1977        }
1978        for i in 7..16 {
1979            assert!(data.get(&i).is_some());
1980        }
1981    }
1982
1983    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1984    #[tokio::test]
1985    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1986        let registry = Registry::new();
1987        let store = MockStore::default();
1988
1989        let mut conn = store.connect().await.unwrap();
1990        conn.set_committer_watermark(
1991            "test",
1992            CommitterWatermark {
1993                checkpoint_hi_inclusive: 10,
1994                ..Default::default()
1995            },
1996        )
1997        .await
1998        .unwrap();
1999        conn.set_reader_watermark("test", 5).await.unwrap();
2000
2001        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
2002        // watermarks.
2003        let indexer_args = IndexerArgs {
2004            first_checkpoint: Some(9),
2005            last_checkpoint: Some(25),
2006            task: TaskArgs::tasked("task".to_string(), 10),
2007            ..Default::default()
2008        };
2009        let temp_dir = tempfile::tempdir().unwrap();
2010        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2011            ingestion_dir: temp_dir.path().to_owned(),
2012            starting_checkpoint: 9,
2013            num_checkpoints: 17,
2014            checkpoint_size: 2,
2015        })
2016        .await;
2017
2018        let client_args = ClientArgs {
2019            ingestion: IngestionClientArgs {
2020                local_ingestion_path: Some(temp_dir.path().to_owned()),
2021                ..Default::default()
2022            },
2023            ..Default::default()
2024        };
2025
2026        let ingestion_config = IngestionConfig::default();
2027
2028        let mut tasked_indexer = Indexer::new(
2029            store.clone(),
2030            indexer_args,
2031            client_args,
2032            ingestion_config,
2033            None,
2034            &registry,
2035        )
2036        .await
2037        .unwrap();
2038
2039        let _ = tasked_indexer
2040            .concurrent_pipeline(
2041                MockCheckpointSequenceNumberHandler,
2042                ConcurrentConfig::default(),
2043            )
2044            .await;
2045
2046        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2047        let metrics = tasked_indexer.indexer_metrics().clone();
2048
2049        tasked_indexer.run().await.unwrap().join().await.unwrap();
2050
2051        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2052        assert_eq!(
2053            metrics
2054                .total_watermarks_out_of_order
2055                .get_metric_with_label_values(&["test"])
2056                .unwrap()
2057                .get(),
2058            0
2059        );
2060        assert_eq!(
2061            metrics
2062                .total_collector_skipped_checkpoints
2063                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2064                .unwrap()
2065                .get(),
2066            0
2067        );
2068
2069        let data = store.data.get("test").unwrap();
2070        assert!(data.len() == 17);
2071        for i in 0..9 {
2072            assert!(data.get(&i).is_none());
2073        }
2074        for i in 9..26 {
2075            assert!(data.get(&i).is_some());
2076        }
2077        let main_pipeline_watermark = store.watermark("test").unwrap();
2078        // assert that the main pipeline's watermarks are not updated
2079        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
2080        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2081        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2082        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
2083        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2084    }
2085
    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
    /// committed, and any checkpoints inflight < X will be skipped.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        // Set the main pipeline watermark.
        conn.set_committer_watermark(
            ControllableHandler::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Generate 500 checkpoints upfront, for the indexer to process all at once.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 501,
            checkpoint_size: 2,
        })
        .await;
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
            ..Default::default()
        };
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();
        let mut tasked_indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();
        let mut allow_process = 10;
        // Limit the pipeline to process only checkpoints `[0, 10]`.
        // NOTE(review): assumes `with_limit(n)` releases checkpoints up to and including `n` —
        // consistent with the wait_for_watermark(.., 10, ..) call below; confirm against
        // ControllableHandler.
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    committer: CommitterConfig {
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // High fixed concurrency so all checkpoints can be processed
                    // concurrently despite out-of-order arrival.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
        // committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                std::time::Duration::from_secs(10),
            )
            .await;

        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
        // track_main_reader_lo task will eventually pick this up and update the atomic. The
        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
        // one at a time until the collector_reader_lo metric shows the new value.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        // Gauge that reflects the reader_lo value the collector last observed.
        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
        // checkpoints have been processed.
        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            // allow_process starts at 10; bump it by one each tick to release the next checkpoint.
            allow_process += 1;
            // If we ran out of checkpoints to release, the collector never saw the new reader_lo
            // — fail loudly rather than spinning forever.
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // At this point, the collector has observed reader_lo = 250. Release all remaining
        // checkpoints. Guarantees:
        // - [0, 10]: committed (before reader_lo was set)
        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
        // - (allow_process, 250): skipped (in-flight, filtered by collector)
        // - [250, 500]: committed (>= reader_lo)
        process_below.send(500).ok();

        // Wait for the indexer to drain and shut down at last_checkpoint = 500.
        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints (allow_process, 250) must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        // Checkpoints >= reader_lo must be committed.
        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
2234}