sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod config;
42pub mod ingestion;
43pub mod metrics;
44pub mod pipeline;
45#[cfg(feature = "postgres")]
46pub mod postgres;
47
48#[cfg(test)]
49pub mod mocks;
50
/// Command-line arguments for the indexer
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run. May be passed multiple times, once per pipeline.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
77
/// Command-line arguments for configuring a tasked indexer.
// NOTE(review): derived as `clap::Parser` although within this file the struct is only ever
// `#[clap(flatten)]`-ed into `IndexerArgs` -- `clap::Args` may be the more precise derive;
// confirm no caller parses `TaskArgs` as a standalone command before changing it.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
108
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
    /// always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    default_next_checkpoint: u64,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// an error when the indexer is run.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
    /// unless overridden by [Self::default_next_checkpoint].
    first_ingestion_checkpoint: u64,

    /// The minimum next_checkpoint across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    next_sequential_checkpoint: Option<u64>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
167
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
178
179impl TaskArgs {
180    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
181        Self {
182            task: Some(task),
183            reader_interval_ms: Some(reader_interval_ms),
184        }
185    }
186
187    fn into_task(self) -> Option<Task> {
188        Some(Task {
189            task: self.task?,
190            reader_interval: Duration::from_millis(self.reader_interval_ms?),
191        })
192    }
193}
194
195impl<S: Store> Indexer<S> {
196    /// Create a new instance of the indexer framework from a store that implements the `Store`
197    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
198    /// arguments configure the following:
199    ///
200    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
201    ///   watermarks) and where to serve metrics from,
202    /// - Where to download checkpoints from,
203    /// - Concurrency and buffering parameters for downloading checkpoints.
204    ///
205    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
206    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
207    pub async fn new(
208        store: S,
209        indexer_args: IndexerArgs,
210        client_args: ClientArgs,
211        ingestion_config: IngestionConfig,
212        metrics_prefix: Option<&str>,
213        registry: &Registry,
214    ) -> Result<Self> {
215        let IndexerArgs {
216            first_checkpoint,
217            last_checkpoint,
218            pipeline,
219            task,
220        } = indexer_args;
221
222        let metrics = IndexerMetrics::new(metrics_prefix, registry);
223
224        let ingestion_service =
225            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
226
227        Ok(Self {
228            store,
229            metrics,
230            ingestion_service,
231            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
232            last_checkpoint,
233            task: task.into_task(),
234            enabled_pipelines: if pipeline.is_empty() {
235                None
236            } else {
237                Some(pipeline.into_iter().collect())
238            },
239            added_pipelines: BTreeSet::new(),
240            first_ingestion_checkpoint: u64::MAX,
241            next_sequential_checkpoint: None,
242            pipelines: vec![],
243        })
244    }
245
246    /// The store used by the indexer.
247    pub fn store(&self) -> &S {
248        &self.store
249    }
250
251    /// The ingestion client used by the indexer to fetch checkpoints.
252    pub fn ingestion_client(&self) -> &IngestionClient {
253        self.ingestion_service.ingestion_client()
254    }
255
256    /// The indexer's metrics.
257    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
258        &self.metrics
259    }
260
261    /// The ingestion service's metrics.
262    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
263        self.ingestion_service.metrics()
264    }
265
266    /// The pipelines that this indexer will run.
267    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
268        self.added_pipelines.iter().copied().filter(|p| {
269            self.enabled_pipelines
270                .as_ref()
271                .is_none_or(|e| e.contains(*p))
272        })
273    }
274
275    /// The minimum next checkpoint across all sequential pipelines. This value is used to
276    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
277    /// too far ahead of sequential pipelines.
278    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
279        self.next_sequential_checkpoint
280    }
281
282    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
283    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
284    ///
285    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
286    /// keep the watermark table up-to-date with the highest point they can guarantee all data
287    /// exists for, for their pipeline.
288    pub async fn concurrent_pipeline<H>(
289        &mut self,
290        handler: H,
291        config: ConcurrentConfig,
292    ) -> Result<()>
293    where
294        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
295    {
296        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
297            return Ok(());
298        };
299
300        self.pipelines.push(concurrent::pipeline::<H>(
301            handler,
302            next_checkpoint,
303            config,
304            self.store.clone(),
305            self.task.clone(),
306            self.ingestion_service.subscribe().0,
307            self.metrics.clone(),
308        ));
309
310        Ok(())
311    }
312
313    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
314    /// will start processing and committing once the ingestion service has caught up to their
315    /// respective watermarks.
316    ///
317    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
318    pub async fn run(self) -> Result<Service> {
319        if let Some(enabled_pipelines) = self.enabled_pipelines {
320            ensure!(
321                enabled_pipelines.is_empty(),
322                "Tried to enable pipelines that this indexer does not know about: \
323                {enabled_pipelines:#?}",
324            );
325        }
326
327        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
328
329        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
330
331        let mut service = self
332            .ingestion_service
333            .run(
334                self.first_ingestion_checkpoint..=last_checkpoint,
335                self.next_sequential_checkpoint,
336            )
337            .await
338            .context("Failed to start ingestion service")?;
339
340        for pipeline in self.pipelines {
341            service = service.merge(pipeline);
342        }
343
344        Ok(service)
345    }
346
347    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
348    /// checkpoint after its watermark, or if that doesn't exist, then the provided
349    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
350    ///
351    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
352    /// calculated above.
353    ///
354    /// Returns `Ok(None)` if the pipeline is disabled.
355    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
356        ensure!(
357            self.added_pipelines.insert(P::NAME),
358            "Pipeline {:?} already added",
359            P::NAME,
360        );
361
362        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
363            && !enabled_pipelines.remove(P::NAME)
364        {
365            info!(pipeline = P::NAME, "Skipping");
366            return Ok(None);
367        }
368
369        let mut conn = self
370            .store
371            .connect()
372            .await
373            .context("Failed to establish connection to store")?;
374
375        let pipeline_task =
376            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
377
378        let checkpoint_hi_inclusive = conn
379            .init_watermark(&pipeline_task, self.default_next_checkpoint)
380            .await
381            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
382
383        let next_checkpoint =
384            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
385
386        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
387
388        Ok(Some(next_checkpoint))
389    }
390}
391
392impl<T: TransactionalStore> Indexer<T> {
393    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
394    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
395    ///
396    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
397    /// required to handle pipelines that modify data in-place (where each update is not an insert,
398    /// but could be a modification of an existing row, where ordering between updates is
399    /// important).
400    ///
401    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
402    /// number of checkpoints (configured by `checkpoint_lag`).
403    pub async fn sequential_pipeline<H>(
404        &mut self,
405        handler: H,
406        config: SequentialConfig,
407    ) -> Result<()>
408    where
409        H: Handler<Store = T> + Send + Sync + 'static,
410    {
411        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
412            return Ok(());
413        };
414
415        if self.task.is_some() {
416            bail!(
417                "Sequential pipelines do not support pipeline tasks. \
418                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
419                Running the same pipeline under a different task would violate these guarantees."
420            );
421        }
422
423        // Track the minimum next_checkpoint across all sequential pipelines
424        self.next_sequential_checkpoint = Some(
425            self.next_sequential_checkpoint
426                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
427        );
428
429        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
430
431        self.pipelines.push(sequential::pipeline::<H>(
432            handler,
433            next_checkpoint,
434            config,
435            self.store.clone(),
436            checkpoint_rx,
437            commit_hi_tx,
438            self.metrics.clone(),
439        ));
440
441        Ok(())
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use std::sync::Arc;
448
449    use async_trait::async_trait;
450    use clap::Parser;
451    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
452    use sui_synthetic_ingestion::synthetic_ingestion;
453    use tokio::sync::watch;
454
455    use crate::FieldCount;
456    use crate::config::ConcurrencyConfig;
457    use crate::ingestion::ingestion_client::IngestionClientArgs;
458    use crate::mocks::store::MockStore;
459    use crate::pipeline::CommitterConfig;
460    use crate::pipeline::Processor;
461    use crate::pipeline::concurrent::ConcurrentConfig;
462    use crate::store::CommitterWatermark;
463
464    use super::*;
465
    /// Minimal value type produced by the test processors below; wraps a checkpoint sequence
    /// number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
469
    /// A handler that can be controlled externally to block checkpoint processing.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value. Updated externally through the
        /// paired `watch::Sender` (see `with_limit`).
        process_below: watch::Receiver<u64>,
    }
475
476    impl ControllableHandler {
477        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
478            let (tx, rx) = watch::channel(limit);
479            (Self { process_below: rx }, tx)
480        }
481    }
482
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Blocks until the externally-controlled limit reaches this checkpoint's sequence
        /// number, then yields a single value wrapping that number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. The receiver is cloned
            // because `wait_for` needs mutable access and `process` only has `&self`. A closed
            // channel error (sender dropped) is discarded by `.ok()`, so processing proceeds
            // regardless in that case.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
504
505    #[async_trait]
506    impl crate::pipeline::concurrent::Handler for ControllableHandler {
507        type Store = MockStore;
508        type Batch = Vec<MockValue>;
509
510        fn batch(
511            &self,
512            batch: &mut Self::Batch,
513            values: &mut std::vec::IntoIter<Self::Value>,
514        ) -> crate::pipeline::concurrent::BatchStatus {
515            batch.extend(values);
516            crate::pipeline::concurrent::BatchStatus::Ready
517        }
518
519        async fn commit<'a>(
520            &self,
521            batch: &Self::Batch,
522            conn: &mut <Self::Store as Store>::Connection<'a>,
523        ) -> anyhow::Result<usize> {
524            for value in batch {
525                conn.0
526                    .commit_data(Self::NAME, value.0, vec![value.0])
527                    .await?;
528            }
529            Ok(batch.len())
530        }
531    }
532
    /// Generates a unit-struct handler type `$handler` whose pipeline name is `$name`, with
    /// implementations of [Processor] and both the concurrent and sequential pipeline handler
    /// traits over [MockStore].
    ///
    /// The concurrent handler records each value in the mock store; the sequential handler's
    /// commit is a no-op that reports one row written.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                // Emit one value per checkpoint, wrapping its sequence number.
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                // Accumulate values; `Pending` leaves the batch open for more.
                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                // Write each value to the mock store, keyed by the pipeline name.
                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                // No-op commit: nothing is written, but one row is reported as committed.
                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
596
    // Pre-declared handler types for tests; each macro call generates a handler registered under
    // the given pipeline name.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
600
    /// Shared setup for watermark-initialization tests: builds an indexer over a fresh
    /// [MockStore], registers a single pipeline (concurrent or sequential depending on
    /// `is_concurrent`), and returns the committer and pruner watermarks recorded for it.
    /// Registering the pipeline is what triggers the watermark initialization under test.
    /// `first_checkpoint` is forwarded to [IndexerArgs::first_checkpoint].
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        // Open the connection up front; it is used at the end to read back the watermarks.
        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        // Point ingestion at an empty local directory; no checkpoints are actually ingested.
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        if is_concurrent {
            indexer
                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
                .await
                .unwrap();
        } else {
            indexer
                .sequential_pipeline::<A>(A, SequentialConfig::default())
                .await
                .unwrap();
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
656
657    #[test]
658    fn test_arg_parsing() {
659        #[derive(Parser)]
660        struct Args {
661            #[clap(flatten)]
662            indexer: IndexerArgs,
663        }
664
665        let args = Args::try_parse_from([
666            "cmd",
667            "--first-checkpoint",
668            "10",
669            "--last-checkpoint",
670            "100",
671            "--pipeline",
672            "a",
673            "--pipeline",
674            "b",
675            "--task",
676            "t",
677            "--reader-interval-ms",
678            "5000",
679        ])
680        .unwrap();
681
682        assert_eq!(args.indexer.first_checkpoint, Some(10));
683        assert_eq!(args.indexer.last_checkpoint, Some(100));
684        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
685        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
686        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
687    }
688
689    /// first_ingestion_checkpoint is smallest among existing watermarks + 1.
690    #[tokio::test]
691    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
692        let registry = Registry::new();
693        let store = MockStore::default();
694
695        test_pipeline!(A, "concurrent_a");
696        test_pipeline!(B, "concurrent_b");
697        test_pipeline!(C, "sequential_c");
698        test_pipeline!(D, "sequential_d");
699
700        let mut conn = store.connect().await.unwrap();
701        conn.set_committer_watermark(
702            A::NAME,
703            CommitterWatermark {
704                checkpoint_hi_inclusive: 100,
705                ..Default::default()
706            },
707        )
708        .await
709        .unwrap();
710        conn.set_committer_watermark(
711            B::NAME,
712            CommitterWatermark {
713                checkpoint_hi_inclusive: 10,
714                ..Default::default()
715            },
716        )
717        .await
718        .unwrap();
719        conn.set_committer_watermark(
720            C::NAME,
721            CommitterWatermark {
722                checkpoint_hi_inclusive: 1,
723                ..Default::default()
724            },
725        )
726        .await
727        .unwrap();
728        conn.set_committer_watermark(
729            D::NAME,
730            CommitterWatermark {
731                checkpoint_hi_inclusive: 50,
732                ..Default::default()
733            },
734        )
735        .await
736        .unwrap();
737
738        let indexer_args = IndexerArgs::default();
739        let temp_dir = tempfile::tempdir().unwrap();
740        let client_args = ClientArgs {
741            ingestion: IngestionClientArgs {
742                local_ingestion_path: Some(temp_dir.path().to_owned()),
743                ..Default::default()
744            },
745            ..Default::default()
746        };
747        let ingestion_config = IngestionConfig::default();
748
749        let mut indexer = Indexer::new(
750            store,
751            indexer_args,
752            client_args,
753            ingestion_config,
754            None,
755            &registry,
756        )
757        .await
758        .unwrap();
759
760        indexer
761            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
762            .await
763            .unwrap();
764        indexer
765            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
766            .await
767            .unwrap();
768        indexer
769            .sequential_pipeline::<C>(C, SequentialConfig::default())
770            .await
771            .unwrap();
772        indexer
773            .sequential_pipeline::<D>(D, SequentialConfig::default())
774            .await
775            .unwrap();
776
777        assert_eq!(indexer.first_ingestion_checkpoint, 2);
778    }
779
780    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
781    #[tokio::test]
782    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
783        let registry = Registry::new();
784        let store = MockStore::default();
785
786        test_pipeline!(A, "concurrent_a");
787        test_pipeline!(B, "concurrent_b");
788        test_pipeline!(C, "sequential_c");
789        test_pipeline!(D, "sequential_d");
790
791        let mut conn = store.connect().await.unwrap();
792        conn.set_committer_watermark(
793            B::NAME,
794            CommitterWatermark {
795                checkpoint_hi_inclusive: 10,
796                ..Default::default()
797            },
798        )
799        .await
800        .unwrap();
801        conn.set_committer_watermark(
802            C::NAME,
803            CommitterWatermark {
804                checkpoint_hi_inclusive: 1,
805                ..Default::default()
806            },
807        )
808        .await
809        .unwrap();
810
811        let indexer_args = IndexerArgs::default();
812        let temp_dir = tempfile::tempdir().unwrap();
813        let client_args = ClientArgs {
814            ingestion: IngestionClientArgs {
815                local_ingestion_path: Some(temp_dir.path().to_owned()),
816                ..Default::default()
817            },
818            ..Default::default()
819        };
820        let ingestion_config = IngestionConfig::default();
821
822        let mut indexer = Indexer::new(
823            store,
824            indexer_args,
825            client_args,
826            ingestion_config,
827            None,
828            &registry,
829        )
830        .await
831        .unwrap();
832
833        indexer
834            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
835            .await
836            .unwrap();
837        indexer
838            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
839            .await
840            .unwrap();
841        indexer
842            .sequential_pipeline::<C>(C, SequentialConfig::default())
843            .await
844            .unwrap();
845        indexer
846            .sequential_pipeline::<D>(D, SequentialConfig::default())
847            .await
848            .unwrap();
849
850        assert_eq!(indexer.first_ingestion_checkpoint, 0);
851    }
852
853    /// first_ingestion_checkpoint is 1 when smallest committer watermark is 0.
854    #[tokio::test]
855    async fn test_first_ingestion_checkpoint_smallest_is_0() {
856        let registry = Registry::new();
857        let store = MockStore::default();
858
859        test_pipeline!(A, "concurrent_a");
860        test_pipeline!(B, "concurrent_b");
861        test_pipeline!(C, "sequential_c");
862        test_pipeline!(D, "sequential_d");
863
864        let mut conn = store.connect().await.unwrap();
865        conn.set_committer_watermark(
866            A::NAME,
867            CommitterWatermark {
868                checkpoint_hi_inclusive: 100,
869                ..Default::default()
870            },
871        )
872        .await
873        .unwrap();
874        conn.set_committer_watermark(
875            B::NAME,
876            CommitterWatermark {
877                checkpoint_hi_inclusive: 10,
878                ..Default::default()
879            },
880        )
881        .await
882        .unwrap();
883        conn.set_committer_watermark(
884            C::NAME,
885            CommitterWatermark {
886                checkpoint_hi_inclusive: 1,
887                ..Default::default()
888            },
889        )
890        .await
891        .unwrap();
892        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
893            .await
894            .unwrap();
895
896        let indexer_args = IndexerArgs::default();
897        let temp_dir = tempfile::tempdir().unwrap();
898        let client_args = ClientArgs {
899            ingestion: IngestionClientArgs {
900                local_ingestion_path: Some(temp_dir.path().to_owned()),
901                ..Default::default()
902            },
903            ..Default::default()
904        };
905        let ingestion_config = IngestionConfig::default();
906
907        let mut indexer = Indexer::new(
908            store,
909            indexer_args,
910            client_args,
911            ingestion_config,
912            None,
913            &registry,
914        )
915        .await
916        .unwrap();
917
918        indexer
919            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
920            .await
921            .unwrap();
922        indexer
923            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
924            .await
925            .unwrap();
926        indexer
927            .sequential_pipeline::<C>(C, SequentialConfig::default())
928            .await
929            .unwrap();
930        indexer
931            .sequential_pipeline::<D>(D, SequentialConfig::default())
932            .await
933            .unwrap();
934
935        assert_eq!(indexer.first_ingestion_checkpoint, 1);
936    }
937
938    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
939    /// watermark, and first_checkpoint is smallest.
940    #[tokio::test]
941    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
942        let registry = Registry::new();
943        let store = MockStore::default();
944
945        test_pipeline!(A, "concurrent_a");
946        test_pipeline!(B, "concurrent_b");
947        test_pipeline!(C, "sequential_c");
948        test_pipeline!(D, "sequential_d");
949
950        let mut conn = store.connect().await.unwrap();
951        conn.set_committer_watermark(
952            B::NAME,
953            CommitterWatermark {
954                checkpoint_hi_inclusive: 50,
955                ..Default::default()
956            },
957        )
958        .await
959        .unwrap();
960        conn.set_committer_watermark(
961            C::NAME,
962            CommitterWatermark {
963                checkpoint_hi_inclusive: 10,
964                ..Default::default()
965            },
966        )
967        .await
968        .unwrap();
969
970        let indexer_args = IndexerArgs {
971            first_checkpoint: Some(5),
972            ..Default::default()
973        };
974        let temp_dir = tempfile::tempdir().unwrap();
975        let client_args = ClientArgs {
976            ingestion: IngestionClientArgs {
977                local_ingestion_path: Some(temp_dir.path().to_owned()),
978                ..Default::default()
979            },
980            ..Default::default()
981        };
982        let ingestion_config = IngestionConfig::default();
983
984        let mut indexer = Indexer::new(
985            store,
986            indexer_args,
987            client_args,
988            ingestion_config,
989            None,
990            &registry,
991        )
992        .await
993        .unwrap();
994
995        indexer
996            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
997            .await
998            .unwrap();
999        indexer
1000            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1001            .await
1002            .unwrap();
1003        indexer
1004            .sequential_pipeline::<C>(C, SequentialConfig::default())
1005            .await
1006            .unwrap();
1007        indexer
1008            .sequential_pipeline::<D>(D, SequentialConfig::default())
1009            .await
1010            .unwrap();
1011
1012        assert_eq!(indexer.first_ingestion_checkpoint, 5);
1013    }
1014
1015    /// first_ingestion_checkpoint is smallest among existing watermarks + 1 if
1016    /// first_checkpoint but all pipelines have watermarks (ignores first_checkpoint).
1017    #[tokio::test]
1018    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1019        let registry = Registry::new();
1020        let store = MockStore::default();
1021
1022        test_pipeline!(B, "concurrent_b");
1023        test_pipeline!(C, "sequential_c");
1024
1025        let mut conn = store.connect().await.unwrap();
1026        conn.set_committer_watermark(
1027            B::NAME,
1028            CommitterWatermark {
1029                checkpoint_hi_inclusive: 50,
1030                ..Default::default()
1031            },
1032        )
1033        .await
1034        .unwrap();
1035        conn.set_committer_watermark(
1036            C::NAME,
1037            CommitterWatermark {
1038                checkpoint_hi_inclusive: 10,
1039                ..Default::default()
1040            },
1041        )
1042        .await
1043        .unwrap();
1044
1045        let indexer_args = IndexerArgs {
1046            first_checkpoint: Some(5),
1047            ..Default::default()
1048        };
1049        let temp_dir = tempfile::tempdir().unwrap();
1050        let client_args = ClientArgs {
1051            ingestion: IngestionClientArgs {
1052                local_ingestion_path: Some(temp_dir.path().to_owned()),
1053                ..Default::default()
1054            },
1055            ..Default::default()
1056        };
1057        let ingestion_config = IngestionConfig::default();
1058
1059        let mut indexer = Indexer::new(
1060            store,
1061            indexer_args,
1062            client_args,
1063            ingestion_config,
1064            None,
1065            &registry,
1066        )
1067        .await
1068        .unwrap();
1069
1070        indexer
1071            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1072            .await
1073            .unwrap();
1074        indexer
1075            .sequential_pipeline::<C>(C, SequentialConfig::default())
1076            .await
1077            .unwrap();
1078
1079        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1080    }
1081
1082    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1083    /// will not be used as the starting point if it is not the smallest valid committer watermark
1084    /// to resume ingesting from.
1085    #[tokio::test]
1086    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1087        let registry = Registry::new();
1088        let store = MockStore::default();
1089
1090        test_pipeline!(A, "concurrent_a");
1091        test_pipeline!(B, "concurrent_b");
1092        test_pipeline!(C, "sequential_c");
1093
1094        let mut conn = store.connect().await.unwrap();
1095        conn.set_committer_watermark(
1096            B::NAME,
1097            CommitterWatermark {
1098                checkpoint_hi_inclusive: 50,
1099                ..Default::default()
1100            },
1101        )
1102        .await
1103        .unwrap();
1104        conn.set_committer_watermark(
1105            C::NAME,
1106            CommitterWatermark {
1107                checkpoint_hi_inclusive: 10,
1108                ..Default::default()
1109            },
1110        )
1111        .await
1112        .unwrap();
1113
1114        let indexer_args = IndexerArgs {
1115            first_checkpoint: Some(24),
1116            ..Default::default()
1117        };
1118        let temp_dir = tempfile::tempdir().unwrap();
1119        let client_args = ClientArgs {
1120            ingestion: IngestionClientArgs {
1121                local_ingestion_path: Some(temp_dir.path().to_owned()),
1122                ..Default::default()
1123            },
1124            ..Default::default()
1125        };
1126        let ingestion_config = IngestionConfig::default();
1127
1128        let mut indexer = Indexer::new(
1129            store,
1130            indexer_args,
1131            client_args,
1132            ingestion_config,
1133            None,
1134            &registry,
1135        )
1136        .await
1137        .unwrap();
1138
1139        indexer
1140            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1141            .await
1142            .unwrap();
1143
1144        indexer
1145            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1146            .await
1147            .unwrap();
1148
1149        indexer
1150            .sequential_pipeline::<C>(C, SequentialConfig::default())
1151            .await
1152            .unwrap();
1153
1154        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1155    }
1156
    // End-to-end ingestion: all pipelines have watermarks, no first_checkpoint provided.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Watermarks: A = 5 (smallest), B = 10, C = 15, D = 20. With no
        // first_checkpoint, ingestion resumes from A's watermark + 1 = 6.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Create synthetic ingestion data covering checkpoints 5..=29.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 5,
            num_checkpoints: 25,
            checkpoint_size: 1,
        })
        .await;

        // Stop ingestion at the last generated checkpoint so the run terminates.
        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        // Grab metric handles before `run` consumes the indexer.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        indexer.run().await.unwrap().join().await.unwrap();

        // Checkpoints 6..=29 were ingested: 24 in total.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        // A's watermark (5) is below every ingested checkpoint, so A reports no
        // out-of-order watermarks; B, C and D expect to re-see checkpoints at or
        // below their watermarks (6..=10, 6..=15 and 6..=20 respectively).
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            5
        );
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            10
        );
        assert_eq!(
            indexer_metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            15
        );
    }
1298
    // End-to-end ingestion: every pipeline has a watermark, so the provided
    // first_checkpoint is ignored.
    #[tokio::test]
    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Watermarks: A = 5 (smallest), B = 10, C = 15, D = 20.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 5,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Create synthetic ingestion data covering checkpoints 5..=29.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 5,
            num_checkpoints: 25,
            checkpoint_size: 1,
        })
        .await;

        // first_checkpoint (3) should be ignored because all pipelines have
        // watermarks; ingestion still starts at the smallest watermark + 1 = 6.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(3),
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        // Grab metric handles before `run` consumes the indexer.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        // Checkpoints 6..=29 were ingested (24 total) — the same outcome as when
        // no first_checkpoint is provided at all.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        // B, C and D re-see checkpoints at or below their watermarks:
        // 6..=10 (5), 6..=15 (10) and 6..=20 (15) respectively.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            5
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            10
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            15
        );
    }
1440
    // End-to-end ingestion: some pipelines missing watermarks, no first_checkpoint provided.
    #[tokio::test]
    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // A has no watermark; B = 10, C = 15, D = 20. With no first_checkpoint
        // and a pipeline missing its watermark, ingestion starts from 0.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Create synthetic ingestion data covering checkpoints 0..=29.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 30,
            checkpoint_size: 1,
        })
        .await;

        // Stop ingestion at the last generated checkpoint so the run terminates.
        let indexer_args = IndexerArgs {
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        // Grab metric handles before `run` consumes the indexer.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        // All 30 checkpoints (0..=29) were ingested.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
        // A had no watermark, so nothing is out of order; B, C and D re-see
        // checkpoints at or below their watermarks: 0..=10 (11), 0..=15 (16)
        // and 0..=20 (21) respectively.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            11
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            16
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            21
        );
    }
1572
    // End-to-end ingestion: some pipelines missing watermarks, so the provided
    // first_checkpoint is used as the starting point.
    #[tokio::test]
    async fn test_indexer_ingestion_use_first_checkpoint() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // A has no watermark; B = 10, C = 15, D = 20.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 15,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 20,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Create synthetic ingestion data covering checkpoints 5..=29.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 5,
            num_checkpoints: 25,
            checkpoint_size: 1,
        })
        .await;

        // With A missing a watermark, first_checkpoint (10) sets where ingestion
        // begins; last_checkpoint (29) lets the run terminate.
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(10),
            last_checkpoint: Some(29),
            ..Default::default()
        };

        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };

        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        // Grab metric handles before `run` consumes the indexer.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let metrics = indexer.indexer_metrics().clone();
        indexer.run().await.unwrap().join().await.unwrap();

        // Checkpoints 10..=29 were ingested: 20 in total.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
        // A had no watermark, so nothing is out of order; B, C and D re-see
        // checkpoints at or below their watermarks: 10..=10 (1), 10..=15 (6)
        // and 10..=20 (11) respectively.
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[A::NAME])
                .unwrap()
                .get(),
            0
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[B::NAME])
                .unwrap()
                .get(),
            1
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[C::NAME])
                .unwrap()
                .get(),
            6
        );
        assert_eq!(
            metrics
                .total_watermarks_out_of_order
                .get_metric_with_label_values(&[D::NAME])
                .unwrap()
                .get(),
            11
        );
    }
1705
1706    #[tokio::test]
1707    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1708        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1709        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1710        assert_eq!(committer_watermark, None);
1711        assert_eq!(pruner_watermark, None);
1712    }
1713
1714    #[tokio::test]
1715    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1716        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1717        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1718        assert_eq!(committer_watermark, None);
1719        assert_eq!(pruner_watermark, None);
1720    }
1721
1722    #[tokio::test]
1723    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1724        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1725
1726        let committer_watermark = committer_watermark.unwrap();
1727        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1728
1729        let pruner_watermark = pruner_watermark.unwrap();
1730        assert_eq!(pruner_watermark.reader_lo, 1);
1731        assert_eq!(pruner_watermark.pruner_hi, 1);
1732    }
1733
1734    #[tokio::test]
1735    async fn test_init_watermark_sequential() {
1736        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1737
1738        let committer_watermark = committer_watermark.unwrap();
1739        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1740
1741        let pruner_watermark = pruner_watermark.unwrap();
1742        assert_eq!(pruner_watermark.reader_lo, 1);
1743        assert_eq!(pruner_watermark.pruner_hi, 1);
1744    }
1745
1746    #[tokio::test]
1747    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1748        let registry = Registry::new();
1749        let store = MockStore::default();
1750
1751        // Set up different watermarks for three different sequential pipelines
1752        let mut conn = store.connect().await.unwrap();
1753
1754        // First handler at checkpoint 10
1755        conn.set_committer_watermark(
1756            MockHandler::NAME,
1757            CommitterWatermark {
1758                epoch_hi_inclusive: 0,
1759                checkpoint_hi_inclusive: 10,
1760                tx_hi: 20,
1761                timestamp_ms_hi_inclusive: 10000,
1762            },
1763        )
1764        .await
1765        .unwrap();
1766
1767        // SequentialHandler at checkpoint 5
1768        conn.set_committer_watermark(
1769            SequentialHandler::NAME,
1770            CommitterWatermark {
1771                epoch_hi_inclusive: 0,
1772                checkpoint_hi_inclusive: 5,
1773                tx_hi: 10,
1774                timestamp_ms_hi_inclusive: 5000,
1775            },
1776        )
1777        .await
1778        .unwrap();
1779
1780        // Create synthetic ingestion data
1781        let temp_dir = tempfile::tempdir().unwrap();
1782        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1783            ingestion_dir: temp_dir.path().to_owned(),
1784            starting_checkpoint: 0,
1785            num_checkpoints: 20,
1786            checkpoint_size: 2,
1787        })
1788        .await;
1789
1790        let indexer_args = IndexerArgs {
1791            first_checkpoint: None,
1792            last_checkpoint: Some(19),
1793            pipeline: vec![],
1794            ..Default::default()
1795        };
1796
1797        let client_args = ClientArgs {
1798            ingestion: IngestionClientArgs {
1799                local_ingestion_path: Some(temp_dir.path().to_owned()),
1800                ..Default::default()
1801            },
1802            ..Default::default()
1803        };
1804
1805        let ingestion_config = IngestionConfig::default();
1806
1807        let mut indexer = Indexer::new(
1808            store.clone(),
1809            indexer_args,
1810            client_args,
1811            ingestion_config,
1812            None,
1813            &registry,
1814        )
1815        .await
1816        .unwrap();
1817
1818        // Add first sequential pipeline
1819        indexer
1820            .sequential_pipeline(
1821                MockHandler,
1822                pipeline::sequential::SequentialConfig::default(),
1823            )
1824            .await
1825            .unwrap();
1826
1827        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1828        assert_eq!(
1829            indexer.next_sequential_checkpoint(),
1830            Some(11),
1831            "next_sequential_checkpoint should be 11"
1832        );
1833
1834        // Add second sequential pipeline
1835        indexer
1836            .sequential_pipeline(
1837                SequentialHandler,
1838                pipeline::sequential::SequentialConfig::default(),
1839            )
1840            .await
1841            .unwrap();
1842
1843        // Should change to 6 (minimum of 6 and 11)
1844        assert_eq!(
1845            indexer.next_sequential_checkpoint(),
1846            Some(6),
1847            "next_sequential_checkpoint should still be 6"
1848        );
1849
1850        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1851        indexer.run().await.unwrap().join().await.unwrap();
1852
1853        // Verify each pipeline made some progress independently
1854        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1855        let watermark2 = conn
1856            .committer_watermark(SequentialHandler::NAME)
1857            .await
1858            .unwrap();
1859
1860        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1861        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1862    }
1863
1864    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1865    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1866    /// committing checkpoints less than the main pipeline's reader watermark.
1867    #[tokio::test]
1868    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1869        let registry = Registry::new();
1870        let store = MockStore::default();
1871
1872        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1873        // reader watermark at `7`.
1874        let mut conn = store.connect().await.unwrap();
1875        conn.set_committer_watermark(
1876            MockCheckpointSequenceNumberHandler::NAME,
1877            CommitterWatermark {
1878                checkpoint_hi_inclusive: 10,
1879                ..Default::default()
1880            },
1881        )
1882        .await
1883        .unwrap();
1884        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1885            .await
1886            .unwrap();
1887
1888        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1889        // be ignored by the tasked indexer.
1890        let indexer_args = IndexerArgs {
1891            first_checkpoint: Some(0),
1892            last_checkpoint: Some(15),
1893            task: TaskArgs::tasked("task".to_string(), 10),
1894            ..Default::default()
1895        };
1896        let temp_dir = tempfile::tempdir().unwrap();
1897        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1898            ingestion_dir: temp_dir.path().to_owned(),
1899            starting_checkpoint: 0,
1900            num_checkpoints: 16,
1901            checkpoint_size: 2,
1902        })
1903        .await;
1904
1905        let client_args = ClientArgs {
1906            ingestion: IngestionClientArgs {
1907                local_ingestion_path: Some(temp_dir.path().to_owned()),
1908                ..Default::default()
1909            },
1910            ..Default::default()
1911        };
1912
1913        let ingestion_config = IngestionConfig::default();
1914
1915        let mut tasked_indexer = Indexer::new(
1916            store.clone(),
1917            indexer_args,
1918            client_args,
1919            ingestion_config,
1920            None,
1921            &registry,
1922        )
1923        .await
1924        .unwrap();
1925
1926        let _ = tasked_indexer
1927            .concurrent_pipeline(
1928                MockCheckpointSequenceNumberHandler,
1929                ConcurrentConfig::default(),
1930            )
1931            .await;
1932
1933        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1934        let metrics = tasked_indexer.indexer_metrics().clone();
1935
1936        tasked_indexer.run().await.unwrap().join().await.unwrap();
1937
1938        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1939        assert_eq!(
1940            metrics
1941                .total_collector_skipped_checkpoints
1942                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1943                .unwrap()
1944                .get(),
1945            7
1946        );
1947        let data = store
1948            .data
1949            .get(MockCheckpointSequenceNumberHandler::NAME)
1950            .unwrap();
1951        assert_eq!(data.len(), 9);
1952        for i in 0..7 {
1953            assert!(data.get(&i).is_none());
1954        }
1955        for i in 7..16 {
1956            assert!(data.get(&i).is_some());
1957        }
1958    }
1959
1960    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1961    #[tokio::test]
1962    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1963        let registry = Registry::new();
1964        let store = MockStore::default();
1965
1966        let mut conn = store.connect().await.unwrap();
1967        conn.set_committer_watermark(
1968            "test",
1969            CommitterWatermark {
1970                checkpoint_hi_inclusive: 10,
1971                ..Default::default()
1972            },
1973        )
1974        .await
1975        .unwrap();
1976        conn.set_reader_watermark("test", 5).await.unwrap();
1977
1978        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1979        // watermarks.
1980        let indexer_args = IndexerArgs {
1981            first_checkpoint: Some(9),
1982            last_checkpoint: Some(25),
1983            task: TaskArgs::tasked("task".to_string(), 10),
1984            ..Default::default()
1985        };
1986        let temp_dir = tempfile::tempdir().unwrap();
1987        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1988            ingestion_dir: temp_dir.path().to_owned(),
1989            starting_checkpoint: 9,
1990            num_checkpoints: 17,
1991            checkpoint_size: 2,
1992        })
1993        .await;
1994
1995        let client_args = ClientArgs {
1996            ingestion: IngestionClientArgs {
1997                local_ingestion_path: Some(temp_dir.path().to_owned()),
1998                ..Default::default()
1999            },
2000            ..Default::default()
2001        };
2002
2003        let ingestion_config = IngestionConfig::default();
2004
2005        let mut tasked_indexer = Indexer::new(
2006            store.clone(),
2007            indexer_args,
2008            client_args,
2009            ingestion_config,
2010            None,
2011            &registry,
2012        )
2013        .await
2014        .unwrap();
2015
2016        let _ = tasked_indexer
2017            .concurrent_pipeline(
2018                MockCheckpointSequenceNumberHandler,
2019                ConcurrentConfig::default(),
2020            )
2021            .await;
2022
2023        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2024        let metrics = tasked_indexer.indexer_metrics().clone();
2025
2026        tasked_indexer.run().await.unwrap().join().await.unwrap();
2027
2028        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2029        assert_eq!(
2030            metrics
2031                .total_watermarks_out_of_order
2032                .get_metric_with_label_values(&["test"])
2033                .unwrap()
2034                .get(),
2035            0
2036        );
2037        assert_eq!(
2038            metrics
2039                .total_collector_skipped_checkpoints
2040                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2041                .unwrap()
2042                .get(),
2043            0
2044        );
2045
2046        let data = store.data.get("test").unwrap();
2047        assert!(data.len() == 17);
2048        for i in 0..9 {
2049            assert!(data.get(&i).is_none());
2050        }
2051        for i in 9..26 {
2052            assert!(data.get(&i).is_some());
2053        }
2054        let main_pipeline_watermark = store.watermark("test").unwrap();
2055        // assert that the main pipeline's watermarks are not updated
2056        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2057        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2058        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2059        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2060        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2061    }
2062
2063    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
2064    /// committed, and any checkpoints inflight < X will be skipped.
2065    #[tokio::test]
2066    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2067        let registry = Registry::new();
2068        let store = MockStore::default();
2069        let mut conn = store.connect().await.unwrap();
2070        // Set the main pipeline watermark.
2071        conn.set_committer_watermark(
2072            ControllableHandler::NAME,
2073            CommitterWatermark {
2074                checkpoint_hi_inclusive: 11,
2075                ..Default::default()
2076            },
2077        )
2078        .await
2079        .unwrap();
2080
2081        // Generate 500 checkpoints upfront, for the indexer to process all at once.
2082        let temp_dir = tempfile::tempdir().unwrap();
2083        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2084            ingestion_dir: temp_dir.path().to_owned(),
2085            starting_checkpoint: 0,
2086            num_checkpoints: 501,
2087            checkpoint_size: 2,
2088        })
2089        .await;
2090        let indexer_args = IndexerArgs {
2091            first_checkpoint: Some(0),
2092            last_checkpoint: Some(500),
2093            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2094            ..Default::default()
2095        };
2096        let client_args = ClientArgs {
2097            ingestion: IngestionClientArgs {
2098                local_ingestion_path: Some(temp_dir.path().to_owned()),
2099                ..Default::default()
2100            },
2101            ..Default::default()
2102        };
2103        let ingestion_config = IngestionConfig::default();
2104        let mut tasked_indexer = Indexer::new(
2105            store.clone(),
2106            indexer_args,
2107            client_args,
2108            ingestion_config,
2109            None,
2110            &registry,
2111        )
2112        .await
2113        .unwrap();
2114        let mut allow_process = 10;
2115        // Limit the pipeline to process only checkpoints `[0, 10]`.
2116        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2117        let _ = tasked_indexer
2118            .concurrent_pipeline(
2119                controllable_handler,
2120                ConcurrentConfig {
2121                    committer: CommitterConfig {
2122                        collect_interval_ms: 10,
2123                        watermark_interval_ms: 10,
2124                        ..Default::default()
2125                    },
2126                    // High fixed concurrency so all checkpoints can be processed
2127                    // concurrently despite out-of-order arrival.
2128                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
2129                    ..Default::default()
2130                },
2131            )
2132            .await;
2133        let metrics = tasked_indexer.indexer_metrics().clone();
2134
2135        let mut s_indexer = tasked_indexer.run().await.unwrap();
2136
2137        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
2138        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
2139        // committed.
2140        store
2141            .wait_for_watermark(
2142                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2143                10,
2144                std::time::Duration::from_secs(10),
2145            )
2146            .await;
2147
2148        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2149        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2150        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2151        // one at a time until the collector_reader_lo metric shows the new value.
2152        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2153            .await
2154            .unwrap();
2155
2156        let reader_lo = metrics
2157            .collector_reader_lo
2158            .with_label_values(&[ControllableHandler::NAME]);
2159
2160        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2161        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2162        // checkpoints have been processed.
2163        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2164        while reader_lo.get() != 250 {
2165            interval.tick().await;
2166            // allow_process is initialized to 11, bump to 11 for the next checkpoint
2167            allow_process += 1;
2168            assert!(
2169                allow_process <= 500,
2170                "Released all checkpoints but collector never observed new reader_lo"
2171            );
2172            process_below.send(allow_process).ok();
2173        }
2174
2175        // At this point, the collector has observed reader_lo = 250. Release all remaining
2176        // checkpoints. Guarantees:
2177        // - [0, 10]: committed (before reader_lo was set)
2178        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2179        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2180        // - [250, 500]: committed (>= reader_lo)
2181        process_below.send(500).ok();
2182
2183        s_indexer.join().await.unwrap();
2184
2185        let data = store.data.get(ControllableHandler::NAME).unwrap();
2186
2187        // Checkpoints (allow_process, 250) must be skipped.
2188        for chkpt in (allow_process + 1)..250 {
2189            assert!(
2190                data.get(&chkpt).is_none(),
2191                "Checkpoint {chkpt} should have been skipped"
2192            );
2193        }
2194
2195        // Checkpoints >= reader_lo must be committed.
2196        for chkpt in 250..=500 {
2197            assert!(
2198                data.get(&chkpt).is_some(),
2199                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2200            );
2201        }
2202
2203        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2204        for chkpt in 0..=10 {
2205            assert!(
2206                data.get(&chkpt).is_some(),
2207                "Checkpoint {chkpt} should have been committed (baseline)"
2208            );
2209        }
2210    }
2211}