sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::Connection;
19use sui_indexer_alt_framework_store_traits::InitWatermark;
20use sui_indexer_alt_framework_store_traits::Store;
21use sui_indexer_alt_framework_store_traits::TransactionalStore;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::Handler;
30use crate::pipeline::sequential::SequentialConfig;
31use crate::pipeline::sequential::{self};
32use crate::service::Service;
33
34pub use anyhow::Result;
35pub use sui_field_count::FieldCount;
36pub use sui_futures::service;
37/// External users access the store trait through framework::store
38pub use sui_indexer_alt_framework_store_traits as store;
39pub use sui_types as types;
40
41#[cfg(feature = "cluster")]
42pub mod cluster;
43pub mod config;
44pub mod ingestion;
45pub mod metrics;
46pub mod pipeline;
47#[cfg(feature = "postgres")]
48pub mod postgres;
49
50#[cfg(test)]
51pub mod mocks;
52
/// Command-line arguments for the indexer.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run. May be repeated to enable multiple pipelines.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
79
/// Command-line arguments for configuring a tasked indexer.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
110
/// The core indexer: wires a [`Store`] implementation to a checkpoint ingestion service and a set
/// of concurrent and/or sequential pipelines, tracking the checkpoint range to ingest.
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from. Initialized to `u64::MAX` and lowered as pipelines are added.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    /// `None` if no sequential pipelines have been added.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// a warning when the indexer is run.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
164
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
175
176impl TaskArgs {
177    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
178        Self {
179            task: Some(task),
180            reader_interval_ms: Some(reader_interval_ms),
181        }
182    }
183
184    fn into_task(self) -> Option<Task> {
185        Some(Task {
186            task: self.task?,
187            reader_interval: Duration::from_millis(self.reader_interval_ms?),
188        })
189    }
190}
191
impl<S: Store> Indexer<S> {
    /// Create a new instance of the indexer framework from a store that implements the `Store`
    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
    /// arguments configure the following:
    ///
    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
    ///   watermarks) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            // Lowered to the minimum next checkpoint as pipelines are added; stays at `u64::MAX`
            // until the first pipeline registers.
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // `None` means "run every pipeline"; otherwise only the named pipelines run.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store used by the indexer.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The ingestion client used by the indexer to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// The indexer's metrics.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// The ingestion service's metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The pipelines that this indexer will run: every registered pipeline that also passes the
    /// `--pipeline` filter (or all registered pipelines when no filter was given).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
    /// they will be idle until the ingestion service starts, and serves them checkpoint data.
    ///
    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
    /// keep the watermark table up-to-date with the highest point they can guarantee all data
    /// exists for, for their pipeline.
    pub async fn concurrent_pipeline<H>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()>
    where
        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
    {
        // `None` means this pipeline was excluded by the `--pipeline` filter.
        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
            return Ok(());
        };

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            self.ingestion_service.subscribe().0,
            self.metrics.clone(),
        ));

        Ok(())
    }

    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
    /// will start processing and committing once the ingestion service has caught up to their
    /// respective watermarks.
    ///
    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
    pub async fn run(self) -> anyhow::Result<Service> {
        // `add_pipeline` removes each registered pipeline from this set, so anything left over
        // was requested on the command-line but never added to the indexer.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Merge pipeline handles into the ingestion service handle so they share a lifetime and
        // shut down together.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
    /// checkpoint after its watermark, or if that doesn't exist, then the provided
    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
    ///
    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
    /// calculated above.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled.
    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // Removing the name here is what lets `run` detect filter entries that never matched a
        // registered pipeline.
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        let mut conn = self
            .store
            .connect()
            .await
            .context("Failed to establish connection to store")?;

        // For a tasked indexer, watermarks are keyed by both the pipeline name and the task name.
        let pipeline_task =
            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;

        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
        // Otherwise, use the existing record and disregard the proposed value.
        let proposed_next_checkpoint = self.first_checkpoint.unwrap_or(0);
        let InitWatermark {
            checkpoint_hi_inclusive,
            reader_lo,
        } = conn
            .init_watermark(
                &pipeline_task,
                InitWatermark {
                    // `checked_sub` yields `None` at genesis (0), i.e. nothing committed yet.
                    checkpoint_hi_inclusive: proposed_next_checkpoint.checked_sub(1),
                    reader_lo: proposed_next_checkpoint,
                },
            )
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        let next_checkpoint = checkpoint_hi_inclusive.map_or(reader_lo, |c| c + 1);

        self.next_checkpoint = self.next_checkpoint.min(next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
402
403impl<T: TransactionalStore> Indexer<T> {
404    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
405    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
406    ///
407    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
408    /// required to handle pipelines that modify data in-place (where each update is not an insert,
409    /// but could be a modification of an existing row, where ordering between updates is
410    /// important).
411    ///
412    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
413    /// number of checkpoints (configured by `checkpoint_lag`).
414    pub async fn sequential_pipeline<H>(
415        &mut self,
416        handler: H,
417        config: SequentialConfig,
418    ) -> Result<()>
419    where
420        H: Handler<Store = T> + Send + Sync + 'static,
421    {
422        if self.task.is_some() {
423            bail!(
424                "Sequential pipelines do not support pipeline tasks. \
425                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
426                Running the same pipeline under a different task would violate these guarantees."
427            );
428        }
429
430        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
431            return Ok(());
432        };
433
434        // Track the minimum checkpoint_hi across all sequential pipelines
435        self.next_sequential_checkpoint = Some(
436            self.next_sequential_checkpoint
437                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
438        );
439
440        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
441
442        self.pipelines.push(sequential::pipeline::<H>(
443            handler,
444            next_checkpoint,
445            config,
446            self.store.clone(),
447            checkpoint_rx,
448            commit_hi_tx,
449            self.metrics.clone(),
450        ));
451
452        Ok(())
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use std::sync::Arc;
459
460    use async_trait::async_trait;
461    use clap::Parser;
462    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
463    use sui_synthetic_ingestion::synthetic_ingestion;
464    use tokio::sync::watch;
465
466    use crate::FieldCount;
467    use crate::config::ConcurrencyConfig;
468    use crate::ingestion::ingestion_client::IngestionClientArgs;
469    use crate::mocks::store::MockStore;
470    use crate::pipeline::CommitterConfig;
471    use crate::pipeline::Processor;
472    use crate::pipeline::concurrent::ConcurrentConfig;
473    use crate::store::CommitterWatermark;
474
475    use super::*;
476
    /// Value type produced by the test handlers -- wraps a checkpoint sequence number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
480
    /// A handler that can be controlled externally to block checkpoint processing.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value. Raised externally through the
        /// paired `watch::Sender` returned by `with_limit`.
        process_below: watch::Receiver<u64>,
    }
486
487    impl ControllableHandler {
488        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
489            let (tx, rx) = watch::channel(limit);
490            (Self { process_below: rx }, tx)
491        }
492    }
493
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        /// Block until the external limit reaches this checkpoint's sequence number, then emit a
        /// single value carrying that number.
        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. `.ok()` deliberately discards
            // the error case (presumably the sender being dropped) so processing proceeds anyway.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
515
    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        /// Accumulate every value into one batch that is always ready to commit.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        /// Record each value in the mock store under this handler's name, keyed by the wrapped
        /// checkpoint number. Returns the number of values committed.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
543
    /// Defines a unit struct `$handler` with processor name `$name` that implements [`Processor`]
    /// plus both the concurrent and sequential pipeline `Handler` traits against [`MockStore`].
    /// The concurrent commit records each value in the store; the sequential commit is a no-op
    /// that reports one row written.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
607
    // Shared handler types for the tests below, distinguished only by their processor names.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
611
    /// Drive watermark initialization end-to-end: build an indexer over a fresh `MockStore`,
    /// register a single pipeline (concurrent when `is_concurrent`, sequential otherwise) with an
    /// optional `first_checkpoint` override, and return the committer and pruner watermarks that
    /// were written to the store.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        // Registering the pipeline is what initializes its watermark in the store.
        if is_concurrent {
            indexer
                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
                .await
                .unwrap();
        } else {
            indexer
                .sequential_pipeline::<A>(A, SequentialConfig::default())
                .await
                .unwrap();
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
667
668    #[test]
669    fn test_arg_parsing() {
670        #[derive(Parser)]
671        struct Args {
672            #[clap(flatten)]
673            indexer: IndexerArgs,
674        }
675
676        let args = Args::try_parse_from([
677            "cmd",
678            "--first-checkpoint",
679            "10",
680            "--last-checkpoint",
681            "100",
682            "--pipeline",
683            "a",
684            "--pipeline",
685            "b",
686            "--task",
687            "t",
688            "--reader-interval-ms",
689            "5000",
690        ])
691        .unwrap();
692
693        assert_eq!(args.indexer.first_checkpoint, Some(10));
694        assert_eq!(args.indexer.last_checkpoint, Some(100));
695        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
696        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
697        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
698    }
699
    /// next_checkpoint is smallest among existing watermarks + 1.
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        // Seed a committer watermark for every pipeline so none of them fall back to the
        // default starting checkpoint.
        conn.init_watermark(A::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 100,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(B::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(C::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(D::NAME, InitWatermark::default())
            .await
            .unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        // The lowest watermark is C's (1), so ingestion resumes at 1 + 1 = 2, and C is also the
        // lowest sequential pipeline.
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }
809
    /// next_checkpoint is 0 when at least one pipeline has no watermark.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Only B and C get watermarks; A and D start fresh.
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        // A and D have no watermarks, so they default to starting at checkpoint 0, dragging both
        // minimums down to 0.
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }
885
886    /// next_checkpoint is 1 when smallest committer watermark is 0.
887    #[tokio::test]
888    async fn test_next_checkpoint_smallest_is_0() {
889        let registry = Registry::new();
890        let store = MockStore::default();
891
892        test_pipeline!(A, "concurrent_a");
893        test_pipeline!(B, "concurrent_b");
894        test_pipeline!(C, "sequential_c");
895        test_pipeline!(D, "sequential_d");
896
897        let mut conn = store.connect().await.unwrap();
898        conn.set_committer_watermark(
899            A::NAME,
900            CommitterWatermark {
901                checkpoint_hi_inclusive: 100,
902                ..Default::default()
903            },
904        )
905        .await
906        .unwrap();
907        conn.set_committer_watermark(
908            B::NAME,
909            CommitterWatermark {
910                checkpoint_hi_inclusive: 10,
911                ..Default::default()
912            },
913        )
914        .await
915        .unwrap();
916        conn.set_committer_watermark(
917            C::NAME,
918            CommitterWatermark {
919                checkpoint_hi_inclusive: 1,
920                ..Default::default()
921            },
922        )
923        .await
924        .unwrap();
925        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
926            .await
927            .unwrap();
928
929        let indexer_args = IndexerArgs::default();
930        let temp_dir = tempfile::tempdir().unwrap();
931        let client_args = ClientArgs {
932            ingestion: IngestionClientArgs {
933                local_ingestion_path: Some(temp_dir.path().to_owned()),
934                ..Default::default()
935            },
936            ..Default::default()
937        };
938        let ingestion_config = IngestionConfig::default();
939
940        let mut indexer = Indexer::new(
941            store,
942            indexer_args,
943            client_args,
944            ingestion_config,
945            None,
946            &registry,
947        )
948        .await
949        .unwrap();
950
951        indexer
952            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
953            .await
954            .unwrap();
955        indexer
956            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
957            .await
958            .unwrap();
959        indexer
960            .sequential_pipeline::<C>(C, SequentialConfig::default())
961            .await
962            .unwrap();
963        indexer
964            .sequential_pipeline::<D>(D, SequentialConfig::default())
965            .await
966            .unwrap();
967
968        assert_eq!(indexer.next_checkpoint, 1);
969    }
970
971    /// next_checkpoint is first_checkpoint when at least one pipeline has no
972    /// watermark, and first_checkpoint is smallest.
973    #[tokio::test]
974    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
975        let registry = Registry::new();
976        let store = MockStore::default();
977
978        test_pipeline!(A, "concurrent_a");
979        test_pipeline!(B, "concurrent_b");
980        test_pipeline!(C, "sequential_c");
981        test_pipeline!(D, "sequential_d");
982
983        let mut conn = store.connect().await.unwrap();
984        conn.set_committer_watermark(
985            B::NAME,
986            CommitterWatermark {
987                checkpoint_hi_inclusive: 50,
988                ..Default::default()
989            },
990        )
991        .await
992        .unwrap();
993        conn.set_committer_watermark(
994            C::NAME,
995            CommitterWatermark {
996                checkpoint_hi_inclusive: 10,
997                ..Default::default()
998            },
999        )
1000        .await
1001        .unwrap();
1002
1003        let indexer_args = IndexerArgs {
1004            first_checkpoint: Some(5),
1005            ..Default::default()
1006        };
1007        let temp_dir = tempfile::tempdir().unwrap();
1008        let client_args = ClientArgs {
1009            ingestion: IngestionClientArgs {
1010                local_ingestion_path: Some(temp_dir.path().to_owned()),
1011                ..Default::default()
1012            },
1013            ..Default::default()
1014        };
1015        let ingestion_config = IngestionConfig::default();
1016
1017        let mut indexer = Indexer::new(
1018            store,
1019            indexer_args,
1020            client_args,
1021            ingestion_config,
1022            None,
1023            &registry,
1024        )
1025        .await
1026        .unwrap();
1027
1028        indexer
1029            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1030            .await
1031            .unwrap();
1032        indexer
1033            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1034            .await
1035            .unwrap();
1036        indexer
1037            .sequential_pipeline::<C>(C, SequentialConfig::default())
1038            .await
1039            .unwrap();
1040        indexer
1041            .sequential_pipeline::<D>(D, SequentialConfig::default())
1042            .await
1043            .unwrap();
1044
1045        assert_eq!(indexer.first_checkpoint, Some(5));
1046        assert_eq!(indexer.last_checkpoint, None);
1047        assert_eq!(indexer.next_checkpoint, 5);
1048        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
1049    }
1050
1051    /// next_checkpoint is smallest among existing watermarks + 1 if
1052    /// all pipelines have watermarks (ignores first_checkpoint).
1053    #[tokio::test]
1054    async fn test_next_checkpoint_ignore_first_checkpoint() {
1055        let registry = Registry::new();
1056        let store = MockStore::default();
1057
1058        test_pipeline!(B, "concurrent_b");
1059        test_pipeline!(C, "sequential_c");
1060
1061        let mut conn = store.connect().await.unwrap();
1062        conn.set_committer_watermark(
1063            B::NAME,
1064            CommitterWatermark {
1065                checkpoint_hi_inclusive: 50,
1066                ..Default::default()
1067            },
1068        )
1069        .await
1070        .unwrap();
1071        conn.set_committer_watermark(
1072            C::NAME,
1073            CommitterWatermark {
1074                checkpoint_hi_inclusive: 10,
1075                ..Default::default()
1076            },
1077        )
1078        .await
1079        .unwrap();
1080
1081        let indexer_args = IndexerArgs {
1082            first_checkpoint: Some(5),
1083            ..Default::default()
1084        };
1085        let temp_dir = tempfile::tempdir().unwrap();
1086        let client_args = ClientArgs {
1087            ingestion: IngestionClientArgs {
1088                local_ingestion_path: Some(temp_dir.path().to_owned()),
1089                ..Default::default()
1090            },
1091            ..Default::default()
1092        };
1093        let ingestion_config = IngestionConfig::default();
1094
1095        let mut indexer = Indexer::new(
1096            store,
1097            indexer_args,
1098            client_args,
1099            ingestion_config,
1100            None,
1101            &registry,
1102        )
1103        .await
1104        .unwrap();
1105
1106        indexer
1107            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1108            .await
1109            .unwrap();
1110        indexer
1111            .sequential_pipeline::<C>(C, SequentialConfig::default())
1112            .await
1113            .unwrap();
1114
1115        assert_eq!(indexer.first_checkpoint, Some(5));
1116        assert_eq!(indexer.last_checkpoint, None);
1117        assert_eq!(indexer.next_checkpoint, 11);
1118        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1119    }
1120
1121    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1122    /// will not be used as the starting point if it is not the smallest valid committer watermark
1123    /// to resume ingesting from.
1124    #[tokio::test]
1125    async fn test_next_checkpoint_large_first_checkpoint() {
1126        let registry = Registry::new();
1127        let store = MockStore::default();
1128
1129        test_pipeline!(A, "concurrent_a");
1130        test_pipeline!(B, "concurrent_b");
1131        test_pipeline!(C, "sequential_c");
1132
1133        let mut conn = store.connect().await.unwrap();
1134        conn.set_committer_watermark(
1135            B::NAME,
1136            CommitterWatermark {
1137                checkpoint_hi_inclusive: 50,
1138                ..Default::default()
1139            },
1140        )
1141        .await
1142        .unwrap();
1143        conn.set_committer_watermark(
1144            C::NAME,
1145            CommitterWatermark {
1146                checkpoint_hi_inclusive: 10,
1147                ..Default::default()
1148            },
1149        )
1150        .await
1151        .unwrap();
1152
1153        let indexer_args = IndexerArgs {
1154            first_checkpoint: Some(24),
1155            ..Default::default()
1156        };
1157        let temp_dir = tempfile::tempdir().unwrap();
1158        let client_args = ClientArgs {
1159            ingestion: IngestionClientArgs {
1160                local_ingestion_path: Some(temp_dir.path().to_owned()),
1161                ..Default::default()
1162            },
1163            ..Default::default()
1164        };
1165        let ingestion_config = IngestionConfig::default();
1166
1167        let mut indexer = Indexer::new(
1168            store,
1169            indexer_args,
1170            client_args,
1171            ingestion_config,
1172            None,
1173            &registry,
1174        )
1175        .await
1176        .unwrap();
1177
1178        indexer
1179            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1180            .await
1181            .unwrap();
1182
1183        indexer
1184            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1185            .await
1186            .unwrap();
1187
1188        indexer
1189            .sequential_pipeline::<C>(C, SequentialConfig::default())
1190            .await
1191            .unwrap();
1192
1193        assert_eq!(indexer.first_checkpoint, Some(24));
1194        assert_eq!(indexer.last_checkpoint, None);
1195        assert_eq!(indexer.next_checkpoint, 11);
1196        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1197    }
1198
1199    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1200    #[tokio::test]
1201    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1202        let registry = Registry::new();
1203        let store = MockStore::default();
1204
1205        test_pipeline!(A, "concurrent_a");
1206        test_pipeline!(B, "concurrent_b");
1207        test_pipeline!(C, "sequential_c");
1208        test_pipeline!(D, "sequential_d");
1209
1210        let mut conn = store.connect().await.unwrap();
1211        conn.set_committer_watermark(
1212            A::NAME,
1213            CommitterWatermark {
1214                checkpoint_hi_inclusive: 5,
1215                ..Default::default()
1216            },
1217        )
1218        .await
1219        .unwrap();
1220        conn.set_committer_watermark(
1221            B::NAME,
1222            CommitterWatermark {
1223                checkpoint_hi_inclusive: 10,
1224                ..Default::default()
1225            },
1226        )
1227        .await
1228        .unwrap();
1229        conn.set_committer_watermark(
1230            C::NAME,
1231            CommitterWatermark {
1232                checkpoint_hi_inclusive: 15,
1233                ..Default::default()
1234            },
1235        )
1236        .await
1237        .unwrap();
1238        conn.set_committer_watermark(
1239            D::NAME,
1240            CommitterWatermark {
1241                checkpoint_hi_inclusive: 20,
1242                ..Default::default()
1243            },
1244        )
1245        .await
1246        .unwrap();
1247
1248        // Create synthetic ingestion data
1249        let temp_dir = tempfile::tempdir().unwrap();
1250        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1251            ingestion_dir: temp_dir.path().to_owned(),
1252            starting_checkpoint: 0,
1253            num_checkpoints: 30,
1254            checkpoint_size: 1,
1255        })
1256        .await;
1257
1258        let indexer_args = IndexerArgs {
1259            last_checkpoint: Some(29),
1260            ..Default::default()
1261        };
1262
1263        let client_args = ClientArgs {
1264            ingestion: IngestionClientArgs {
1265                local_ingestion_path: Some(temp_dir.path().to_owned()),
1266                ..Default::default()
1267            },
1268            ..Default::default()
1269        };
1270
1271        let ingestion_config = IngestionConfig::default();
1272
1273        let mut indexer = Indexer::new(
1274            store.clone(),
1275            indexer_args,
1276            client_args,
1277            ingestion_config,
1278            None,
1279            &registry,
1280        )
1281        .await
1282        .unwrap();
1283
1284        indexer
1285            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1286            .await
1287            .unwrap();
1288        indexer
1289            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1290            .await
1291            .unwrap();
1292        indexer
1293            .sequential_pipeline::<C>(C, SequentialConfig::default())
1294            .await
1295            .unwrap();
1296        indexer
1297            .sequential_pipeline::<D>(D, SequentialConfig::default())
1298            .await
1299            .unwrap();
1300
1301        let ingestion_metrics = indexer.ingestion_metrics().clone();
1302        let indexer_metrics = indexer.indexer_metrics().clone();
1303
1304        indexer.run().await.unwrap().join().await.unwrap();
1305
1306        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1307        assert_eq!(
1308            indexer_metrics
1309                .total_watermarks_out_of_order
1310                .get_metric_with_label_values(&[A::NAME])
1311                .unwrap()
1312                .get(),
1313            0
1314        );
1315        assert_eq!(
1316            indexer_metrics
1317                .total_watermarks_out_of_order
1318                .get_metric_with_label_values(&[B::NAME])
1319                .unwrap()
1320                .get(),
1321            5
1322        );
1323        assert_eq!(
1324            indexer_metrics
1325                .total_watermarks_out_of_order
1326                .get_metric_with_label_values(&[C::NAME])
1327                .unwrap()
1328                .get(),
1329            10
1330        );
1331        assert_eq!(
1332            indexer_metrics
1333                .total_watermarks_out_of_order
1334                .get_metric_with_label_values(&[D::NAME])
1335                .unwrap()
1336                .get(),
1337            15
1338        );
1339    }
1340
1341    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1342    #[tokio::test]
1343    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1344        let registry = Registry::new();
1345        let store = MockStore::default();
1346
1347        test_pipeline!(A, "concurrent_a");
1348        test_pipeline!(B, "concurrent_b");
1349        test_pipeline!(C, "sequential_c");
1350        test_pipeline!(D, "sequential_d");
1351
1352        let mut conn = store.connect().await.unwrap();
1353        conn.set_committer_watermark(
1354            A::NAME,
1355            CommitterWatermark {
1356                checkpoint_hi_inclusive: 5,
1357                ..Default::default()
1358            },
1359        )
1360        .await
1361        .unwrap();
1362        conn.set_committer_watermark(
1363            B::NAME,
1364            CommitterWatermark {
1365                checkpoint_hi_inclusive: 10,
1366                ..Default::default()
1367            },
1368        )
1369        .await
1370        .unwrap();
1371        conn.set_committer_watermark(
1372            C::NAME,
1373            CommitterWatermark {
1374                checkpoint_hi_inclusive: 15,
1375                ..Default::default()
1376            },
1377        )
1378        .await
1379        .unwrap();
1380        conn.set_committer_watermark(
1381            D::NAME,
1382            CommitterWatermark {
1383                checkpoint_hi_inclusive: 20,
1384                ..Default::default()
1385            },
1386        )
1387        .await
1388        .unwrap();
1389
1390        // Create synthetic ingestion data
1391        let temp_dir = tempfile::tempdir().unwrap();
1392        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1393            ingestion_dir: temp_dir.path().to_owned(),
1394            starting_checkpoint: 0,
1395            num_checkpoints: 30,
1396            checkpoint_size: 1,
1397        })
1398        .await;
1399
1400        let indexer_args = IndexerArgs {
1401            first_checkpoint: Some(3),
1402            last_checkpoint: Some(29),
1403            ..Default::default()
1404        };
1405
1406        let client_args = ClientArgs {
1407            ingestion: IngestionClientArgs {
1408                local_ingestion_path: Some(temp_dir.path().to_owned()),
1409                ..Default::default()
1410            },
1411            ..Default::default()
1412        };
1413
1414        let ingestion_config = IngestionConfig::default();
1415
1416        let mut indexer = Indexer::new(
1417            store.clone(),
1418            indexer_args,
1419            client_args,
1420            ingestion_config,
1421            None,
1422            &registry,
1423        )
1424        .await
1425        .unwrap();
1426
1427        indexer
1428            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1429            .await
1430            .unwrap();
1431        indexer
1432            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1433            .await
1434            .unwrap();
1435        indexer
1436            .sequential_pipeline::<C>(C, SequentialConfig::default())
1437            .await
1438            .unwrap();
1439        indexer
1440            .sequential_pipeline::<D>(D, SequentialConfig::default())
1441            .await
1442            .unwrap();
1443
1444        let ingestion_metrics = indexer.ingestion_metrics().clone();
1445        let metrics = indexer.indexer_metrics().clone();
1446        indexer.run().await.unwrap().join().await.unwrap();
1447
1448        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1449        assert_eq!(
1450            metrics
1451                .total_watermarks_out_of_order
1452                .get_metric_with_label_values(&[A::NAME])
1453                .unwrap()
1454                .get(),
1455            0
1456        );
1457        assert_eq!(
1458            metrics
1459                .total_watermarks_out_of_order
1460                .get_metric_with_label_values(&[B::NAME])
1461                .unwrap()
1462                .get(),
1463            5
1464        );
1465        assert_eq!(
1466            metrics
1467                .total_watermarks_out_of_order
1468                .get_metric_with_label_values(&[C::NAME])
1469                .unwrap()
1470                .get(),
1471            10
1472        );
1473        assert_eq!(
1474            metrics
1475                .total_watermarks_out_of_order
1476                .get_metric_with_label_values(&[D::NAME])
1477                .unwrap()
1478                .get(),
1479            15
1480        );
1481    }
1482
1483    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1484    #[tokio::test]
1485    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1486        let registry = Registry::new();
1487        let store = MockStore::default();
1488
1489        test_pipeline!(A, "concurrent_a");
1490        test_pipeline!(B, "concurrent_b");
1491        test_pipeline!(C, "sequential_c");
1492        test_pipeline!(D, "sequential_d");
1493
1494        let mut conn = store.connect().await.unwrap();
1495        conn.set_committer_watermark(
1496            B::NAME,
1497            CommitterWatermark {
1498                checkpoint_hi_inclusive: 10,
1499                ..Default::default()
1500            },
1501        )
1502        .await
1503        .unwrap();
1504        conn.set_committer_watermark(
1505            C::NAME,
1506            CommitterWatermark {
1507                checkpoint_hi_inclusive: 15,
1508                ..Default::default()
1509            },
1510        )
1511        .await
1512        .unwrap();
1513        conn.set_committer_watermark(
1514            D::NAME,
1515            CommitterWatermark {
1516                checkpoint_hi_inclusive: 20,
1517                ..Default::default()
1518            },
1519        )
1520        .await
1521        .unwrap();
1522
1523        // Create synthetic ingestion data
1524        let temp_dir = tempfile::tempdir().unwrap();
1525        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1526            ingestion_dir: temp_dir.path().to_owned(),
1527            starting_checkpoint: 0,
1528            num_checkpoints: 30,
1529            checkpoint_size: 1,
1530        })
1531        .await;
1532
1533        let indexer_args = IndexerArgs {
1534            last_checkpoint: Some(29),
1535            ..Default::default()
1536        };
1537
1538        let client_args = ClientArgs {
1539            ingestion: IngestionClientArgs {
1540                local_ingestion_path: Some(temp_dir.path().to_owned()),
1541                ..Default::default()
1542            },
1543            ..Default::default()
1544        };
1545
1546        let ingestion_config = IngestionConfig::default();
1547
1548        let mut indexer = Indexer::new(
1549            store.clone(),
1550            indexer_args,
1551            client_args,
1552            ingestion_config,
1553            None,
1554            &registry,
1555        )
1556        .await
1557        .unwrap();
1558
1559        indexer
1560            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1561            .await
1562            .unwrap();
1563        indexer
1564            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1565            .await
1566            .unwrap();
1567        indexer
1568            .sequential_pipeline::<C>(C, SequentialConfig::default())
1569            .await
1570            .unwrap();
1571        indexer
1572            .sequential_pipeline::<D>(D, SequentialConfig::default())
1573            .await
1574            .unwrap();
1575
1576        let ingestion_metrics = indexer.ingestion_metrics().clone();
1577        let metrics = indexer.indexer_metrics().clone();
1578        indexer.run().await.unwrap().join().await.unwrap();
1579
1580        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1581        assert_eq!(
1582            metrics
1583                .total_watermarks_out_of_order
1584                .get_metric_with_label_values(&[A::NAME])
1585                .unwrap()
1586                .get(),
1587            0
1588        );
1589        assert_eq!(
1590            metrics
1591                .total_watermarks_out_of_order
1592                .get_metric_with_label_values(&[B::NAME])
1593                .unwrap()
1594                .get(),
1595            11
1596        );
1597        assert_eq!(
1598            metrics
1599                .total_watermarks_out_of_order
1600                .get_metric_with_label_values(&[C::NAME])
1601                .unwrap()
1602                .get(),
1603            16
1604        );
1605        assert_eq!(
1606            metrics
1607                .total_watermarks_out_of_order
1608                .get_metric_with_label_values(&[D::NAME])
1609                .unwrap()
1610                .get(),
1611            21
1612        );
1613    }
1614
1615    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1616    #[tokio::test]
1617    async fn test_indexer_ingestion_use_first_checkpoint() {
1618        let registry = Registry::new();
1619        let store = MockStore::default();
1620
1621        test_pipeline!(A, "concurrent_a");
1622        test_pipeline!(B, "concurrent_b");
1623        test_pipeline!(C, "sequential_c");
1624        test_pipeline!(D, "sequential_d");
1625
1626        let mut conn = store.connect().await.unwrap();
1627        conn.set_committer_watermark(
1628            B::NAME,
1629            CommitterWatermark {
1630                checkpoint_hi_inclusive: 10,
1631                ..Default::default()
1632            },
1633        )
1634        .await
1635        .unwrap();
1636        conn.set_committer_watermark(
1637            C::NAME,
1638            CommitterWatermark {
1639                checkpoint_hi_inclusive: 15,
1640                ..Default::default()
1641            },
1642        )
1643        .await
1644        .unwrap();
1645        conn.set_committer_watermark(
1646            D::NAME,
1647            CommitterWatermark {
1648                checkpoint_hi_inclusive: 20,
1649                ..Default::default()
1650            },
1651        )
1652        .await
1653        .unwrap();
1654
1655        // Create synthetic ingestion data
1656        let temp_dir = tempfile::tempdir().unwrap();
1657        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1658            ingestion_dir: temp_dir.path().to_owned(),
1659            starting_checkpoint: 0,
1660            num_checkpoints: 30,
1661            checkpoint_size: 1,
1662        })
1663        .await;
1664
1665        let indexer_args = IndexerArgs {
1666            first_checkpoint: Some(10),
1667            last_checkpoint: Some(29),
1668            ..Default::default()
1669        };
1670
1671        let client_args = ClientArgs {
1672            ingestion: IngestionClientArgs {
1673                local_ingestion_path: Some(temp_dir.path().to_owned()),
1674                ..Default::default()
1675            },
1676            ..Default::default()
1677        };
1678
1679        let ingestion_config = IngestionConfig::default();
1680
1681        let mut indexer = Indexer::new(
1682            store.clone(),
1683            indexer_args,
1684            client_args,
1685            ingestion_config,
1686            None,
1687            &registry,
1688        )
1689        .await
1690        .unwrap();
1691
1692        indexer
1693            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1694            .await
1695            .unwrap();
1696        indexer
1697            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1698            .await
1699            .unwrap();
1700        indexer
1701            .sequential_pipeline::<C>(C, SequentialConfig::default())
1702            .await
1703            .unwrap();
1704        indexer
1705            .sequential_pipeline::<D>(D, SequentialConfig::default())
1706            .await
1707            .unwrap();
1708
1709        let ingestion_metrics = indexer.ingestion_metrics().clone();
1710        let metrics = indexer.indexer_metrics().clone();
1711        indexer.run().await.unwrap().join().await.unwrap();
1712
1713        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1714        assert_eq!(
1715            metrics
1716                .total_watermarks_out_of_order
1717                .get_metric_with_label_values(&[A::NAME])
1718                .unwrap()
1719                .get(),
1720            0
1721        );
1722        assert_eq!(
1723            metrics
1724                .total_watermarks_out_of_order
1725                .get_metric_with_label_values(&[B::NAME])
1726                .unwrap()
1727                .get(),
1728            1
1729        );
1730        assert_eq!(
1731            metrics
1732                .total_watermarks_out_of_order
1733                .get_metric_with_label_values(&[C::NAME])
1734                .unwrap()
1735                .get(),
1736            6
1737        );
1738        assert_eq!(
1739            metrics
1740                .total_watermarks_out_of_order
1741                .get_metric_with_label_values(&[D::NAME])
1742                .unwrap()
1743                .get(),
1744            11
1745        );
1746    }
1747
1748    #[tokio::test]
1749    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1750        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1751        assert_eq!(committer_watermark, None);
1752        assert_eq!(pruner_watermark, None);
1753    }
1754
1755    #[tokio::test]
1756    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1757        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1758        assert_eq!(committer_watermark, None);
1759        assert_eq!(pruner_watermark, None);
1760    }
1761
1762    #[tokio::test]
1763    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1764        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1765
1766        let committer_watermark = committer_watermark.unwrap();
1767        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1768
1769        let pruner_watermark = pruner_watermark.unwrap();
1770        assert_eq!(pruner_watermark.reader_lo, 1);
1771        assert_eq!(pruner_watermark.pruner_hi, 1);
1772    }
1773
1774    #[tokio::test]
1775    async fn test_init_watermark_sequential() {
1776        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1777
1778        let committer_watermark = committer_watermark.unwrap();
1779        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1780
1781        let pruner_watermark = pruner_watermark.unwrap();
1782        assert_eq!(pruner_watermark.reader_lo, 1);
1783        assert_eq!(pruner_watermark.pruner_hi, 1);
1784    }
1785
1786    #[tokio::test]
1787    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1788        let registry = Registry::new();
1789        let store = MockStore::default();
1790
1791        // Set up different watermarks for three different sequential pipelines
1792        let mut conn = store.connect().await.unwrap();
1793
1794        // First handler at checkpoint 10
1795        conn.set_committer_watermark(
1796            MockHandler::NAME,
1797            CommitterWatermark {
1798                epoch_hi_inclusive: 0,
1799                checkpoint_hi_inclusive: 10,
1800                tx_hi: 20,
1801                timestamp_ms_hi_inclusive: 10000,
1802            },
1803        )
1804        .await
1805        .unwrap();
1806
1807        // SequentialHandler at checkpoint 5
1808        conn.set_committer_watermark(
1809            SequentialHandler::NAME,
1810            CommitterWatermark {
1811                epoch_hi_inclusive: 0,
1812                checkpoint_hi_inclusive: 5,
1813                tx_hi: 10,
1814                timestamp_ms_hi_inclusive: 5000,
1815            },
1816        )
1817        .await
1818        .unwrap();
1819
1820        // Create synthetic ingestion data
1821        let temp_dir = tempfile::tempdir().unwrap();
1822        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1823            ingestion_dir: temp_dir.path().to_owned(),
1824            starting_checkpoint: 0,
1825            num_checkpoints: 20,
1826            checkpoint_size: 2,
1827        })
1828        .await;
1829
1830        let indexer_args = IndexerArgs {
1831            first_checkpoint: None,
1832            last_checkpoint: Some(19),
1833            pipeline: vec![],
1834            ..Default::default()
1835        };
1836
1837        let client_args = ClientArgs {
1838            ingestion: IngestionClientArgs {
1839                local_ingestion_path: Some(temp_dir.path().to_owned()),
1840                ..Default::default()
1841            },
1842            ..Default::default()
1843        };
1844
1845        let ingestion_config = IngestionConfig::default();
1846
1847        let mut indexer = Indexer::new(
1848            store.clone(),
1849            indexer_args,
1850            client_args,
1851            ingestion_config,
1852            None,
1853            &registry,
1854        )
1855        .await
1856        .unwrap();
1857
1858        // Add first sequential pipeline
1859        indexer
1860            .sequential_pipeline(MockHandler, SequentialConfig::default())
1861            .await
1862            .unwrap();
1863
1864        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1865        assert_eq!(
1866            indexer.next_sequential_checkpoint(),
1867            Some(11),
1868            "next_sequential_checkpoint should be 11"
1869        );
1870
1871        // Add second sequential pipeline
1872        indexer
1873            .sequential_pipeline(SequentialHandler, SequentialConfig::default())
1874            .await
1875            .unwrap();
1876
1877        // Should change to 6 (minimum of 6 and 11)
1878        assert_eq!(
1879            indexer.next_sequential_checkpoint(),
1880            Some(6),
1881            "next_sequential_checkpoint should still be 6"
1882        );
1883
1884        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1885        indexer.run().await.unwrap().join().await.unwrap();
1886
1887        // Verify each pipeline made some progress independently
1888        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1889        let watermark2 = conn
1890            .committer_watermark(SequentialHandler::NAME)
1891            .await
1892            .unwrap();
1893
1894        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1895        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1896    }
1897
1898    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1899    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1900    /// committing checkpoints less than the main pipeline's reader watermark.
1901    #[tokio::test]
1902    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1903        let registry = Registry::new();
1904        let store = MockStore::default();
1905
1906        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1907        // reader watermark at `7`.
1908        let mut conn = store.connect().await.unwrap();
1909        conn.set_committer_watermark(
1910            MockCheckpointSequenceNumberHandler::NAME,
1911            CommitterWatermark {
1912                checkpoint_hi_inclusive: 10,
1913                ..Default::default()
1914            },
1915        )
1916        .await
1917        .unwrap();
1918        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1919            .await
1920            .unwrap();
1921
1922        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1923        // be ignored by the tasked indexer.
1924        let indexer_args = IndexerArgs {
1925            first_checkpoint: Some(0),
1926            last_checkpoint: Some(15),
1927            task: TaskArgs::tasked("task".to_string(), 10),
1928            ..Default::default()
1929        };
1930        let temp_dir = tempfile::tempdir().unwrap();
1931        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1932            ingestion_dir: temp_dir.path().to_owned(),
1933            starting_checkpoint: 0,
1934            num_checkpoints: 16,
1935            checkpoint_size: 2,
1936        })
1937        .await;
1938
1939        let client_args = ClientArgs {
1940            ingestion: IngestionClientArgs {
1941                local_ingestion_path: Some(temp_dir.path().to_owned()),
1942                ..Default::default()
1943            },
1944            ..Default::default()
1945        };
1946
1947        let ingestion_config = IngestionConfig::default();
1948
1949        let mut tasked_indexer = Indexer::new(
1950            store.clone(),
1951            indexer_args,
1952            client_args,
1953            ingestion_config,
1954            None,
1955            &registry,
1956        )
1957        .await
1958        .unwrap();
1959
1960        let _ = tasked_indexer
1961            .concurrent_pipeline(
1962                MockCheckpointSequenceNumberHandler,
1963                ConcurrentConfig::default(),
1964            )
1965            .await;
1966
1967        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1968        let metrics = tasked_indexer.indexer_metrics().clone();
1969
1970        tasked_indexer.run().await.unwrap().join().await.unwrap();
1971
1972        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1973        assert_eq!(
1974            metrics
1975                .total_collector_skipped_checkpoints
1976                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1977                .unwrap()
1978                .get(),
1979            7
1980        );
1981        let data = store
1982            .data
1983            .get(MockCheckpointSequenceNumberHandler::NAME)
1984            .unwrap();
1985        assert_eq!(data.len(), 9);
1986        for i in 0..7 {
1987            assert!(data.get(&i).is_none());
1988        }
1989        for i in 7..16 {
1990            assert!(data.get(&i).is_some());
1991        }
1992    }
1993
1994    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1995    #[tokio::test]
1996    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1997        let registry = Registry::new();
1998        let store = MockStore::default();
1999
2000        let mut conn = store.connect().await.unwrap();
2001        conn.set_committer_watermark(
2002            "test",
2003            CommitterWatermark {
2004                checkpoint_hi_inclusive: 10,
2005                ..Default::default()
2006            },
2007        )
2008        .await
2009        .unwrap();
2010        conn.set_reader_watermark("test", 5).await.unwrap();
2011
2012        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
2013        // watermarks.
2014        let indexer_args = IndexerArgs {
2015            first_checkpoint: Some(9),
2016            last_checkpoint: Some(25),
2017            task: TaskArgs::tasked("task".to_string(), 10),
2018            ..Default::default()
2019        };
2020        let temp_dir = tempfile::tempdir().unwrap();
2021        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2022            ingestion_dir: temp_dir.path().to_owned(),
2023            starting_checkpoint: 0,
2024            num_checkpoints: 26,
2025            checkpoint_size: 2,
2026        })
2027        .await;
2028
2029        let client_args = ClientArgs {
2030            ingestion: IngestionClientArgs {
2031                local_ingestion_path: Some(temp_dir.path().to_owned()),
2032                ..Default::default()
2033            },
2034            ..Default::default()
2035        };
2036
2037        let ingestion_config = IngestionConfig::default();
2038
2039        let mut tasked_indexer = Indexer::new(
2040            store.clone(),
2041            indexer_args,
2042            client_args,
2043            ingestion_config,
2044            None,
2045            &registry,
2046        )
2047        .await
2048        .unwrap();
2049
2050        let _ = tasked_indexer
2051            .concurrent_pipeline(
2052                MockCheckpointSequenceNumberHandler,
2053                ConcurrentConfig::default(),
2054            )
2055            .await;
2056
2057        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2058        let metrics = tasked_indexer.indexer_metrics().clone();
2059
2060        tasked_indexer.run().await.unwrap().join().await.unwrap();
2061
2062        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2063        assert_eq!(
2064            metrics
2065                .total_watermarks_out_of_order
2066                .get_metric_with_label_values(&["test"])
2067                .unwrap()
2068                .get(),
2069            0
2070        );
2071        assert_eq!(
2072            metrics
2073                .total_collector_skipped_checkpoints
2074                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2075                .unwrap()
2076                .get(),
2077            0
2078        );
2079
2080        let data = store.data.get("test").unwrap();
2081        assert_eq!(data.len(), 17);
2082        for i in 0..9 {
2083            assert!(data.get(&i).is_none());
2084        }
2085        for i in 9..26 {
2086            assert!(data.get(&i).is_some());
2087        }
2088        let main_pipeline_watermark = store.watermark("test").unwrap();
2089        // assert that the main pipeline's watermarks are not updated
2090        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
2091        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2092        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2093        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
2094        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2095    }
2096
    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
    /// committed, and any checkpoints inflight < X will be skipped.
    #[tokio::test]
    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
        let registry = Registry::new();
        let store = MockStore::default();
        let mut conn = store.connect().await.unwrap();
        // Set the main pipeline watermark. Its reader watermark is deliberately left unset for
        // now, so initially nothing is filtered.
        conn.set_committer_watermark(
            ControllableHandler::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 11,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // Generate 501 checkpoints ([0, 500]) upfront, for the indexer to process all at once.
        let temp_dir = tempfile::tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: temp_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 501,
            checkpoint_size: 2,
        })
        .await;
        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(500),
            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
            ..Default::default()
        };
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();
        let mut tasked_indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();
        let mut allow_process = 10;
        // Limit the pipeline to process only checkpoints `[0, 10]`; `process_below` lets the test
        // release higher checkpoints later, one threshold at a time.
        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
        let _ = tasked_indexer
            .concurrent_pipeline(
                controllable_handler,
                ConcurrentConfig {
                    committer: CommitterConfig {
                        collect_interval_ms: 10,
                        watermark_interval_ms: 10,
                        ..Default::default()
                    },
                    // High fixed concurrency so all checkpoints can be processed
                    // concurrently despite out-of-order arrival.
                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
                    ..Default::default()
                },
            )
            .await;
        let metrics = tasked_indexer.indexer_metrics().clone();

        let mut s_indexer = tasked_indexer.run().await.unwrap();

        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
        // committed.
        store
            .wait_for_watermark(
                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
                10,
                Duration::from_secs(10),
            )
            .await;

        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
        // track_main_reader_lo task will eventually pick this up and update the atomic. The
        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
        // one at a time until the collector_reader_lo metric shows the new value.
        conn.set_reader_watermark(ControllableHandler::NAME, 250)
            .await
            .unwrap();

        let reader_lo = metrics
            .collector_reader_lo
            .with_label_values(&[ControllableHandler::NAME]);

        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
        // checkpoints have been processed.
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        while reader_lo.get() != 250 {
            interval.tick().await;
            // allow_process starts at 10; bump it to 11, 12, ... so that each tick releases
            // exactly one more checkpoint to the handler.
            allow_process += 1;
            assert!(
                allow_process <= 500,
                "Released all checkpoints but collector never observed new reader_lo"
            );
            process_below.send(allow_process).ok();
        }

        // At this point, the collector has observed reader_lo = 250. Release all remaining
        // checkpoints. Guarantees:
        // - [0, 10]: committed (before reader_lo was set)
        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
        // - (allow_process, 250): skipped (in-flight, filtered by collector)
        // - [250, 500]: committed (>= reader_lo)
        process_below.send(500).ok();

        s_indexer.join().await.unwrap();

        let data = store.data.get(ControllableHandler::NAME).unwrap();

        // Checkpoints (allow_process, 250) must be skipped.
        for chkpt in (allow_process + 1)..250 {
            assert!(
                data.get(&chkpt).is_none(),
                "Checkpoint {chkpt} should have been skipped"
            );
        }

        // Checkpoints >= reader_lo must be committed.
        for chkpt in 250..=500 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
            );
        }

        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
        for chkpt in 0..=10 {
            assert!(
                data.get(&chkpt).is_some(),
                "Checkpoint {chkpt} should have been committed (baseline)"
            );
        }
    }
2245}