sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::ops::Bound;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Context;
10use anyhow::bail;
11use anyhow::ensure;
12use ingestion::ClientArgs;
13use ingestion::IngestionConfig;
14use ingestion::IngestionService;
15use ingestion::ingestion_client::IngestionClient;
16use metrics::IndexerMetrics;
17use prometheus::Registry;
18use sui_indexer_alt_framework_store_traits::ConcurrentStore;
19use sui_indexer_alt_framework_store_traits::Connection;
20use sui_indexer_alt_framework_store_traits::SequentialStore;
21use sui_indexer_alt_framework_store_traits::Store;
22use sui_indexer_alt_framework_store_traits::pipeline_task;
23use tracing::info;
24
25use crate::metrics::IngestionMetrics;
26use crate::pipeline::Processor;
27use crate::pipeline::concurrent::ConcurrentConfig;
28use crate::pipeline::concurrent::{self};
29use crate::pipeline::sequential::SequentialConfig;
30use crate::pipeline::sequential::{self};
31use crate::service::Service;
32
33pub use anyhow::Result;
34pub use sui_field_count::FieldCount;
35pub use sui_futures::service;
36/// External users access the store trait through framework::store
37pub use sui_indexer_alt_framework_store_traits as store;
38pub use sui_types as types;
39
40#[cfg(feature = "cluster")]
41pub mod cluster;
42pub mod config;
43pub mod ingestion;
44pub mod metrics;
45pub mod pipeline;
46#[cfg(feature = "postgres")]
47pub mod postgres;
48
49#[cfg(test)]
50pub mod mocks;
51
/// Command-line arguments for the indexer. Derives `clap::Args` so it can be
/// `#[clap(flatten)]`-ed into a binary's top-level argument struct.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
78
/// Command-line arguments for configuring a tasked indexer.
//
// NOTE(review): this derives `clap::Parser` while `IndexerArgs` (which flattens it) derives
// `clap::Args`. `clap::Args` is the conventional derive for a flattened argument group -- confirm
// nothing parses `TaskArgs` standalone before changing the derive.
#[derive(clap::Parser, Default, Debug, Clone)]
pub struct TaskArgs {
    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
    #[arg(long, requires = "reader_interval_ms")]
    task: Option<String>,

    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
    /// refetch its main pipeline's reader watermark.
    ///
    /// This is required when `--task` is set and should ideally be set to a value that is
    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
    /// it.
    ///
    /// If the main pipeline does not have pruning enabled, this value can be set to some high
    /// value, as the tasked pipeline will never see an updated reader watermark.
    #[arg(long, requires = "task")]
    reader_interval_ms: Option<u64>,
}
109
/// The core of the indexer framework: owns the store, the checkpoint ingestion service, and the
/// set of registered pipelines, and wires them together when [`Indexer::run`] is called.
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// Optional override of the checkpoint lowerbound. When set, pipelines without a committer
    /// watermark will start processing at this checkpoint.
    first_checkpoint: Option<u64>,

    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
    /// this checkpoint.
    last_checkpoint: Option<u64>,

    /// The minimum `next_checkpoint` across all pipelines. This is the checkpoint for the indexer
    /// to start ingesting from.
    next_checkpoint: u64,

    /// The minimum `next_checkpoint` across all sequential pipelines. This is used to initialize
    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
    next_sequential_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
    /// a warning when the indexer is run.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
    /// with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
163
/// Configuration for a tasked indexer.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
174
175impl TaskArgs {
176    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
177        Self {
178            task: Some(task),
179            reader_interval_ms: Some(reader_interval_ms),
180        }
181    }
182
183    fn into_task(self) -> Option<Task> {
184        Some(Task {
185            task: self.task?,
186            reader_interval: Duration::from_millis(self.reader_interval_ms?),
187        })
188    }
189}
190
impl<S: Store> Indexer<S> {
    /// Create a new instance of the indexer framework from a store that implements the `Store`
    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
    /// arguments configure the following:
    ///
    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
    ///   watermarks) and where to serve metrics from,
    /// - Where to download checkpoints from,
    /// - Concurrency and buffering parameters for downloading checkpoints.
    ///
    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
    pub async fn new(
        store: S,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let IndexerArgs {
            first_checkpoint,
            last_checkpoint,
            pipeline,
            task,
        } = indexer_args;

        let metrics = IndexerMetrics::new(metrics_prefix, registry);

        let ingestion_service =
            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

        Ok(Self {
            store,
            metrics,
            ingestion_service,
            first_checkpoint,
            last_checkpoint,
            // Sentinel starting point -- lowered to the minimum of each pipeline's next
            // checkpoint as pipelines are added (see [Self::add_pipeline]).
            next_checkpoint: u64::MAX,
            next_sequential_checkpoint: None,
            task: task.into_task(),
            // `None` means "run every pipeline". Otherwise, names are removed from this set as
            // matching pipelines are added, and any leftovers cause [Self::run] to fail.
            enabled_pipelines: if pipeline.is_empty() {
                None
            } else {
                Some(pipeline.into_iter().collect())
            },
            added_pipelines: BTreeSet::new(),
            pipelines: vec![],
        })
    }

    /// The store used by the indexer.
    pub fn store(&self) -> &S {
        &self.store
    }

    /// The ingestion client used by the indexer to fetch checkpoints.
    pub fn ingestion_client(&self) -> &IngestionClient {
        self.ingestion_service.ingestion_client()
    }

    /// The indexer's metrics.
    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
        &self.metrics
    }

    /// The ingestion service's metrics.
    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
        self.ingestion_service.metrics()
    }

    /// The pipelines that this indexer will run: every added pipeline that also passes the
    /// `--pipeline` filter (all of them, when no filter was supplied).
    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.added_pipelines.iter().copied().filter(|p| {
            self.enabled_pipelines
                .as_ref()
                .is_none_or(|e| e.contains(*p))
        })
    }

    /// The minimum next checkpoint across all sequential pipelines. This value is used to
    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
    /// too far ahead of sequential pipelines.
    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
        self.next_sequential_checkpoint
    }

    /// Start ingesting checkpoints from `next_checkpoint`. Individual pipelines
    /// will start processing and committing once the ingestion service has caught up to their
    /// respective watermarks.
    ///
    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
    pub async fn run(self) -> anyhow::Result<Service> {
        // Any names left in the filter were requested on the command-line but never registered
        // via `concurrent_pipeline`/`sequential_pipeline` -- treat that as a configuration error.
        if let Some(enabled_pipelines) = self.enabled_pipelines {
            ensure!(
                enabled_pipelines.is_empty(),
                "Tried to enable pipelines that this indexer does not know about: \
                {enabled_pipelines:#?}",
            );
        }

        let start = self.next_checkpoint;
        let end = self.last_checkpoint;
        info!(start, end, "Ingestion range");

        let mut service = self
            .ingestion_service
            .run(
                (
                    Bound::Included(start),
                    // No configured end means ingestion polls for new checkpoints forever.
                    end.map_or(Bound::Unbounded, Bound::Included),
                ),
                self.next_sequential_checkpoint,
            )
            .await
            .context("Failed to start ingestion service")?;

        // Tie every pipeline's lifetime to the returned service handle so they shut down
        // together.
        for pipeline in self.pipelines {
            service = service.merge(pipeline);
        }

        Ok(service)
    }

    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
    /// checkpoint after its watermark, or if that doesn't exist, then the provided
    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
    ///
    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
    /// calculated above.
    ///
    /// Returns `Ok(None)` if the pipeline is disabled.
    async fn add_pipeline<P: Processor + 'static>(
        &mut self,
        pipeline_task: String,
    ) -> Result<Option<u64>> {
        // Guard against registering the same pipeline twice.
        ensure!(
            self.added_pipelines.insert(P::NAME),
            "Pipeline {:?} already added",
            P::NAME,
        );

        // When a `--pipeline` filter is active, only run pipelines named in it (removing each
        // match, so leftovers can be reported in [Self::run]).
        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
            && !enabled_pipelines.remove(P::NAME)
        {
            info!(pipeline = P::NAME, "Skipping");
            return Ok(None);
        }

        // Create a new record based on `proposed_next_checkpoint` if one does not exist.
        // Otherwise, use the existing record and disregard the proposed value.
        let proposed_next_checkpoint = self.first_checkpoint.unwrap_or(0);
        let mut conn = self.store.connect().await?;
        let init_watermark = conn
            // The watermark records an inclusive upper bound, so the proposed next checkpoint
            // maps to `proposed - 1` (`None` when starting from genesis).
            .init_watermark(&pipeline_task, proposed_next_checkpoint.checked_sub(1))
            .await
            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;

        // An existing watermark wins over the proposed value; a watermark without an upper bound
        // resumes from genesis.
        let next_checkpoint = if let Some(init_watermark) = init_watermark {
            if let Some(checkpoint_hi_inclusive) = init_watermark.checkpoint_hi_inclusive {
                checkpoint_hi_inclusive + 1
            } else {
                0
            }
        } else {
            proposed_next_checkpoint
        };

        self.next_checkpoint = next_checkpoint.min(self.next_checkpoint);

        Ok(Some(next_checkpoint))
    }
}
364
impl<S: ConcurrentStore> Indexer<S> {
    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
    /// they will be idle until the ingestion service starts and serves checkpoint data to them.
    ///
    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
    /// keep the watermark table up-to-date with the highest point they can guarantee all data
    /// exists for, for their pipeline.
    pub async fn concurrent_pipeline<H: concurrent::Handler<Store = S>>(
        &mut self,
        handler: H,
        config: ConcurrentConfig,
    ) -> Result<()> {
        // Combine the pipeline name with this indexer's optional task name to form the key used
        // for watermark records.
        let pipeline_task =
            pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
        // `None` means the pipeline was filtered out by `--pipeline` -- nothing to start.
        let Some(next_checkpoint) = self.add_pipeline::<H>(pipeline_task).await? else {
            return Ok(());
        };

        self.pipelines.push(concurrent::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            self.task.clone(),
            self.ingestion_service.subscribe().0,
            self.metrics.clone(),
        ));

        Ok(())
    }
}
396
impl<T: SequentialStore> Indexer<T> {
    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
    /// they will be idle until the ingestion service starts and serves checkpoint data to them.
    ///
    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
    /// required to handle pipelines that modify data in-place (where each update is not an insert,
    /// but could be a modification of an existing row, where ordering between updates is
    /// important).
    ///
    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
    /// number of checkpoints (configured by `checkpoint_lag`).
    pub async fn sequential_pipeline<H: sequential::Handler<Store = T>>(
        &mut self,
        handler: H,
        config: SequentialConfig,
    ) -> Result<()> {
        // Tasked indexers are rejected outright -- running the same sequential pipeline under
        // multiple tasks would break its exactly-once, in-order commit guarantee.
        if self.task.is_some() {
            bail!(
                "Sequential pipelines do not support pipeline tasks. \
                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
                Running the same pipeline under a different task would violate these guarantees."
            );
        }

        // `None` means the pipeline was filtered out by `--pipeline` -- nothing to start.
        let Some(next_checkpoint) = self.add_pipeline::<H>(H::NAME.to_owned()).await? else {
            return Ok(());
        };

        // Track the minimum next_checkpoint across all sequential pipelines
        self.next_sequential_checkpoint = Some(
            self.next_sequential_checkpoint
                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
        );

        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();

        self.pipelines.push(sequential::pipeline::<H>(
            handler,
            next_checkpoint,
            config,
            self.store.clone(),
            checkpoint_rx,
            commit_hi_tx,
            self.metrics.clone(),
        ));

        Ok(())
    }
}
446
447#[cfg(test)]
448mod tests {
449    use std::sync::Arc;
450
451    use async_trait::async_trait;
452    use clap::Parser;
453    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
454    use sui_synthetic_ingestion::synthetic_ingestion;
455    use tokio::sync::watch;
456
457    use crate::FieldCount;
458    use crate::config::ConcurrencyConfig;
459    use crate::ingestion::ingestion_client::IngestionClientArgs;
460    use crate::mocks::store::MockStore;
461    use crate::pipeline::CommitterConfig;
462    use crate::pipeline::Processor;
463    use crate::pipeline::concurrent::ConcurrentConfig;
464    use crate::store::CommitterWatermark;
465    use crate::store::ConcurrentConnection as _;
466    use crate::store::Connection as _;
467
468    use super::*;
469
    /// Value type emitted by the test processors; wraps the originating checkpoint's sequence
    /// number.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
473
    /// A handler that can be controlled externally to block checkpoint processing.
    ///
    /// The processing limit starts at the value given to [`ControllableHandler::with_limit`] and
    /// can be raised at any time through the paired `watch::Sender`.
    struct ControllableHandler {
        /// Process checkpoints less than or equal to this value.
        process_below: watch::Receiver<u64>,
    }
479
480    impl ControllableHandler {
481        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
482            let (tx, rx) = watch::channel(limit);
483            (Self { process_below: rx }, tx)
484        }
485    }
486
    #[async_trait]
    impl Processor for ControllableHandler {
        const NAME: &'static str = "controllable";
        type Value = MockValue;

        async fn process(
            &self,
            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Wait until the checkpoint is allowed to be processed. The receiver is cloned so
            // waiting does not require `&mut self`, and `.ok()` discards the error raised when
            // the controlling sender is dropped, in which case processing proceeds anyway.
            self.process_below
                .clone()
                .wait_for(|&limit| cp_num <= limit)
                .await
                .ok();

            Ok(vec![MockValue(cp_num)])
        }
    }
508
    #[async_trait]
    impl concurrent::Handler for ControllableHandler {
        type Store = MockStore;
        type Batch = Vec<MockValue>;

        // Accumulate all available values and mark the batch ready for immediate commit.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> concurrent::BatchStatus {
            batch.extend(values);
            concurrent::BatchStatus::Ready
        }

        // Record each value through the mock store's `commit_data`, one call per value.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                conn.0
                    .commit_data(Self::NAME, value.0, vec![value.0])
                    .await?;
            }
            Ok(batch.len())
        }
    }
536
    /// Defines a unit struct `$handler` that implements [`Processor`] plus both the concurrent
    /// and sequential handler traits against [`MockStore`], with pipeline name `$name`.
    ///
    /// The processor emits one [`MockValue`] per checkpoint (its sequence number). The concurrent
    /// handler commits each value to the mock store under `$name`; the sequential handler's
    /// commit touches no state and just reports one row written.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
600
    // Shared fixture pipelines used across the tests below; the string literal is each
    // pipeline's name (and thus its watermark key).
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
604
    /// Helper: spin up an indexer over a fresh [`MockStore`] with an optional `first_checkpoint`
    /// override, register a single pipeline (concurrent or sequential depending on
    /// `is_concurrent`), and return the committer and pruner watermarks that adding the pipeline
    /// initialized for it.
    async fn test_init_watermark(
        first_checkpoint: Option<u64>,
        is_concurrent: bool,
    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "pipeline_name");

        let mut conn = store.connect().await.unwrap();

        let indexer_args = IndexerArgs {
            first_checkpoint,
            ..IndexerArgs::default()
        };
        // Point ingestion at an empty temp directory -- no checkpoints are actually ingested.
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store.clone(),
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        if is_concurrent {
            indexer
                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
                .await
                .unwrap();
        } else {
            indexer
                .sequential_pipeline::<A>(A, SequentialConfig::default())
                .await
                .unwrap();
        }

        (
            conn.committer_watermark(A::NAME).await.unwrap(),
            conn.pruner_watermark(A::NAME, Duration::ZERO)
                .await
                .unwrap(),
        )
    }
660
661    #[test]
662    fn test_arg_parsing() {
663        #[derive(Parser)]
664        struct Args {
665            #[clap(flatten)]
666            indexer: IndexerArgs,
667        }
668
669        let args = Args::try_parse_from([
670            "cmd",
671            "--first-checkpoint",
672            "10",
673            "--last-checkpoint",
674            "100",
675            "--pipeline",
676            "a",
677            "--pipeline",
678            "b",
679            "--task",
680            "t",
681            "--reader-interval-ms",
682            "5000",
683        ])
684        .unwrap();
685
686        assert_eq!(args.indexer.first_checkpoint, Some(10));
687        assert_eq!(args.indexer.last_checkpoint, Some(100));
688        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
689        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
690        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
691    }
692
    /// next_checkpoint is smallest among existing watermarks + 1.
    #[tokio::test]
    async fn test_next_checkpoint_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        let mut conn = store.connect().await.unwrap();

        // Seed committer watermarks for every pipeline: A=100, B=10, C=1, D=50. The smallest is
        // C's, so the indexer should resume ingestion from 1 + 1 = 2.
        conn.init_watermark(A::NAME, Some(0)).await.unwrap();
        conn.set_committer_watermark(
            A::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 100,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(B::NAME, Some(0)).await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(C::NAME, Some(0)).await.unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        conn.init_watermark(D::NAME, Some(0)).await.unwrap();
        conn.set_committer_watermark(
            D::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 50,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        // The overall minimum comes from C (watermark 1), and C is also the smallest sequential
        // pipeline, so both values are 2.
        assert_eq!(indexer.next_checkpoint, 2);
        assert_eq!(indexer.next_sequential_checkpoint, Some(2));
    }
794
    /// next_checkpoint is 0 when at least one pipeline has no watermark.
    #[tokio::test]
    async fn test_next_checkpoint_not_all_pipelines_have_watermarks() {
        let registry = Registry::new();
        let store = MockStore::default();

        test_pipeline!(A, "concurrent_a");
        test_pipeline!(B, "concurrent_b");
        test_pipeline!(C, "sequential_c");
        test_pipeline!(D, "sequential_d");

        // Only B and C get watermarks; A and D start without one, which should pull the
        // indexer's starting checkpoint all the way down to 0 (genesis).
        let mut conn = store.connect().await.unwrap();
        conn.set_committer_watermark(
            B::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 10,
                ..Default::default()
            },
        )
        .await
        .unwrap();
        conn.set_committer_watermark(
            C::NAME,
            CommitterWatermark {
                checkpoint_hi_inclusive: 1,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        let indexer_args = IndexerArgs::default();
        let temp_dir = tempfile::tempdir().unwrap();
        let client_args = ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(temp_dir.path().to_owned()),
                ..Default::default()
            },
            ..Default::default()
        };
        let ingestion_config = IngestionConfig::default();

        let mut indexer = Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            None,
            &registry,
        )
        .await
        .unwrap();

        indexer
            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<C>(C, SequentialConfig::default())
            .await
            .unwrap();
        indexer
            .sequential_pipeline::<D>(D, SequentialConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint, None);
        assert_eq!(indexer.last_checkpoint, None);
        // Sequential pipeline D has no watermark either, so the sequential minimum is also 0.
        assert_eq!(indexer.next_checkpoint, 0);
        assert_eq!(indexer.next_sequential_checkpoint, Some(0));
    }
870
871    /// next_checkpoint is 1 when smallest committer watermark is 0.
872    #[tokio::test]
873    async fn test_next_checkpoint_smallest_is_0() {
874        let registry = Registry::new();
875        let store = MockStore::default();
876
877        test_pipeline!(A, "concurrent_a");
878        test_pipeline!(B, "concurrent_b");
879        test_pipeline!(C, "sequential_c");
880        test_pipeline!(D, "sequential_d");
881
882        let mut conn = store.connect().await.unwrap();
883        conn.set_committer_watermark(
884            A::NAME,
885            CommitterWatermark {
886                checkpoint_hi_inclusive: 100,
887                ..Default::default()
888            },
889        )
890        .await
891        .unwrap();
892        conn.set_committer_watermark(
893            B::NAME,
894            CommitterWatermark {
895                checkpoint_hi_inclusive: 10,
896                ..Default::default()
897            },
898        )
899        .await
900        .unwrap();
901        conn.set_committer_watermark(
902            C::NAME,
903            CommitterWatermark {
904                checkpoint_hi_inclusive: 1,
905                ..Default::default()
906            },
907        )
908        .await
909        .unwrap();
910        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
911            .await
912            .unwrap();
913
914        let indexer_args = IndexerArgs::default();
915        let temp_dir = tempfile::tempdir().unwrap();
916        let client_args = ClientArgs {
917            ingestion: IngestionClientArgs {
918                local_ingestion_path: Some(temp_dir.path().to_owned()),
919                ..Default::default()
920            },
921            ..Default::default()
922        };
923        let ingestion_config = IngestionConfig::default();
924
925        let mut indexer = Indexer::new(
926            store,
927            indexer_args,
928            client_args,
929            ingestion_config,
930            None,
931            &registry,
932        )
933        .await
934        .unwrap();
935
936        indexer
937            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
938            .await
939            .unwrap();
940        indexer
941            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
942            .await
943            .unwrap();
944        indexer
945            .sequential_pipeline::<C>(C, SequentialConfig::default())
946            .await
947            .unwrap();
948        indexer
949            .sequential_pipeline::<D>(D, SequentialConfig::default())
950            .await
951            .unwrap();
952
953        assert_eq!(indexer.next_checkpoint, 1);
954    }
955
956    /// next_checkpoint is first_checkpoint when at least one pipeline has no
957    /// watermark, and first_checkpoint is smallest.
958    #[tokio::test]
959    async fn test_next_checkpoint_first_checkpoint_and_no_watermark() {
960        let registry = Registry::new();
961        let store = MockStore::default();
962
963        test_pipeline!(A, "concurrent_a");
964        test_pipeline!(B, "concurrent_b");
965        test_pipeline!(C, "sequential_c");
966        test_pipeline!(D, "sequential_d");
967
968        let mut conn = store.connect().await.unwrap();
969        conn.set_committer_watermark(
970            B::NAME,
971            CommitterWatermark {
972                checkpoint_hi_inclusive: 50,
973                ..Default::default()
974            },
975        )
976        .await
977        .unwrap();
978        conn.set_committer_watermark(
979            C::NAME,
980            CommitterWatermark {
981                checkpoint_hi_inclusive: 10,
982                ..Default::default()
983            },
984        )
985        .await
986        .unwrap();
987
988        let indexer_args = IndexerArgs {
989            first_checkpoint: Some(5),
990            ..Default::default()
991        };
992        let temp_dir = tempfile::tempdir().unwrap();
993        let client_args = ClientArgs {
994            ingestion: IngestionClientArgs {
995                local_ingestion_path: Some(temp_dir.path().to_owned()),
996                ..Default::default()
997            },
998            ..Default::default()
999        };
1000        let ingestion_config = IngestionConfig::default();
1001
1002        let mut indexer = Indexer::new(
1003            store,
1004            indexer_args,
1005            client_args,
1006            ingestion_config,
1007            None,
1008            &registry,
1009        )
1010        .await
1011        .unwrap();
1012
1013        indexer
1014            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1015            .await
1016            .unwrap();
1017        indexer
1018            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1019            .await
1020            .unwrap();
1021        indexer
1022            .sequential_pipeline::<C>(C, SequentialConfig::default())
1023            .await
1024            .unwrap();
1025        indexer
1026            .sequential_pipeline::<D>(D, SequentialConfig::default())
1027            .await
1028            .unwrap();
1029
1030        assert_eq!(indexer.first_checkpoint, Some(5));
1031        assert_eq!(indexer.last_checkpoint, None);
1032        assert_eq!(indexer.next_checkpoint, 5);
1033        assert_eq!(indexer.next_sequential_checkpoint, Some(5));
1034    }
1035
1036    /// next_checkpoint is smallest among existing watermarks + 1 if
1037    /// all pipelines have watermarks (ignores first_checkpoint).
1038    #[tokio::test]
1039    async fn test_next_checkpoint_ignore_first_checkpoint() {
1040        let registry = Registry::new();
1041        let store = MockStore::default();
1042
1043        test_pipeline!(B, "concurrent_b");
1044        test_pipeline!(C, "sequential_c");
1045
1046        let mut conn = store.connect().await.unwrap();
1047        conn.set_committer_watermark(
1048            B::NAME,
1049            CommitterWatermark {
1050                checkpoint_hi_inclusive: 50,
1051                ..Default::default()
1052            },
1053        )
1054        .await
1055        .unwrap();
1056        conn.set_committer_watermark(
1057            C::NAME,
1058            CommitterWatermark {
1059                checkpoint_hi_inclusive: 10,
1060                ..Default::default()
1061            },
1062        )
1063        .await
1064        .unwrap();
1065
1066        let indexer_args = IndexerArgs {
1067            first_checkpoint: Some(5),
1068            ..Default::default()
1069        };
1070        let temp_dir = tempfile::tempdir().unwrap();
1071        let client_args = ClientArgs {
1072            ingestion: IngestionClientArgs {
1073                local_ingestion_path: Some(temp_dir.path().to_owned()),
1074                ..Default::default()
1075            },
1076            ..Default::default()
1077        };
1078        let ingestion_config = IngestionConfig::default();
1079
1080        let mut indexer = Indexer::new(
1081            store,
1082            indexer_args,
1083            client_args,
1084            ingestion_config,
1085            None,
1086            &registry,
1087        )
1088        .await
1089        .unwrap();
1090
1091        indexer
1092            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1093            .await
1094            .unwrap();
1095        indexer
1096            .sequential_pipeline::<C>(C, SequentialConfig::default())
1097            .await
1098            .unwrap();
1099
1100        assert_eq!(indexer.first_checkpoint, Some(5));
1101        assert_eq!(indexer.last_checkpoint, None);
1102        assert_eq!(indexer.next_checkpoint, 11);
1103        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1104    }
1105
1106    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1107    /// will not be used as the starting point if it is not the smallest valid committer watermark
1108    /// to resume ingesting from.
1109    #[tokio::test]
1110    async fn test_next_checkpoint_large_first_checkpoint() {
1111        let registry = Registry::new();
1112        let store = MockStore::default();
1113
1114        test_pipeline!(A, "concurrent_a");
1115        test_pipeline!(B, "concurrent_b");
1116        test_pipeline!(C, "sequential_c");
1117
1118        let mut conn = store.connect().await.unwrap();
1119        conn.set_committer_watermark(
1120            B::NAME,
1121            CommitterWatermark {
1122                checkpoint_hi_inclusive: 50,
1123                ..Default::default()
1124            },
1125        )
1126        .await
1127        .unwrap();
1128        conn.set_committer_watermark(
1129            C::NAME,
1130            CommitterWatermark {
1131                checkpoint_hi_inclusive: 10,
1132                ..Default::default()
1133            },
1134        )
1135        .await
1136        .unwrap();
1137
1138        let indexer_args = IndexerArgs {
1139            first_checkpoint: Some(24),
1140            ..Default::default()
1141        };
1142        let temp_dir = tempfile::tempdir().unwrap();
1143        let client_args = ClientArgs {
1144            ingestion: IngestionClientArgs {
1145                local_ingestion_path: Some(temp_dir.path().to_owned()),
1146                ..Default::default()
1147            },
1148            ..Default::default()
1149        };
1150        let ingestion_config = IngestionConfig::default();
1151
1152        let mut indexer = Indexer::new(
1153            store,
1154            indexer_args,
1155            client_args,
1156            ingestion_config,
1157            None,
1158            &registry,
1159        )
1160        .await
1161        .unwrap();
1162
1163        indexer
1164            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1165            .await
1166            .unwrap();
1167
1168        indexer
1169            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1170            .await
1171            .unwrap();
1172
1173        indexer
1174            .sequential_pipeline::<C>(C, SequentialConfig::default())
1175            .await
1176            .unwrap();
1177
1178        assert_eq!(indexer.first_checkpoint, Some(24));
1179        assert_eq!(indexer.last_checkpoint, None);
1180        assert_eq!(indexer.next_checkpoint, 11);
1181        assert_eq!(indexer.next_sequential_checkpoint, Some(11));
1182    }
1183
1184    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1185    #[tokio::test]
1186    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1187        let registry = Registry::new();
1188        let store = MockStore::default();
1189
1190        test_pipeline!(A, "concurrent_a");
1191        test_pipeline!(B, "concurrent_b");
1192        test_pipeline!(C, "sequential_c");
1193        test_pipeline!(D, "sequential_d");
1194
1195        let mut conn = store.connect().await.unwrap();
1196        conn.set_committer_watermark(
1197            A::NAME,
1198            CommitterWatermark {
1199                checkpoint_hi_inclusive: 5,
1200                ..Default::default()
1201            },
1202        )
1203        .await
1204        .unwrap();
1205        conn.set_committer_watermark(
1206            B::NAME,
1207            CommitterWatermark {
1208                checkpoint_hi_inclusive: 10,
1209                ..Default::default()
1210            },
1211        )
1212        .await
1213        .unwrap();
1214        conn.set_committer_watermark(
1215            C::NAME,
1216            CommitterWatermark {
1217                checkpoint_hi_inclusive: 15,
1218                ..Default::default()
1219            },
1220        )
1221        .await
1222        .unwrap();
1223        conn.set_committer_watermark(
1224            D::NAME,
1225            CommitterWatermark {
1226                checkpoint_hi_inclusive: 20,
1227                ..Default::default()
1228            },
1229        )
1230        .await
1231        .unwrap();
1232
1233        // Create synthetic ingestion data
1234        let temp_dir = tempfile::tempdir().unwrap();
1235        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1236            ingestion_dir: temp_dir.path().to_owned(),
1237            starting_checkpoint: 0,
1238            num_checkpoints: 30,
1239            checkpoint_size: 1,
1240        })
1241        .await;
1242
1243        let indexer_args = IndexerArgs {
1244            last_checkpoint: Some(29),
1245            ..Default::default()
1246        };
1247
1248        let client_args = ClientArgs {
1249            ingestion: IngestionClientArgs {
1250                local_ingestion_path: Some(temp_dir.path().to_owned()),
1251                ..Default::default()
1252            },
1253            ..Default::default()
1254        };
1255
1256        let ingestion_config = IngestionConfig::default();
1257
1258        let mut indexer = Indexer::new(
1259            store.clone(),
1260            indexer_args,
1261            client_args,
1262            ingestion_config,
1263            None,
1264            &registry,
1265        )
1266        .await
1267        .unwrap();
1268
1269        indexer
1270            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1271            .await
1272            .unwrap();
1273        indexer
1274            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1275            .await
1276            .unwrap();
1277        indexer
1278            .sequential_pipeline::<C>(C, SequentialConfig::default())
1279            .await
1280            .unwrap();
1281        indexer
1282            .sequential_pipeline::<D>(D, SequentialConfig::default())
1283            .await
1284            .unwrap();
1285
1286        let ingestion_metrics = indexer.ingestion_metrics().clone();
1287        let indexer_metrics = indexer.indexer_metrics().clone();
1288
1289        indexer.run().await.unwrap().join().await.unwrap();
1290
1291        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1292        assert_eq!(
1293            indexer_metrics
1294                .total_watermarks_out_of_order
1295                .get_metric_with_label_values(&[A::NAME])
1296                .unwrap()
1297                .get(),
1298            0
1299        );
1300        assert_eq!(
1301            indexer_metrics
1302                .total_watermarks_out_of_order
1303                .get_metric_with_label_values(&[B::NAME])
1304                .unwrap()
1305                .get(),
1306            5
1307        );
1308        assert_eq!(
1309            indexer_metrics
1310                .total_watermarks_out_of_order
1311                .get_metric_with_label_values(&[C::NAME])
1312                .unwrap()
1313                .get(),
1314            10
1315        );
1316        assert_eq!(
1317            indexer_metrics
1318                .total_watermarks_out_of_order
1319                .get_metric_with_label_values(&[D::NAME])
1320                .unwrap()
1321                .get(),
1322            15
1323        );
1324    }
1325
1326    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1327    #[tokio::test]
1328    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1329        let registry = Registry::new();
1330        let store = MockStore::default();
1331
1332        test_pipeline!(A, "concurrent_a");
1333        test_pipeline!(B, "concurrent_b");
1334        test_pipeline!(C, "sequential_c");
1335        test_pipeline!(D, "sequential_d");
1336
1337        let mut conn = store.connect().await.unwrap();
1338        conn.set_committer_watermark(
1339            A::NAME,
1340            CommitterWatermark {
1341                checkpoint_hi_inclusive: 5,
1342                ..Default::default()
1343            },
1344        )
1345        .await
1346        .unwrap();
1347        conn.set_committer_watermark(
1348            B::NAME,
1349            CommitterWatermark {
1350                checkpoint_hi_inclusive: 10,
1351                ..Default::default()
1352            },
1353        )
1354        .await
1355        .unwrap();
1356        conn.set_committer_watermark(
1357            C::NAME,
1358            CommitterWatermark {
1359                checkpoint_hi_inclusive: 15,
1360                ..Default::default()
1361            },
1362        )
1363        .await
1364        .unwrap();
1365        conn.set_committer_watermark(
1366            D::NAME,
1367            CommitterWatermark {
1368                checkpoint_hi_inclusive: 20,
1369                ..Default::default()
1370            },
1371        )
1372        .await
1373        .unwrap();
1374
1375        // Create synthetic ingestion data
1376        let temp_dir = tempfile::tempdir().unwrap();
1377        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1378            ingestion_dir: temp_dir.path().to_owned(),
1379            starting_checkpoint: 0,
1380            num_checkpoints: 30,
1381            checkpoint_size: 1,
1382        })
1383        .await;
1384
1385        let indexer_args = IndexerArgs {
1386            first_checkpoint: Some(3),
1387            last_checkpoint: Some(29),
1388            ..Default::default()
1389        };
1390
1391        let client_args = ClientArgs {
1392            ingestion: IngestionClientArgs {
1393                local_ingestion_path: Some(temp_dir.path().to_owned()),
1394                ..Default::default()
1395            },
1396            ..Default::default()
1397        };
1398
1399        let ingestion_config = IngestionConfig::default();
1400
1401        let mut indexer = Indexer::new(
1402            store.clone(),
1403            indexer_args,
1404            client_args,
1405            ingestion_config,
1406            None,
1407            &registry,
1408        )
1409        .await
1410        .unwrap();
1411
1412        indexer
1413            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1414            .await
1415            .unwrap();
1416        indexer
1417            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1418            .await
1419            .unwrap();
1420        indexer
1421            .sequential_pipeline::<C>(C, SequentialConfig::default())
1422            .await
1423            .unwrap();
1424        indexer
1425            .sequential_pipeline::<D>(D, SequentialConfig::default())
1426            .await
1427            .unwrap();
1428
1429        let ingestion_metrics = indexer.ingestion_metrics().clone();
1430        let metrics = indexer.indexer_metrics().clone();
1431        indexer.run().await.unwrap().join().await.unwrap();
1432
1433        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1434        assert_eq!(
1435            metrics
1436                .total_watermarks_out_of_order
1437                .get_metric_with_label_values(&[A::NAME])
1438                .unwrap()
1439                .get(),
1440            0
1441        );
1442        assert_eq!(
1443            metrics
1444                .total_watermarks_out_of_order
1445                .get_metric_with_label_values(&[B::NAME])
1446                .unwrap()
1447                .get(),
1448            5
1449        );
1450        assert_eq!(
1451            metrics
1452                .total_watermarks_out_of_order
1453                .get_metric_with_label_values(&[C::NAME])
1454                .unwrap()
1455                .get(),
1456            10
1457        );
1458        assert_eq!(
1459            metrics
1460                .total_watermarks_out_of_order
1461                .get_metric_with_label_values(&[D::NAME])
1462                .unwrap()
1463                .get(),
1464            15
1465        );
1466    }
1467
1468    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1469    #[tokio::test]
1470    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1471        let registry = Registry::new();
1472        let store = MockStore::default();
1473
1474        test_pipeline!(A, "concurrent_a");
1475        test_pipeline!(B, "concurrent_b");
1476        test_pipeline!(C, "sequential_c");
1477        test_pipeline!(D, "sequential_d");
1478
1479        let mut conn = store.connect().await.unwrap();
1480        conn.set_committer_watermark(
1481            B::NAME,
1482            CommitterWatermark {
1483                checkpoint_hi_inclusive: 10,
1484                ..Default::default()
1485            },
1486        )
1487        .await
1488        .unwrap();
1489        conn.set_committer_watermark(
1490            C::NAME,
1491            CommitterWatermark {
1492                checkpoint_hi_inclusive: 15,
1493                ..Default::default()
1494            },
1495        )
1496        .await
1497        .unwrap();
1498        conn.set_committer_watermark(
1499            D::NAME,
1500            CommitterWatermark {
1501                checkpoint_hi_inclusive: 20,
1502                ..Default::default()
1503            },
1504        )
1505        .await
1506        .unwrap();
1507
1508        // Create synthetic ingestion data
1509        let temp_dir = tempfile::tempdir().unwrap();
1510        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1511            ingestion_dir: temp_dir.path().to_owned(),
1512            starting_checkpoint: 0,
1513            num_checkpoints: 30,
1514            checkpoint_size: 1,
1515        })
1516        .await;
1517
1518        let indexer_args = IndexerArgs {
1519            last_checkpoint: Some(29),
1520            ..Default::default()
1521        };
1522
1523        let client_args = ClientArgs {
1524            ingestion: IngestionClientArgs {
1525                local_ingestion_path: Some(temp_dir.path().to_owned()),
1526                ..Default::default()
1527            },
1528            ..Default::default()
1529        };
1530
1531        let ingestion_config = IngestionConfig::default();
1532
1533        let mut indexer = Indexer::new(
1534            store.clone(),
1535            indexer_args,
1536            client_args,
1537            ingestion_config,
1538            None,
1539            &registry,
1540        )
1541        .await
1542        .unwrap();
1543
1544        indexer
1545            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1546            .await
1547            .unwrap();
1548        indexer
1549            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1550            .await
1551            .unwrap();
1552        indexer
1553            .sequential_pipeline::<C>(C, SequentialConfig::default())
1554            .await
1555            .unwrap();
1556        indexer
1557            .sequential_pipeline::<D>(D, SequentialConfig::default())
1558            .await
1559            .unwrap();
1560
1561        let ingestion_metrics = indexer.ingestion_metrics().clone();
1562        let metrics = indexer.indexer_metrics().clone();
1563        indexer.run().await.unwrap().join().await.unwrap();
1564
1565        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1566        assert_eq!(
1567            metrics
1568                .total_watermarks_out_of_order
1569                .get_metric_with_label_values(&[A::NAME])
1570                .unwrap()
1571                .get(),
1572            0
1573        );
1574        assert_eq!(
1575            metrics
1576                .total_watermarks_out_of_order
1577                .get_metric_with_label_values(&[B::NAME])
1578                .unwrap()
1579                .get(),
1580            11
1581        );
1582        assert_eq!(
1583            metrics
1584                .total_watermarks_out_of_order
1585                .get_metric_with_label_values(&[C::NAME])
1586                .unwrap()
1587                .get(),
1588            16
1589        );
1590        assert_eq!(
1591            metrics
1592                .total_watermarks_out_of_order
1593                .get_metric_with_label_values(&[D::NAME])
1594                .unwrap()
1595                .get(),
1596            21
1597        );
1598    }
1599
1600    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1601    #[tokio::test]
1602    async fn test_indexer_ingestion_use_first_checkpoint() {
1603        let registry = Registry::new();
1604        let store = MockStore::default();
1605
1606        test_pipeline!(A, "concurrent_a");
1607        test_pipeline!(B, "concurrent_b");
1608        test_pipeline!(C, "sequential_c");
1609        test_pipeline!(D, "sequential_d");
1610
1611        let mut conn = store.connect().await.unwrap();
1612        conn.set_committer_watermark(
1613            B::NAME,
1614            CommitterWatermark {
1615                checkpoint_hi_inclusive: 10,
1616                ..Default::default()
1617            },
1618        )
1619        .await
1620        .unwrap();
1621        conn.set_committer_watermark(
1622            C::NAME,
1623            CommitterWatermark {
1624                checkpoint_hi_inclusive: 15,
1625                ..Default::default()
1626            },
1627        )
1628        .await
1629        .unwrap();
1630        conn.set_committer_watermark(
1631            D::NAME,
1632            CommitterWatermark {
1633                checkpoint_hi_inclusive: 20,
1634                ..Default::default()
1635            },
1636        )
1637        .await
1638        .unwrap();
1639
1640        // Create synthetic ingestion data
1641        let temp_dir = tempfile::tempdir().unwrap();
1642        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1643            ingestion_dir: temp_dir.path().to_owned(),
1644            starting_checkpoint: 0,
1645            num_checkpoints: 30,
1646            checkpoint_size: 1,
1647        })
1648        .await;
1649
1650        let indexer_args = IndexerArgs {
1651            first_checkpoint: Some(10),
1652            last_checkpoint: Some(29),
1653            ..Default::default()
1654        };
1655
1656        let client_args = ClientArgs {
1657            ingestion: IngestionClientArgs {
1658                local_ingestion_path: Some(temp_dir.path().to_owned()),
1659                ..Default::default()
1660            },
1661            ..Default::default()
1662        };
1663
1664        let ingestion_config = IngestionConfig::default();
1665
1666        let mut indexer = Indexer::new(
1667            store.clone(),
1668            indexer_args,
1669            client_args,
1670            ingestion_config,
1671            None,
1672            &registry,
1673        )
1674        .await
1675        .unwrap();
1676
1677        indexer
1678            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1679            .await
1680            .unwrap();
1681        indexer
1682            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1683            .await
1684            .unwrap();
1685        indexer
1686            .sequential_pipeline::<C>(C, SequentialConfig::default())
1687            .await
1688            .unwrap();
1689        indexer
1690            .sequential_pipeline::<D>(D, SequentialConfig::default())
1691            .await
1692            .unwrap();
1693
1694        let ingestion_metrics = indexer.ingestion_metrics().clone();
1695        let metrics = indexer.indexer_metrics().clone();
1696        indexer.run().await.unwrap().join().await.unwrap();
1697
1698        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1699        assert_eq!(
1700            metrics
1701                .total_watermarks_out_of_order
1702                .get_metric_with_label_values(&[A::NAME])
1703                .unwrap()
1704                .get(),
1705            0
1706        );
1707        assert_eq!(
1708            metrics
1709                .total_watermarks_out_of_order
1710                .get_metric_with_label_values(&[B::NAME])
1711                .unwrap()
1712                .get(),
1713            1
1714        );
1715        assert_eq!(
1716            metrics
1717                .total_watermarks_out_of_order
1718                .get_metric_with_label_values(&[C::NAME])
1719                .unwrap()
1720                .get(),
1721            6
1722        );
1723        assert_eq!(
1724            metrics
1725                .total_watermarks_out_of_order
1726                .get_metric_with_label_values(&[D::NAME])
1727                .unwrap()
1728                .get(),
1729            11
1730        );
1731    }
1732
1733    #[tokio::test]
1734    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1735        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1736        assert_eq!(committer_watermark, None);
1737        assert_eq!(pruner_watermark, None);
1738    }
1739
1740    #[tokio::test]
1741    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1742        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1743        assert_eq!(committer_watermark, None);
1744        assert_eq!(pruner_watermark, None);
1745    }
1746
1747    #[tokio::test]
1748    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1749        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1750
1751        let committer_watermark = committer_watermark.unwrap();
1752        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1753
1754        let pruner_watermark = pruner_watermark.unwrap();
1755        assert_eq!(pruner_watermark.reader_lo, 1);
1756        assert_eq!(pruner_watermark.pruner_hi, 1);
1757    }
1758
1759    #[tokio::test]
1760    async fn test_init_watermark_sequential() {
1761        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1762
1763        let committer_watermark = committer_watermark.unwrap();
1764        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1765
1766        let pruner_watermark = pruner_watermark.unwrap();
1767        assert_eq!(pruner_watermark.reader_lo, 1);
1768        assert_eq!(pruner_watermark.pruner_hi, 1);
1769    }
1770
1771    #[tokio::test]
1772    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1773        let registry = Registry::new();
1774        let store = MockStore::default();
1775
1776        // Set up different watermarks for three different sequential pipelines
1777        let mut conn = store.connect().await.unwrap();
1778
1779        // First handler at checkpoint 10
1780        conn.set_committer_watermark(
1781            MockHandler::NAME,
1782            CommitterWatermark {
1783                epoch_hi_inclusive: 0,
1784                checkpoint_hi_inclusive: 10,
1785                tx_hi: 20,
1786                timestamp_ms_hi_inclusive: 10000,
1787            },
1788        )
1789        .await
1790        .unwrap();
1791
1792        // SequentialHandler at checkpoint 5
1793        conn.set_committer_watermark(
1794            SequentialHandler::NAME,
1795            CommitterWatermark {
1796                epoch_hi_inclusive: 0,
1797                checkpoint_hi_inclusive: 5,
1798                tx_hi: 10,
1799                timestamp_ms_hi_inclusive: 5000,
1800            },
1801        )
1802        .await
1803        .unwrap();
1804
1805        // Create synthetic ingestion data
1806        let temp_dir = tempfile::tempdir().unwrap();
1807        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1808            ingestion_dir: temp_dir.path().to_owned(),
1809            starting_checkpoint: 0,
1810            num_checkpoints: 20,
1811            checkpoint_size: 2,
1812        })
1813        .await;
1814
1815        let indexer_args = IndexerArgs {
1816            first_checkpoint: None,
1817            last_checkpoint: Some(19),
1818            pipeline: vec![],
1819            ..Default::default()
1820        };
1821
1822        let client_args = ClientArgs {
1823            ingestion: IngestionClientArgs {
1824                local_ingestion_path: Some(temp_dir.path().to_owned()),
1825                ..Default::default()
1826            },
1827            ..Default::default()
1828        };
1829
1830        let ingestion_config = IngestionConfig::default();
1831
1832        let mut indexer = Indexer::new(
1833            store.clone(),
1834            indexer_args,
1835            client_args,
1836            ingestion_config,
1837            None,
1838            &registry,
1839        )
1840        .await
1841        .unwrap();
1842
1843        // Add first sequential pipeline
1844        indexer
1845            .sequential_pipeline(MockHandler, SequentialConfig::default())
1846            .await
1847            .unwrap();
1848
1849        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1850        assert_eq!(
1851            indexer.next_sequential_checkpoint(),
1852            Some(11),
1853            "next_sequential_checkpoint should be 11"
1854        );
1855
1856        // Add second sequential pipeline
1857        indexer
1858            .sequential_pipeline(SequentialHandler, SequentialConfig::default())
1859            .await
1860            .unwrap();
1861
1862        // Should change to 6 (minimum of 6 and 11)
1863        assert_eq!(
1864            indexer.next_sequential_checkpoint(),
1865            Some(6),
1866            "next_sequential_checkpoint should still be 6"
1867        );
1868
1869        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1870        indexer.run().await.unwrap().join().await.unwrap();
1871
1872        // Verify each pipeline made some progress independently
1873        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1874        let watermark2 = conn
1875            .committer_watermark(SequentialHandler::NAME)
1876            .await
1877            .unwrap();
1878
1879        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1880        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1881    }
1882
1883    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1884    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1885    /// committing checkpoints less than the main pipeline's reader watermark.
1886    #[tokio::test]
1887    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1888        let registry = Registry::new();
1889        let store = MockStore::default();
1890
1891        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1892        // reader watermark at `7`.
1893        let mut conn = store.connect().await.unwrap();
1894        conn.set_committer_watermark(
1895            MockCheckpointSequenceNumberHandler::NAME,
1896            CommitterWatermark {
1897                checkpoint_hi_inclusive: 10,
1898                ..Default::default()
1899            },
1900        )
1901        .await
1902        .unwrap();
1903        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1904            .await
1905            .unwrap();
1906
1907        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1908        // be ignored by the tasked indexer.
1909        let indexer_args = IndexerArgs {
1910            first_checkpoint: Some(0),
1911            last_checkpoint: Some(15),
1912            task: TaskArgs::tasked("task".to_string(), 10),
1913            ..Default::default()
1914        };
1915        let temp_dir = tempfile::tempdir().unwrap();
1916        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1917            ingestion_dir: temp_dir.path().to_owned(),
1918            starting_checkpoint: 0,
1919            num_checkpoints: 16,
1920            checkpoint_size: 2,
1921        })
1922        .await;
1923
1924        let client_args = ClientArgs {
1925            ingestion: IngestionClientArgs {
1926                local_ingestion_path: Some(temp_dir.path().to_owned()),
1927                ..Default::default()
1928            },
1929            ..Default::default()
1930        };
1931
1932        let ingestion_config = IngestionConfig::default();
1933
1934        let mut tasked_indexer = Indexer::new(
1935            store.clone(),
1936            indexer_args,
1937            client_args,
1938            ingestion_config,
1939            None,
1940            &registry,
1941        )
1942        .await
1943        .unwrap();
1944
1945        let _ = tasked_indexer
1946            .concurrent_pipeline(
1947                MockCheckpointSequenceNumberHandler,
1948                ConcurrentConfig::default(),
1949            )
1950            .await;
1951
1952        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1953        let metrics = tasked_indexer.indexer_metrics().clone();
1954
1955        tasked_indexer.run().await.unwrap().join().await.unwrap();
1956
1957        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1958        assert_eq!(
1959            metrics
1960                .total_collector_skipped_checkpoints
1961                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1962                .unwrap()
1963                .get(),
1964            7
1965        );
1966        let data = store
1967            .data
1968            .get(MockCheckpointSequenceNumberHandler::NAME)
1969            .unwrap();
1970        assert_eq!(data.len(), 9);
1971        for i in 0..7 {
1972            assert!(data.get(&i).is_none());
1973        }
1974        for i in 7..16 {
1975            assert!(data.get(&i).is_some());
1976        }
1977    }
1978
1979    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1980    #[tokio::test]
1981    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1982        let registry = Registry::new();
1983        let store = MockStore::default();
1984
1985        let mut conn = store.connect().await.unwrap();
1986        conn.set_committer_watermark(
1987            "test",
1988            CommitterWatermark {
1989                checkpoint_hi_inclusive: 10,
1990                ..Default::default()
1991            },
1992        )
1993        .await
1994        .unwrap();
1995        conn.set_reader_watermark("test", 5).await.unwrap();
1996
1997        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1998        // watermarks.
1999        let indexer_args = IndexerArgs {
2000            first_checkpoint: Some(9),
2001            last_checkpoint: Some(25),
2002            task: TaskArgs::tasked("task".to_string(), 10),
2003            ..Default::default()
2004        };
2005        let temp_dir = tempfile::tempdir().unwrap();
2006        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2007            ingestion_dir: temp_dir.path().to_owned(),
2008            starting_checkpoint: 0,
2009            num_checkpoints: 26,
2010            checkpoint_size: 2,
2011        })
2012        .await;
2013
2014        let client_args = ClientArgs {
2015            ingestion: IngestionClientArgs {
2016                local_ingestion_path: Some(temp_dir.path().to_owned()),
2017                ..Default::default()
2018            },
2019            ..Default::default()
2020        };
2021
2022        let ingestion_config = IngestionConfig::default();
2023
2024        let mut tasked_indexer = Indexer::new(
2025            store.clone(),
2026            indexer_args,
2027            client_args,
2028            ingestion_config,
2029            None,
2030            &registry,
2031        )
2032        .await
2033        .unwrap();
2034
2035        let _ = tasked_indexer
2036            .concurrent_pipeline(
2037                MockCheckpointSequenceNumberHandler,
2038                ConcurrentConfig::default(),
2039            )
2040            .await;
2041
2042        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2043        let metrics = tasked_indexer.indexer_metrics().clone();
2044
2045        tasked_indexer.run().await.unwrap().join().await.unwrap();
2046
2047        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2048        assert_eq!(
2049            metrics
2050                .total_watermarks_out_of_order
2051                .get_metric_with_label_values(&["test"])
2052                .unwrap()
2053                .get(),
2054            0
2055        );
2056        assert_eq!(
2057            metrics
2058                .total_collector_skipped_checkpoints
2059                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2060                .unwrap()
2061                .get(),
2062            0
2063        );
2064
2065        let data = store.data.get("test").unwrap();
2066        assert_eq!(data.len(), 17);
2067        for i in 0..9 {
2068            assert!(data.get(&i).is_none());
2069        }
2070        for i in 9..26 {
2071            assert!(data.get(&i).is_some());
2072        }
2073        let main_pipeline_watermark = store.watermark("test").unwrap();
2074        // assert that the main pipeline's watermarks are not updated
2075        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, Some(10));
2076        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2077        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2078        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, Some(25));
2079        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2080    }
2081
2082    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
2083    /// committed, and any checkpoints inflight < X will be skipped.
2084    #[tokio::test]
2085    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2086        let registry = Registry::new();
2087        let store = MockStore::default();
2088        let mut conn = store.connect().await.unwrap();
2089        // Set the main pipeline watermark.
2090        conn.set_committer_watermark(
2091            ControllableHandler::NAME,
2092            CommitterWatermark {
2093                checkpoint_hi_inclusive: 11,
2094                ..Default::default()
2095            },
2096        )
2097        .await
2098        .unwrap();
2099
2100        // Generate 500 checkpoints upfront, for the indexer to process all at once.
2101        let temp_dir = tempfile::tempdir().unwrap();
2102        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2103            ingestion_dir: temp_dir.path().to_owned(),
2104            starting_checkpoint: 0,
2105            num_checkpoints: 501,
2106            checkpoint_size: 2,
2107        })
2108        .await;
2109        let indexer_args = IndexerArgs {
2110            first_checkpoint: Some(0),
2111            last_checkpoint: Some(500),
2112            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2113            ..Default::default()
2114        };
2115        let client_args = ClientArgs {
2116            ingestion: IngestionClientArgs {
2117                local_ingestion_path: Some(temp_dir.path().to_owned()),
2118                ..Default::default()
2119            },
2120            ..Default::default()
2121        };
2122        let ingestion_config = IngestionConfig::default();
2123        let mut tasked_indexer = Indexer::new(
2124            store.clone(),
2125            indexer_args,
2126            client_args,
2127            ingestion_config,
2128            None,
2129            &registry,
2130        )
2131        .await
2132        .unwrap();
2133        let mut allow_process = 10;
2134        // Limit the pipeline to process only checkpoints `[0, 10]`.
2135        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2136        let _ = tasked_indexer
2137            .concurrent_pipeline(
2138                controllable_handler,
2139                ConcurrentConfig {
2140                    committer: CommitterConfig {
2141                        collect_interval_ms: 10,
2142                        watermark_interval_ms: 10,
2143                        ..Default::default()
2144                    },
2145                    // High fixed concurrency so all checkpoints can be processed
2146                    // concurrently despite out-of-order arrival.
2147                    fanout: Some(ConcurrencyConfig::Fixed { value: 501 }),
2148                    ..Default::default()
2149                },
2150            )
2151            .await;
2152        let metrics = tasked_indexer.indexer_metrics().clone();
2153
2154        let mut s_indexer = tasked_indexer.run().await.unwrap();
2155
2156        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
2157        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
2158        // committed.
2159        store
2160            .wait_for_watermark(
2161                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2162                10,
2163                Duration::from_secs(10),
2164            )
2165            .await;
2166
2167        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2168        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2169        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2170        // one at a time until the collector_reader_lo metric shows the new value.
2171        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2172            .await
2173            .unwrap();
2174
2175        let reader_lo = metrics
2176            .collector_reader_lo
2177            .with_label_values(&[ControllableHandler::NAME]);
2178
2179        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2180        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2181        // checkpoints have been processed.
2182        let mut interval = tokio::time::interval(Duration::from_millis(10));
2183        while reader_lo.get() != 250 {
2184            interval.tick().await;
2185            // allow_process is initialized to 11, bump to 11 for the next checkpoint
2186            allow_process += 1;
2187            assert!(
2188                allow_process <= 500,
2189                "Released all checkpoints but collector never observed new reader_lo"
2190            );
2191            process_below.send(allow_process).ok();
2192        }
2193
2194        // At this point, the collector has observed reader_lo = 250. Release all remaining
2195        // checkpoints. Guarantees:
2196        // - [0, 10]: committed (before reader_lo was set)
2197        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2198        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2199        // - [250, 500]: committed (>= reader_lo)
2200        process_below.send(500).ok();
2201
2202        s_indexer.join().await.unwrap();
2203
2204        let data = store.data.get(ControllableHandler::NAME).unwrap();
2205
2206        // Checkpoints (allow_process, 250) must be skipped.
2207        for chkpt in (allow_process + 1)..250 {
2208            assert!(
2209                data.get(&chkpt).is_none(),
2210                "Checkpoint {chkpt} should have been skipped"
2211            );
2212        }
2213
2214        // Checkpoints >= reader_lo must be committed.
2215        for chkpt in 250..=500 {
2216            assert!(
2217                data.get(&chkpt).is_some(),
2218                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2219            );
2220        }
2221
2222        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2223        for chkpt in 0..=10 {
2224            assert!(
2225                data.get(&chkpt).is_some(),
2226                "Checkpoint {chkpt} should have been committed (baseline)"
2227            );
2228        }
2229    }
2230}