sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeSet;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context;
9use anyhow::bail;
10use anyhow::ensure;
11use ingestion::ClientArgs;
12use ingestion::IngestionConfig;
13use ingestion::IngestionService;
14use ingestion::ingestion_client::IngestionClient;
15use metrics::IndexerMetrics;
16use prometheus::Registry;
17use sui_indexer_alt_framework_store_traits::Connection;
18use sui_indexer_alt_framework_store_traits::Store;
19use sui_indexer_alt_framework_store_traits::TransactionalStore;
20use sui_indexer_alt_framework_store_traits::pipeline_task;
21use tracing::info;
22
23use crate::metrics::IngestionMetrics;
24use crate::pipeline::Processor;
25use crate::pipeline::concurrent::ConcurrentConfig;
26use crate::pipeline::concurrent::{self};
27use crate::pipeline::sequential::Handler;
28use crate::pipeline::sequential::SequentialConfig;
29use crate::pipeline::sequential::{self};
30use crate::service::Service;
31
32pub use anyhow::Result;
33pub use sui_field_count::FieldCount;
34pub use sui_futures::service;
35/// External users access the store trait through framework::store
36pub use sui_indexer_alt_framework_store_traits as store;
37pub use sui_types as types;
38
39#[cfg(feature = "cluster")]
40pub mod cluster;
41pub mod ingestion;
42pub mod metrics;
43pub mod pipeline;
44#[cfg(feature = "postgres")]
45pub mod postgres;
46
47#[cfg(test)]
48pub mod mocks;
49
/// Command-line arguments for the indexer
// NOTE: clap turns the `///` comments on the fields below into `--help` output, so wording
// changes to them are user-visible. These arguments are consumed by `Indexer::new`.
#[derive(clap::Args, Default, Debug, Clone)]
pub struct IndexerArgs {
    /// Override the next checkpoint for all pipelines without a committer watermark to start
    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
    /// setting and always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    #[arg(long)]
    pub first_checkpoint: Option<u64>,

    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
    #[arg(long)]
    pub last_checkpoint: Option<u64>,

    // NOTE(review): the help text below mentions a "configuration file", but the filter is
    // matched against pipelines registered on the `Indexer`; a name that never matches a
    // registered pipeline makes `Indexer::run` fail. Consider rewording the help text.
    // Passing the flag zero times leaves this empty, which `Indexer::new` treats as "no filter".
    /// Only run the following pipelines. If not provided, all pipelines found in the
    /// configuration file will be run.
    #[arg(long, action = clap::ArgAction::Append)]
    pub pipeline: Vec<String>,

    /// Additional configurations for running a tasked indexer.
    #[clap(flatten)]
    pub task: TaskArgs,
}
76
77/// Command-line arguments for configuring a tasked indexer.
78#[derive(clap::Parser, Default, Debug, Clone)]
79pub struct TaskArgs {
80    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
81    /// delimiter defined on the store. This allows the same pipelines to run under multiple
82    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
83    /// entries in the database.
84    ///
85    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
86    ///
87    /// Sequential pipelines cannot be attached to a tasked indexer.
88    ///
89    /// The framework ensures that tasked pipelines never commit checkpoints below the main
90    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
91    #[arg(long, requires = "reader_interval_ms")]
92    task: Option<String>,
93
94    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
95    /// refetch its main pipeline's reader watermark.
96    ///
97    /// This is required when `--task` is set and should should ideally be set to a value that is
98    /// an order of magnitude smaller than the main pipeline's pruning interval, to ensure this
99    /// task pipeline can pick up the new reader watermark before the main pipeline prunes up to
100    /// it.
101    ///
102    /// If the main pipeline does not have pruning enabled, this value can be set to some high
103    /// value, as the tasked pipeline will never see an updated reader watermark.
104    #[arg(long, requires = "task")]
105    reader_interval_ms: Option<u64>,
106}
107
/// A set of pipelines fed by a shared checkpoint ingestion service. Create one with
/// [Indexer::new], register pipelines with [Indexer::concurrent_pipeline] or
/// [Indexer::sequential_pipeline], then start everything with [Indexer::run].
pub struct Indexer<S: Store> {
    /// The storage backend that the indexer uses to write and query indexed data. This
    /// generic implementation allows for plugging in different storage solutions that implement the
    /// `Store` trait.
    store: S,

    /// Prometheus Metrics.
    metrics: Arc<IndexerMetrics>,

    /// Service for downloading and disseminating checkpoint data.
    ingestion_service: IngestionService,

    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
    /// always resume from their committer watermark + 1.
    ///
    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
    /// is the minimum across all pipelines' next checkpoints.
    default_next_checkpoint: u64,

    /// Optional override of the checkpoint upperbound (inclusive). When set, the indexer will stop
    /// ingestion at this checkpoint.
    last_checkpoint: Option<u64>,

    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
    /// delimiter defined on the store. This allows the same pipelines to run under multiple
    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
    /// entries in the database.
    ///
    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
    ///
    /// Sequential pipelines cannot be attached to a tasked indexer.
    ///
    /// The framework ensures that tasked pipelines never commit checkpoints below the main
    /// pipeline’s pruner watermark.
    task: Option<Task>,

    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
    /// run. Any pipelines that are present in this filter but not added to the indexer cause
    /// [Indexer::run] to fail with an error listing them.
    enabled_pipelines: Option<BTreeSet<String>>,

    /// Pipelines that have already been registered with the indexer, whether or not the filter
    /// enables them. Used to make sure a pipeline with the same name isn't added twice.
    added_pipelines: BTreeSet<&'static str>,

    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
    /// unless overridden by [Self::default_next_checkpoint]. It starts at `u64::MAX` and is
    /// lowered as each enabled pipeline is added.
    first_ingestion_checkpoint: u64,

    /// The service handles for every enabled pipeline that has been added, used to manage
    /// lifetimes and graceful shutdown.
    pipelines: Vec<Service>,
}
162
/// Configuration for a tasked indexer, built from [TaskArgs] once both of its fields are set.
#[derive(Clone)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
173
174impl TaskArgs {
175    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
176        Self {
177            task: Some(task),
178            reader_interval_ms: Some(reader_interval_ms),
179        }
180    }
181
182    fn into_task(self) -> Option<Task> {
183        Some(Task {
184            task: self.task?,
185            reader_interval: Duration::from_millis(self.reader_interval_ms?),
186        })
187    }
188}
189
190impl<S: Store> Indexer<S> {
191    /// Create a new instance of the indexer framework from a store that implements the `Store`
192    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
193    /// arguments configure the following:
194    ///
195    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
196    ///   watermarks) and where to serve metrics from,
197    /// - Where to download checkpoints from,
198    /// - Concurrency and buffering parameters for downloading checkpoints.
199    ///
200    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
201    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
202    pub async fn new(
203        store: S,
204        indexer_args: IndexerArgs,
205        client_args: ClientArgs,
206        ingestion_config: IngestionConfig,
207        metrics_prefix: Option<&str>,
208        registry: &Registry,
209    ) -> Result<Self> {
210        let IndexerArgs {
211            first_checkpoint,
212            last_checkpoint,
213            pipeline,
214            task,
215        } = indexer_args;
216
217        let metrics = IndexerMetrics::new(metrics_prefix, registry);
218
219        let ingestion_service =
220            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
221
222        Ok(Self {
223            store,
224            metrics,
225            ingestion_service,
226            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
227            last_checkpoint,
228            task: task.into_task(),
229            enabled_pipelines: if pipeline.is_empty() {
230                None
231            } else {
232                Some(pipeline.into_iter().collect())
233            },
234            added_pipelines: BTreeSet::new(),
235            first_ingestion_checkpoint: u64::MAX,
236            pipelines: vec![],
237        })
238    }
239
240    /// The store used by the indexer.
241    pub fn store(&self) -> &S {
242        &self.store
243    }
244
245    /// The ingestion client used by the indexer to fetch checkpoints.
246    pub fn ingestion_client(&self) -> &IngestionClient {
247        self.ingestion_service.ingestion_client()
248    }
249
250    /// The indexer's metrics.
251    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
252        &self.metrics
253    }
254
255    /// The ingestion service's metrics.
256    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
257        self.ingestion_service.metrics()
258    }
259
260    /// The pipelines that this indexer will run.
261    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
262        self.added_pipelines.iter().copied().filter(|p| {
263            self.enabled_pipelines
264                .as_ref()
265                .is_none_or(|e| e.contains(*p))
266        })
267    }
268
269    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
270    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
271    ///
272    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
273    /// keep the watermark table up-to-date with the highest point they can guarantee all data
274    /// exists for, for their pipeline.
275    pub async fn concurrent_pipeline<H>(
276        &mut self,
277        handler: H,
278        config: ConcurrentConfig,
279    ) -> Result<()>
280    where
281        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
282    {
283        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
284            return Ok(());
285        };
286
287        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
288        self.pipelines.push(concurrent::pipeline::<H>(
289            handler,
290            next_checkpoint,
291            config,
292            self.store.clone(),
293            self.task.clone(),
294            checkpoint_rx,
295            commit_hi_tx,
296            self.metrics.clone(),
297        ));
298
299        Ok(())
300    }
301
302    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
303    /// will start processing and committing once the ingestion service has caught up to their
304    /// respective watermarks.
305    ///
306    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
307    pub async fn run(self) -> Result<Service> {
308        if let Some(enabled_pipelines) = self.enabled_pipelines {
309            ensure!(
310                enabled_pipelines.is_empty(),
311                "Tried to enable pipelines that this indexer does not know about: \
312                {enabled_pipelines:#?}",
313            );
314        }
315
316        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
317
318        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
319
320        let regulated = true;
321        let mut service = self
322            .ingestion_service
323            .run(self.first_ingestion_checkpoint..=last_checkpoint, regulated)
324            .await
325            .context("Failed to start ingestion service")?;
326
327        for pipeline in self.pipelines {
328            service = service.merge(pipeline);
329        }
330
331        Ok(service)
332    }
333
334    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
335    /// checkpoint after its watermark, or if that doesn't exist, then the provided
336    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
337    ///
338    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
339    /// calculated above.
340    ///
341    /// Returns `Ok(None)` if the pipeline is disabled.
342    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
343        ensure!(
344            self.added_pipelines.insert(P::NAME),
345            "Pipeline {:?} already added",
346            P::NAME,
347        );
348
349        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
350            && !enabled_pipelines.remove(P::NAME)
351        {
352            info!(pipeline = P::NAME, "Skipping");
353            return Ok(None);
354        }
355
356        let mut conn = self
357            .store
358            .connect()
359            .await
360            .context("Failed to establish connection to store")?;
361
362        let pipeline_task =
363            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
364
365        let checkpoint_hi_inclusive = conn
366            .init_watermark(&pipeline_task, self.default_next_checkpoint)
367            .await
368            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
369
370        let next_checkpoint =
371            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
372
373        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
374
375        Ok(Some(next_checkpoint))
376    }
377}
378
379impl<T: TransactionalStore> Indexer<T> {
380    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
381    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
382    ///
383    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
384    /// required to handle pipelines that modify data in-place (where each update is not an insert,
385    /// but could be a modification of an existing row, where ordering between updates is
386    /// important).
387    ///
388    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
389    /// number of checkpoints (configured by `checkpoint_lag`).
390    pub async fn sequential_pipeline<H>(
391        &mut self,
392        handler: H,
393        config: SequentialConfig,
394    ) -> Result<()>
395    where
396        H: Handler<Store = T> + Send + Sync + 'static,
397    {
398        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
399            return Ok(());
400        };
401
402        if self.task.is_some() {
403            bail!(
404                "Sequential pipelines do not support pipeline tasks. \
405                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
406                Running the same pipeline under a different task would violate these guarantees."
407            );
408        }
409
410        let (checkpoint_rx, commit_hi_tx) = self.ingestion_service.subscribe();
411
412        self.pipelines.push(sequential::pipeline::<H>(
413            handler,
414            next_checkpoint,
415            config,
416            self.store.clone(),
417            checkpoint_rx,
418            commit_hi_tx,
419            self.metrics.clone(),
420        ));
421
422        Ok(())
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use std::sync::Arc;
429
430    use async_trait::async_trait;
431    use clap::Parser;
432    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
433    use sui_synthetic_ingestion::synthetic_ingestion;
434    use tokio::sync::watch;
435
436    use crate::FieldCount;
437    use crate::ingestion::ingestion_client::IngestionClientArgs;
438    use crate::mocks::store::MockStore;
439    use crate::pipeline::CommitterConfig;
440    use crate::pipeline::Processor;
441    use crate::pipeline::concurrent::ConcurrentConfig;
442    use crate::store::CommitterWatermark;
443
444    use super::*;
445
    // Minimal value type emitted by the test processors in this module; it carries a
    // checkpoint's sequence number.
    // NOTE(review): the tuple field is read by the handlers' `commit` impls, so it is unclear
    // what `dead_code` would fire on here — confirm before removing this allow.
    #[allow(dead_code)]
    #[derive(Clone, FieldCount)]
    struct MockValue(u64);
449
    /// A test handler whose processing can be held back from outside: `process` waits until the
    /// checkpoint's sequence number is within a limit published over a `watch` channel (created
    /// by `ControllableHandler::with_limit`).
    struct ControllableHandler {
        /// The current processing limit. Despite the name, the bound is inclusive: checkpoints with
        /// a sequence number less than or equal to this value may be processed.
        process_below: watch::Receiver<u64>,
    }
455
456    impl ControllableHandler {
457        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
458            let (tx, rx) = watch::channel(limit);
459            (Self { process_below: rx }, tx)
460        }
461    }
462
463    #[async_trait]
464    impl Processor for ControllableHandler {
465        const NAME: &'static str = "controllable";
466        /// The checkpoints to process come out of order. To account for potential flakiness in test
467        /// `test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo`, we set the FANOUT to
468        /// the total number of checkpoints to process, so we can ensure the correct checkpoints are
469        /// processed.
470        const FANOUT: usize = 501;
471        type Value = MockValue;
472
473        async fn process(
474            &self,
475            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
476        ) -> anyhow::Result<Vec<Self::Value>> {
477            let cp_num = checkpoint.summary.sequence_number;
478
479            // Wait until the checkpoint is allowed to be processed
480            self.process_below
481                .clone()
482                .wait_for(|&limit| cp_num <= limit)
483                .await
484                .ok();
485
486            Ok(vec![MockValue(cp_num)])
487        }
488    }
489
490    #[async_trait]
491    impl crate::pipeline::concurrent::Handler for ControllableHandler {
492        type Store = MockStore;
493        type Batch = Vec<MockValue>;
494
495        fn batch(
496            &self,
497            batch: &mut Self::Batch,
498            values: &mut std::vec::IntoIter<Self::Value>,
499        ) -> crate::pipeline::concurrent::BatchStatus {
500            batch.extend(values);
501            crate::pipeline::concurrent::BatchStatus::Ready
502        }
503
504        async fn commit<'a>(
505            &self,
506            batch: &Self::Batch,
507            conn: &mut <Self::Store as Store>::Connection<'a>,
508        ) -> anyhow::Result<usize> {
509            for value in batch {
510                conn.0
511                    .commit_data(Self::NAME, value.0, vec![value.0])
512                    .await?;
513            }
514            Ok(batch.len())
515        }
516    }
517
    // Declares a unit struct `$handler` whose pipeline name is `$name`, implementing `Processor`
    // plus both the concurrent and the sequential `Handler` traits against `MockStore`, so the
    // same type can be registered as either kind of pipeline in tests.
    //
    // - `process` emits one `MockValue` holding the checkpoint's sequence number.
    // - The concurrent `batch` always returns `BatchStatus::Pending`; its `commit` hands each
    //   value to the mock store's `commit_data` under `$name`.
    // - The sequential `commit` writes nothing and always reports 1.
    macro_rules! test_pipeline {
        ($handler:ident, $name:literal) => {
            struct $handler;

            #[async_trait]
            impl Processor for $handler {
                const NAME: &'static str = $name;
                type Value = MockValue;
                async fn process(
                    &self,
                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
                }
            }

            #[async_trait]
            impl crate::pipeline::concurrent::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(
                    &self,
                    batch: &mut Self::Batch,
                    values: &mut std::vec::IntoIter<Self::Value>,
                ) -> crate::pipeline::concurrent::BatchStatus {
                    batch.extend(values);
                    crate::pipeline::concurrent::BatchStatus::Pending
                }

                async fn commit<'a>(
                    &self,
                    batch: &Self::Batch,
                    conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    for value in batch {
                        conn.0
                            .commit_data(Self::NAME, value.0, vec![value.0])
                            .await?;
                    }
                    Ok(batch.len())
                }
            }

            #[async_trait]
            impl crate::pipeline::sequential::Handler for $handler {
                type Store = MockStore;
                type Batch = Vec<Self::Value>;

                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
                    batch.extend(values);
                }

                async fn commit<'a>(
                    &self,
                    _batch: &Self::Batch,
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    Ok(1)
                }
            }
        };
    }
581
    // Module-level test pipelines. Individual tests also declare their own local handlers with
    // the same macro.
    test_pipeline!(MockHandler, "test_processor");
    test_pipeline!(SequentialHandler, "sequential_handler");
    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
585
586    async fn test_init_watermark(
587        first_checkpoint: Option<u64>,
588        is_concurrent: bool,
589    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
590        let registry = Registry::new();
591        let store = MockStore::default();
592
593        test_pipeline!(A, "pipeline_name");
594
595        let mut conn = store.connect().await.unwrap();
596
597        let indexer_args = IndexerArgs {
598            first_checkpoint,
599            ..IndexerArgs::default()
600        };
601        let temp_dir = tempfile::tempdir().unwrap();
602        let client_args = ClientArgs {
603            ingestion: IngestionClientArgs {
604                local_ingestion_path: Some(temp_dir.path().to_owned()),
605                ..Default::default()
606            },
607            ..Default::default()
608        };
609        let ingestion_config = IngestionConfig::default();
610
611        let mut indexer = Indexer::new(
612            store.clone(),
613            indexer_args,
614            client_args,
615            ingestion_config,
616            None,
617            &registry,
618        )
619        .await
620        .unwrap();
621
622        if is_concurrent {
623            indexer
624                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
625                .await
626                .unwrap();
627        } else {
628            indexer
629                .sequential_pipeline::<A>(A, SequentialConfig::default())
630                .await
631                .unwrap();
632        }
633
634        (
635            conn.committer_watermark(A::NAME).await.unwrap(),
636            conn.pruner_watermark(A::NAME, Duration::ZERO)
637                .await
638                .unwrap(),
639        )
640    }
641
642    #[test]
643    fn test_arg_parsing() {
644        #[derive(Parser)]
645        struct Args {
646            #[clap(flatten)]
647            indexer: IndexerArgs,
648        }
649
650        let args = Args::try_parse_from([
651            "cmd",
652            "--first-checkpoint",
653            "10",
654            "--last-checkpoint",
655            "100",
656            "--pipeline",
657            "a",
658            "--pipeline",
659            "b",
660            "--task",
661            "t",
662            "--reader-interval-ms",
663            "5000",
664        ])
665        .unwrap();
666
667        assert_eq!(args.indexer.first_checkpoint, Some(10));
668        assert_eq!(args.indexer.last_checkpoint, Some(100));
669        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
670        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
671        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
672    }
673
674    /// first_ingestion_checkpoint is smallest among existing watermarks + 1.
675    #[tokio::test]
676    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
677        let registry = Registry::new();
678        let store = MockStore::default();
679
680        test_pipeline!(A, "concurrent_a");
681        test_pipeline!(B, "concurrent_b");
682        test_pipeline!(C, "sequential_c");
683        test_pipeline!(D, "sequential_d");
684
685        let mut conn = store.connect().await.unwrap();
686        conn.set_committer_watermark(
687            A::NAME,
688            CommitterWatermark {
689                checkpoint_hi_inclusive: 100,
690                ..Default::default()
691            },
692        )
693        .await
694        .unwrap();
695        conn.set_committer_watermark(
696            B::NAME,
697            CommitterWatermark {
698                checkpoint_hi_inclusive: 10,
699                ..Default::default()
700            },
701        )
702        .await
703        .unwrap();
704        conn.set_committer_watermark(
705            C::NAME,
706            CommitterWatermark {
707                checkpoint_hi_inclusive: 1,
708                ..Default::default()
709            },
710        )
711        .await
712        .unwrap();
713        conn.set_committer_watermark(
714            D::NAME,
715            CommitterWatermark {
716                checkpoint_hi_inclusive: 50,
717                ..Default::default()
718            },
719        )
720        .await
721        .unwrap();
722
723        let indexer_args = IndexerArgs::default();
724        let temp_dir = tempfile::tempdir().unwrap();
725        let client_args = ClientArgs {
726            ingestion: IngestionClientArgs {
727                local_ingestion_path: Some(temp_dir.path().to_owned()),
728                ..Default::default()
729            },
730            ..Default::default()
731        };
732        let ingestion_config = IngestionConfig::default();
733
734        let mut indexer = Indexer::new(
735            store,
736            indexer_args,
737            client_args,
738            ingestion_config,
739            None,
740            &registry,
741        )
742        .await
743        .unwrap();
744
745        indexer
746            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
747            .await
748            .unwrap();
749        indexer
750            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
751            .await
752            .unwrap();
753        indexer
754            .sequential_pipeline::<C>(C, SequentialConfig::default())
755            .await
756            .unwrap();
757        indexer
758            .sequential_pipeline::<D>(D, SequentialConfig::default())
759            .await
760            .unwrap();
761
762        assert_eq!(indexer.first_ingestion_checkpoint, 2);
763    }
764
765    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
766    #[tokio::test]
767    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
768        let registry = Registry::new();
769        let store = MockStore::default();
770
771        test_pipeline!(A, "concurrent_a");
772        test_pipeline!(B, "concurrent_b");
773        test_pipeline!(C, "sequential_c");
774        test_pipeline!(D, "sequential_d");
775
776        let mut conn = store.connect().await.unwrap();
777        conn.set_committer_watermark(
778            B::NAME,
779            CommitterWatermark {
780                checkpoint_hi_inclusive: 10,
781                ..Default::default()
782            },
783        )
784        .await
785        .unwrap();
786        conn.set_committer_watermark(
787            C::NAME,
788            CommitterWatermark {
789                checkpoint_hi_inclusive: 1,
790                ..Default::default()
791            },
792        )
793        .await
794        .unwrap();
795
796        let indexer_args = IndexerArgs::default();
797        let temp_dir = tempfile::tempdir().unwrap();
798        let client_args = ClientArgs {
799            ingestion: IngestionClientArgs {
800                local_ingestion_path: Some(temp_dir.path().to_owned()),
801                ..Default::default()
802            },
803            ..Default::default()
804        };
805        let ingestion_config = IngestionConfig::default();
806
807        let mut indexer = Indexer::new(
808            store,
809            indexer_args,
810            client_args,
811            ingestion_config,
812            None,
813            &registry,
814        )
815        .await
816        .unwrap();
817
818        indexer
819            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
820            .await
821            .unwrap();
822        indexer
823            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
824            .await
825            .unwrap();
826        indexer
827            .sequential_pipeline::<C>(C, SequentialConfig::default())
828            .await
829            .unwrap();
830        indexer
831            .sequential_pipeline::<D>(D, SequentialConfig::default())
832            .await
833            .unwrap();
834
835        assert_eq!(indexer.first_ingestion_checkpoint, 0);
836    }
837
838    /// first_ingestion_checkpoint is 1 when smallest committer watermark is 0.
839    #[tokio::test]
840    async fn test_first_ingestion_checkpoint_smallest_is_0() {
841        let registry = Registry::new();
842        let store = MockStore::default();
843
844        test_pipeline!(A, "concurrent_a");
845        test_pipeline!(B, "concurrent_b");
846        test_pipeline!(C, "sequential_c");
847        test_pipeline!(D, "sequential_d");
848
849        let mut conn = store.connect().await.unwrap();
850        conn.set_committer_watermark(
851            A::NAME,
852            CommitterWatermark {
853                checkpoint_hi_inclusive: 100,
854                ..Default::default()
855            },
856        )
857        .await
858        .unwrap();
859        conn.set_committer_watermark(
860            B::NAME,
861            CommitterWatermark {
862                checkpoint_hi_inclusive: 10,
863                ..Default::default()
864            },
865        )
866        .await
867        .unwrap();
868        conn.set_committer_watermark(
869            C::NAME,
870            CommitterWatermark {
871                checkpoint_hi_inclusive: 1,
872                ..Default::default()
873            },
874        )
875        .await
876        .unwrap();
877        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
878            .await
879            .unwrap();
880
881        let indexer_args = IndexerArgs::default();
882        let temp_dir = tempfile::tempdir().unwrap();
883        let client_args = ClientArgs {
884            ingestion: IngestionClientArgs {
885                local_ingestion_path: Some(temp_dir.path().to_owned()),
886                ..Default::default()
887            },
888            ..Default::default()
889        };
890        let ingestion_config = IngestionConfig::default();
891
892        let mut indexer = Indexer::new(
893            store,
894            indexer_args,
895            client_args,
896            ingestion_config,
897            None,
898            &registry,
899        )
900        .await
901        .unwrap();
902
903        indexer
904            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
905            .await
906            .unwrap();
907        indexer
908            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
909            .await
910            .unwrap();
911        indexer
912            .sequential_pipeline::<C>(C, SequentialConfig::default())
913            .await
914            .unwrap();
915        indexer
916            .sequential_pipeline::<D>(D, SequentialConfig::default())
917            .await
918            .unwrap();
919
920        assert_eq!(indexer.first_ingestion_checkpoint, 1);
921    }
922
923    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
924    /// watermark, and first_checkpoint is smallest.
925    #[tokio::test]
926    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
927        let registry = Registry::new();
928        let store = MockStore::default();
929
930        test_pipeline!(A, "concurrent_a");
931        test_pipeline!(B, "concurrent_b");
932        test_pipeline!(C, "sequential_c");
933        test_pipeline!(D, "sequential_d");
934
935        let mut conn = store.connect().await.unwrap();
936        conn.set_committer_watermark(
937            B::NAME,
938            CommitterWatermark {
939                checkpoint_hi_inclusive: 50,
940                ..Default::default()
941            },
942        )
943        .await
944        .unwrap();
945        conn.set_committer_watermark(
946            C::NAME,
947            CommitterWatermark {
948                checkpoint_hi_inclusive: 10,
949                ..Default::default()
950            },
951        )
952        .await
953        .unwrap();
954
955        let indexer_args = IndexerArgs {
956            first_checkpoint: Some(5),
957            ..Default::default()
958        };
959        let temp_dir = tempfile::tempdir().unwrap();
960        let client_args = ClientArgs {
961            ingestion: IngestionClientArgs {
962                local_ingestion_path: Some(temp_dir.path().to_owned()),
963                ..Default::default()
964            },
965            ..Default::default()
966        };
967        let ingestion_config = IngestionConfig::default();
968
969        let mut indexer = Indexer::new(
970            store,
971            indexer_args,
972            client_args,
973            ingestion_config,
974            None,
975            &registry,
976        )
977        .await
978        .unwrap();
979
980        indexer
981            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
982            .await
983            .unwrap();
984        indexer
985            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
986            .await
987            .unwrap();
988        indexer
989            .sequential_pipeline::<C>(C, SequentialConfig::default())
990            .await
991            .unwrap();
992        indexer
993            .sequential_pipeline::<D>(D, SequentialConfig::default())
994            .await
995            .unwrap();
996
997        assert_eq!(indexer.first_ingestion_checkpoint, 5);
998    }
999
1000    /// first_ingestion_checkpoint is smallest among existing watermarks + 1 if
1001    /// first_checkpoint but all pipelines have watermarks (ignores first_checkpoint).
1002    #[tokio::test]
1003    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1004        let registry = Registry::new();
1005        let store = MockStore::default();
1006
1007        test_pipeline!(B, "concurrent_b");
1008        test_pipeline!(C, "sequential_c");
1009
1010        let mut conn = store.connect().await.unwrap();
1011        conn.set_committer_watermark(
1012            B::NAME,
1013            CommitterWatermark {
1014                checkpoint_hi_inclusive: 50,
1015                ..Default::default()
1016            },
1017        )
1018        .await
1019        .unwrap();
1020        conn.set_committer_watermark(
1021            C::NAME,
1022            CommitterWatermark {
1023                checkpoint_hi_inclusive: 10,
1024                ..Default::default()
1025            },
1026        )
1027        .await
1028        .unwrap();
1029
1030        let indexer_args = IndexerArgs {
1031            first_checkpoint: Some(5),
1032            ..Default::default()
1033        };
1034        let temp_dir = tempfile::tempdir().unwrap();
1035        let client_args = ClientArgs {
1036            ingestion: IngestionClientArgs {
1037                local_ingestion_path: Some(temp_dir.path().to_owned()),
1038                ..Default::default()
1039            },
1040            ..Default::default()
1041        };
1042        let ingestion_config = IngestionConfig::default();
1043
1044        let mut indexer = Indexer::new(
1045            store,
1046            indexer_args,
1047            client_args,
1048            ingestion_config,
1049            None,
1050            &registry,
1051        )
1052        .await
1053        .unwrap();
1054
1055        indexer
1056            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1057            .await
1058            .unwrap();
1059        indexer
1060            .sequential_pipeline::<C>(C, SequentialConfig::default())
1061            .await
1062            .unwrap();
1063
1064        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1065    }
1066
1067    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1068    /// will not be used as the starting point if it is not the smallest valid committer watermark
1069    /// to resume ingesting from.
1070    #[tokio::test]
1071    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1072        let registry = Registry::new();
1073        let store = MockStore::default();
1074
1075        test_pipeline!(A, "concurrent_a");
1076        test_pipeline!(B, "concurrent_b");
1077        test_pipeline!(C, "sequential_c");
1078
1079        let mut conn = store.connect().await.unwrap();
1080        conn.set_committer_watermark(
1081            B::NAME,
1082            CommitterWatermark {
1083                checkpoint_hi_inclusive: 50,
1084                ..Default::default()
1085            },
1086        )
1087        .await
1088        .unwrap();
1089        conn.set_committer_watermark(
1090            C::NAME,
1091            CommitterWatermark {
1092                checkpoint_hi_inclusive: 10,
1093                ..Default::default()
1094            },
1095        )
1096        .await
1097        .unwrap();
1098
1099        let indexer_args = IndexerArgs {
1100            first_checkpoint: Some(24),
1101            ..Default::default()
1102        };
1103        let temp_dir = tempfile::tempdir().unwrap();
1104        let client_args = ClientArgs {
1105            ingestion: IngestionClientArgs {
1106                local_ingestion_path: Some(temp_dir.path().to_owned()),
1107                ..Default::default()
1108            },
1109            ..Default::default()
1110        };
1111        let ingestion_config = IngestionConfig::default();
1112
1113        let mut indexer = Indexer::new(
1114            store,
1115            indexer_args,
1116            client_args,
1117            ingestion_config,
1118            None,
1119            &registry,
1120        )
1121        .await
1122        .unwrap();
1123
1124        indexer
1125            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1126            .await
1127            .unwrap();
1128
1129        indexer
1130            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1131            .await
1132            .unwrap();
1133
1134        indexer
1135            .sequential_pipeline::<C>(C, SequentialConfig::default())
1136            .await
1137            .unwrap();
1138
1139        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1140    }
1141
1142    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1143    #[tokio::test]
1144    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1145        let registry = Registry::new();
1146        let store = MockStore::default();
1147
1148        test_pipeline!(A, "concurrent_a");
1149        test_pipeline!(B, "concurrent_b");
1150        test_pipeline!(C, "sequential_c");
1151        test_pipeline!(D, "sequential_d");
1152
1153        let mut conn = store.connect().await.unwrap();
1154        conn.set_committer_watermark(
1155            A::NAME,
1156            CommitterWatermark {
1157                checkpoint_hi_inclusive: 5,
1158                ..Default::default()
1159            },
1160        )
1161        .await
1162        .unwrap();
1163        conn.set_committer_watermark(
1164            B::NAME,
1165            CommitterWatermark {
1166                checkpoint_hi_inclusive: 10,
1167                ..Default::default()
1168            },
1169        )
1170        .await
1171        .unwrap();
1172        conn.set_committer_watermark(
1173            C::NAME,
1174            CommitterWatermark {
1175                checkpoint_hi_inclusive: 15,
1176                ..Default::default()
1177            },
1178        )
1179        .await
1180        .unwrap();
1181        conn.set_committer_watermark(
1182            D::NAME,
1183            CommitterWatermark {
1184                checkpoint_hi_inclusive: 20,
1185                ..Default::default()
1186            },
1187        )
1188        .await
1189        .unwrap();
1190
1191        // Create synthetic ingestion data
1192        let temp_dir = tempfile::tempdir().unwrap();
1193        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1194            ingestion_dir: temp_dir.path().to_owned(),
1195            starting_checkpoint: 5,
1196            num_checkpoints: 25,
1197            checkpoint_size: 1,
1198        })
1199        .await;
1200
1201        let indexer_args = IndexerArgs {
1202            last_checkpoint: Some(29),
1203            ..Default::default()
1204        };
1205
1206        let client_args = ClientArgs {
1207            ingestion: IngestionClientArgs {
1208                local_ingestion_path: Some(temp_dir.path().to_owned()),
1209                ..Default::default()
1210            },
1211            ..Default::default()
1212        };
1213
1214        let ingestion_config = IngestionConfig::default();
1215
1216        let mut indexer = Indexer::new(
1217            store.clone(),
1218            indexer_args,
1219            client_args,
1220            ingestion_config,
1221            None,
1222            &registry,
1223        )
1224        .await
1225        .unwrap();
1226
1227        indexer
1228            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1229            .await
1230            .unwrap();
1231        indexer
1232            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1233            .await
1234            .unwrap();
1235        indexer
1236            .sequential_pipeline::<C>(C, SequentialConfig::default())
1237            .await
1238            .unwrap();
1239        indexer
1240            .sequential_pipeline::<D>(D, SequentialConfig::default())
1241            .await
1242            .unwrap();
1243
1244        let ingestion_metrics = indexer.ingestion_metrics().clone();
1245        let indexer_metrics = indexer.indexer_metrics().clone();
1246
1247        indexer.run().await.unwrap().join().await.unwrap();
1248
1249        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1250        assert_eq!(
1251            indexer_metrics
1252                .total_watermarks_out_of_order
1253                .get_metric_with_label_values(&[A::NAME])
1254                .unwrap()
1255                .get(),
1256            0
1257        );
1258        assert_eq!(
1259            indexer_metrics
1260                .total_watermarks_out_of_order
1261                .get_metric_with_label_values(&[B::NAME])
1262                .unwrap()
1263                .get(),
1264            5
1265        );
1266        assert_eq!(
1267            indexer_metrics
1268                .total_watermarks_out_of_order
1269                .get_metric_with_label_values(&[C::NAME])
1270                .unwrap()
1271                .get(),
1272            10
1273        );
1274        assert_eq!(
1275            indexer_metrics
1276                .total_watermarks_out_of_order
1277                .get_metric_with_label_values(&[D::NAME])
1278                .unwrap()
1279                .get(),
1280            15
1281        );
1282    }
1283
1284    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1285    #[tokio::test]
1286    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1287        let registry = Registry::new();
1288        let store = MockStore::default();
1289
1290        test_pipeline!(A, "concurrent_a");
1291        test_pipeline!(B, "concurrent_b");
1292        test_pipeline!(C, "sequential_c");
1293        test_pipeline!(D, "sequential_d");
1294
1295        let mut conn = store.connect().await.unwrap();
1296        conn.set_committer_watermark(
1297            A::NAME,
1298            CommitterWatermark {
1299                checkpoint_hi_inclusive: 5,
1300                ..Default::default()
1301            },
1302        )
1303        .await
1304        .unwrap();
1305        conn.set_committer_watermark(
1306            B::NAME,
1307            CommitterWatermark {
1308                checkpoint_hi_inclusive: 10,
1309                ..Default::default()
1310            },
1311        )
1312        .await
1313        .unwrap();
1314        conn.set_committer_watermark(
1315            C::NAME,
1316            CommitterWatermark {
1317                checkpoint_hi_inclusive: 15,
1318                ..Default::default()
1319            },
1320        )
1321        .await
1322        .unwrap();
1323        conn.set_committer_watermark(
1324            D::NAME,
1325            CommitterWatermark {
1326                checkpoint_hi_inclusive: 20,
1327                ..Default::default()
1328            },
1329        )
1330        .await
1331        .unwrap();
1332
1333        // Create synthetic ingestion data
1334        let temp_dir = tempfile::tempdir().unwrap();
1335        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1336            ingestion_dir: temp_dir.path().to_owned(),
1337            starting_checkpoint: 5,
1338            num_checkpoints: 25,
1339            checkpoint_size: 1,
1340        })
1341        .await;
1342
1343        let indexer_args = IndexerArgs {
1344            first_checkpoint: Some(3),
1345            last_checkpoint: Some(29),
1346            ..Default::default()
1347        };
1348
1349        let client_args = ClientArgs {
1350            ingestion: IngestionClientArgs {
1351                local_ingestion_path: Some(temp_dir.path().to_owned()),
1352                ..Default::default()
1353            },
1354            ..Default::default()
1355        };
1356
1357        let ingestion_config = IngestionConfig::default();
1358
1359        let mut indexer = Indexer::new(
1360            store.clone(),
1361            indexer_args,
1362            client_args,
1363            ingestion_config,
1364            None,
1365            &registry,
1366        )
1367        .await
1368        .unwrap();
1369
1370        indexer
1371            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1372            .await
1373            .unwrap();
1374        indexer
1375            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1376            .await
1377            .unwrap();
1378        indexer
1379            .sequential_pipeline::<C>(C, SequentialConfig::default())
1380            .await
1381            .unwrap();
1382        indexer
1383            .sequential_pipeline::<D>(D, SequentialConfig::default())
1384            .await
1385            .unwrap();
1386
1387        let ingestion_metrics = indexer.ingestion_metrics().clone();
1388        let metrics = indexer.indexer_metrics().clone();
1389        indexer.run().await.unwrap().join().await.unwrap();
1390
1391        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1392        assert_eq!(
1393            metrics
1394                .total_watermarks_out_of_order
1395                .get_metric_with_label_values(&[A::NAME])
1396                .unwrap()
1397                .get(),
1398            0
1399        );
1400        assert_eq!(
1401            metrics
1402                .total_watermarks_out_of_order
1403                .get_metric_with_label_values(&[B::NAME])
1404                .unwrap()
1405                .get(),
1406            5
1407        );
1408        assert_eq!(
1409            metrics
1410                .total_watermarks_out_of_order
1411                .get_metric_with_label_values(&[C::NAME])
1412                .unwrap()
1413                .get(),
1414            10
1415        );
1416        assert_eq!(
1417            metrics
1418                .total_watermarks_out_of_order
1419                .get_metric_with_label_values(&[D::NAME])
1420                .unwrap()
1421                .get(),
1422            15
1423        );
1424    }
1425
1426    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1427    #[tokio::test]
1428    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1429        let registry = Registry::new();
1430        let store = MockStore::default();
1431
1432        test_pipeline!(A, "concurrent_a");
1433        test_pipeline!(B, "concurrent_b");
1434        test_pipeline!(C, "sequential_c");
1435        test_pipeline!(D, "sequential_d");
1436
1437        let mut conn = store.connect().await.unwrap();
1438        conn.set_committer_watermark(
1439            B::NAME,
1440            CommitterWatermark {
1441                checkpoint_hi_inclusive: 10,
1442                ..Default::default()
1443            },
1444        )
1445        .await
1446        .unwrap();
1447        conn.set_committer_watermark(
1448            C::NAME,
1449            CommitterWatermark {
1450                checkpoint_hi_inclusive: 15,
1451                ..Default::default()
1452            },
1453        )
1454        .await
1455        .unwrap();
1456        conn.set_committer_watermark(
1457            D::NAME,
1458            CommitterWatermark {
1459                checkpoint_hi_inclusive: 20,
1460                ..Default::default()
1461            },
1462        )
1463        .await
1464        .unwrap();
1465
1466        // Create synthetic ingestion data
1467        let temp_dir = tempfile::tempdir().unwrap();
1468        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1469            ingestion_dir: temp_dir.path().to_owned(),
1470            starting_checkpoint: 0,
1471            num_checkpoints: 30,
1472            checkpoint_size: 1,
1473        })
1474        .await;
1475
1476        let indexer_args = IndexerArgs {
1477            last_checkpoint: Some(29),
1478            ..Default::default()
1479        };
1480
1481        let client_args = ClientArgs {
1482            ingestion: IngestionClientArgs {
1483                local_ingestion_path: Some(temp_dir.path().to_owned()),
1484                ..Default::default()
1485            },
1486            ..Default::default()
1487        };
1488
1489        let ingestion_config = IngestionConfig::default();
1490
1491        let mut indexer = Indexer::new(
1492            store.clone(),
1493            indexer_args,
1494            client_args,
1495            ingestion_config,
1496            None,
1497            &registry,
1498        )
1499        .await
1500        .unwrap();
1501
1502        indexer
1503            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1504            .await
1505            .unwrap();
1506        indexer
1507            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1508            .await
1509            .unwrap();
1510        indexer
1511            .sequential_pipeline::<C>(C, SequentialConfig::default())
1512            .await
1513            .unwrap();
1514        indexer
1515            .sequential_pipeline::<D>(D, SequentialConfig::default())
1516            .await
1517            .unwrap();
1518
1519        let ingestion_metrics = indexer.ingestion_metrics().clone();
1520        let metrics = indexer.indexer_metrics().clone();
1521        indexer.run().await.unwrap().join().await.unwrap();
1522
1523        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1524        assert_eq!(
1525            metrics
1526                .total_watermarks_out_of_order
1527                .get_metric_with_label_values(&[A::NAME])
1528                .unwrap()
1529                .get(),
1530            0
1531        );
1532        assert_eq!(
1533            metrics
1534                .total_watermarks_out_of_order
1535                .get_metric_with_label_values(&[B::NAME])
1536                .unwrap()
1537                .get(),
1538            11
1539        );
1540        assert_eq!(
1541            metrics
1542                .total_watermarks_out_of_order
1543                .get_metric_with_label_values(&[C::NAME])
1544                .unwrap()
1545                .get(),
1546            16
1547        );
1548        assert_eq!(
1549            metrics
1550                .total_watermarks_out_of_order
1551                .get_metric_with_label_values(&[D::NAME])
1552                .unwrap()
1553                .get(),
1554            21
1555        );
1556    }
1557
1558    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1559    #[tokio::test]
1560    async fn test_indexer_ingestion_use_first_checkpoint() {
1561        let registry = Registry::new();
1562        let store = MockStore::default();
1563
1564        test_pipeline!(A, "concurrent_a");
1565        test_pipeline!(B, "concurrent_b");
1566        test_pipeline!(C, "sequential_c");
1567        test_pipeline!(D, "sequential_d");
1568
1569        let mut conn = store.connect().await.unwrap();
1570        conn.set_committer_watermark(
1571            B::NAME,
1572            CommitterWatermark {
1573                checkpoint_hi_inclusive: 10,
1574                ..Default::default()
1575            },
1576        )
1577        .await
1578        .unwrap();
1579        conn.set_committer_watermark(
1580            C::NAME,
1581            CommitterWatermark {
1582                checkpoint_hi_inclusive: 15,
1583                ..Default::default()
1584            },
1585        )
1586        .await
1587        .unwrap();
1588        conn.set_committer_watermark(
1589            D::NAME,
1590            CommitterWatermark {
1591                checkpoint_hi_inclusive: 20,
1592                ..Default::default()
1593            },
1594        )
1595        .await
1596        .unwrap();
1597
1598        // Create synthetic ingestion data
1599        let temp_dir = tempfile::tempdir().unwrap();
1600        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1601            ingestion_dir: temp_dir.path().to_owned(),
1602            starting_checkpoint: 5,
1603            num_checkpoints: 25,
1604            checkpoint_size: 1,
1605        })
1606        .await;
1607
1608        let indexer_args = IndexerArgs {
1609            first_checkpoint: Some(10),
1610            last_checkpoint: Some(29),
1611            ..Default::default()
1612        };
1613
1614        let client_args = ClientArgs {
1615            ingestion: IngestionClientArgs {
1616                local_ingestion_path: Some(temp_dir.path().to_owned()),
1617                ..Default::default()
1618            },
1619            ..Default::default()
1620        };
1621
1622        let ingestion_config = IngestionConfig::default();
1623
1624        let mut indexer = Indexer::new(
1625            store.clone(),
1626            indexer_args,
1627            client_args,
1628            ingestion_config,
1629            None,
1630            &registry,
1631        )
1632        .await
1633        .unwrap();
1634
1635        indexer
1636            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1637            .await
1638            .unwrap();
1639        indexer
1640            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1641            .await
1642            .unwrap();
1643        indexer
1644            .sequential_pipeline::<C>(C, SequentialConfig::default())
1645            .await
1646            .unwrap();
1647        indexer
1648            .sequential_pipeline::<D>(D, SequentialConfig::default())
1649            .await
1650            .unwrap();
1651
1652        let ingestion_metrics = indexer.ingestion_metrics().clone();
1653        let metrics = indexer.indexer_metrics().clone();
1654        indexer.run().await.unwrap().join().await.unwrap();
1655
1656        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1657        assert_eq!(
1658            metrics
1659                .total_watermarks_out_of_order
1660                .get_metric_with_label_values(&[A::NAME])
1661                .unwrap()
1662                .get(),
1663            0
1664        );
1665        assert_eq!(
1666            metrics
1667                .total_watermarks_out_of_order
1668                .get_metric_with_label_values(&[B::NAME])
1669                .unwrap()
1670                .get(),
1671            1
1672        );
1673        assert_eq!(
1674            metrics
1675                .total_watermarks_out_of_order
1676                .get_metric_with_label_values(&[C::NAME])
1677                .unwrap()
1678                .get(),
1679            6
1680        );
1681        assert_eq!(
1682            metrics
1683                .total_watermarks_out_of_order
1684                .get_metric_with_label_values(&[D::NAME])
1685                .unwrap()
1686                .get(),
1687            11
1688        );
1689    }
1690
1691    #[tokio::test]
1692    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1693        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1694        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1695        assert_eq!(committer_watermark, None);
1696        assert_eq!(pruner_watermark, None);
1697    }
1698
1699    #[tokio::test]
1700    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1701        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1702        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1703        assert_eq!(committer_watermark, None);
1704        assert_eq!(pruner_watermark, None);
1705    }
1706
1707    #[tokio::test]
1708    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1709        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1710
1711        let committer_watermark = committer_watermark.unwrap();
1712        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1713
1714        let pruner_watermark = pruner_watermark.unwrap();
1715        assert_eq!(pruner_watermark.reader_lo, 1);
1716        assert_eq!(pruner_watermark.pruner_hi, 1);
1717    }
1718
1719    #[tokio::test]
1720    async fn test_init_watermark_sequential() {
1721        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1722
1723        let committer_watermark = committer_watermark.unwrap();
1724        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1725
1726        let pruner_watermark = pruner_watermark.unwrap();
1727        assert_eq!(pruner_watermark.reader_lo, 1);
1728        assert_eq!(pruner_watermark.pruner_hi, 1);
1729    }
1730
1731    #[tokio::test]
1732    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1733        let registry = Registry::new();
1734        let store = MockStore::default();
1735
1736        // Set up different watermarks for three different sequential pipelines
1737        let mut conn = store.connect().await.unwrap();
1738
1739        // First handler at checkpoint 10
1740        conn.set_committer_watermark(
1741            MockHandler::NAME,
1742            CommitterWatermark {
1743                epoch_hi_inclusive: 0,
1744                checkpoint_hi_inclusive: 10,
1745                tx_hi: 20,
1746                timestamp_ms_hi_inclusive: 10000,
1747            },
1748        )
1749        .await
1750        .unwrap();
1751
1752        // SequentialHandler at checkpoint 5
1753        conn.set_committer_watermark(
1754            SequentialHandler::NAME,
1755            CommitterWatermark {
1756                epoch_hi_inclusive: 0,
1757                checkpoint_hi_inclusive: 5,
1758                tx_hi: 10,
1759                timestamp_ms_hi_inclusive: 5000,
1760            },
1761        )
1762        .await
1763        .unwrap();
1764
1765        // Create synthetic ingestion data
1766        let temp_dir = tempfile::tempdir().unwrap();
1767        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1768            ingestion_dir: temp_dir.path().to_owned(),
1769            starting_checkpoint: 0,
1770            num_checkpoints: 20,
1771            checkpoint_size: 2,
1772        })
1773        .await;
1774
1775        let indexer_args = IndexerArgs {
1776            first_checkpoint: None,
1777            last_checkpoint: Some(19),
1778            pipeline: vec![],
1779            ..Default::default()
1780        };
1781
1782        let client_args = ClientArgs {
1783            ingestion: IngestionClientArgs {
1784                local_ingestion_path: Some(temp_dir.path().to_owned()),
1785                ..Default::default()
1786            },
1787            ..Default::default()
1788        };
1789
1790        let ingestion_config = IngestionConfig::default();
1791
1792        let mut indexer = Indexer::new(
1793            store.clone(),
1794            indexer_args,
1795            client_args,
1796            ingestion_config,
1797            None,
1798            &registry,
1799        )
1800        .await
1801        .unwrap();
1802
1803        // Add first sequential pipeline
1804        indexer
1805            .sequential_pipeline(
1806                MockHandler,
1807                pipeline::sequential::SequentialConfig::default(),
1808            )
1809            .await
1810            .unwrap();
1811
1812        // Verify first ingestion checkpoint is set correctly (10 + 1 = 11)
1813        assert_eq!(
1814            indexer.first_ingestion_checkpoint, 11,
1815            "first_ingestion_checkpoint should be 11"
1816        );
1817
1818        // Add second sequential pipeline
1819        indexer
1820            .sequential_pipeline(
1821                SequentialHandler,
1822                pipeline::sequential::SequentialConfig::default(),
1823            )
1824            .await
1825            .unwrap();
1826
1827        // Should change to 6 (minimum of 6 and 11)
1828        assert_eq!(
1829            indexer.first_ingestion_checkpoint, 6,
1830            "first_ingestion_checkpoint should still be 6"
1831        );
1832
1833        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1834        indexer.run().await.unwrap().join().await.unwrap();
1835
1836        // Verify each pipeline made some progress independently
1837        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1838        let watermark2 = conn
1839            .committer_watermark(SequentialHandler::NAME)
1840            .await
1841            .unwrap();
1842
1843        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1844        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1845    }
1846
1847    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1848    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1849    /// committing checkpoints less than the main pipeline's reader watermark.
1850    #[tokio::test]
1851    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1852        let registry = Registry::new();
1853        let store = MockStore::default();
1854
1855        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1856        // reader watermark at `7`.
1857        let mut conn = store.connect().await.unwrap();
1858        conn.set_committer_watermark(
1859            MockCheckpointSequenceNumberHandler::NAME,
1860            CommitterWatermark {
1861                checkpoint_hi_inclusive: 10,
1862                ..Default::default()
1863            },
1864        )
1865        .await
1866        .unwrap();
1867        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1868            .await
1869            .unwrap();
1870
1871        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1872        // be ignored by the tasked indexer.
1873        let indexer_args = IndexerArgs {
1874            first_checkpoint: Some(0),
1875            last_checkpoint: Some(15),
1876            pipeline: vec![],
1877            task: TaskArgs::tasked("task".to_string(), 10),
1878        };
1879        let temp_dir = tempfile::tempdir().unwrap();
1880        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1881            ingestion_dir: temp_dir.path().to_owned(),
1882            starting_checkpoint: 0,
1883            num_checkpoints: 16,
1884            checkpoint_size: 2,
1885        })
1886        .await;
1887
1888        let client_args = ClientArgs {
1889            ingestion: IngestionClientArgs {
1890                local_ingestion_path: Some(temp_dir.path().to_owned()),
1891                ..Default::default()
1892            },
1893            ..Default::default()
1894        };
1895
1896        let ingestion_config = IngestionConfig::default();
1897
1898        let mut tasked_indexer = Indexer::new(
1899            store.clone(),
1900            indexer_args,
1901            client_args,
1902            ingestion_config,
1903            None,
1904            &registry,
1905        )
1906        .await
1907        .unwrap();
1908
1909        let _ = tasked_indexer
1910            .concurrent_pipeline(
1911                MockCheckpointSequenceNumberHandler,
1912                ConcurrentConfig::default(),
1913            )
1914            .await;
1915
1916        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1917        let metrics = tasked_indexer.indexer_metrics().clone();
1918
1919        tasked_indexer.run().await.unwrap().join().await.unwrap();
1920
1921        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1922        assert_eq!(
1923            metrics
1924                .total_collector_skipped_checkpoints
1925                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1926                .unwrap()
1927                .get(),
1928            7
1929        );
1930        let data = store
1931            .data
1932            .get(MockCheckpointSequenceNumberHandler::NAME)
1933            .unwrap();
1934        assert_eq!(data.len(), 9);
1935        for i in 0..7 {
1936            assert!(data.get(&i).is_none());
1937        }
1938        for i in 7..16 {
1939            assert!(data.get(&i).is_some());
1940        }
1941    }
1942
1943    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1944    #[tokio::test]
1945    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1946        let registry = Registry::new();
1947        let store = MockStore::default();
1948
1949        let mut conn = store.connect().await.unwrap();
1950        conn.set_committer_watermark(
1951            "test",
1952            CommitterWatermark {
1953                checkpoint_hi_inclusive: 10,
1954                ..Default::default()
1955            },
1956        )
1957        .await
1958        .unwrap();
1959        conn.set_reader_watermark("test", 5).await.unwrap();
1960
1961        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1962        // watermarks.
1963        let indexer_args = IndexerArgs {
1964            first_checkpoint: Some(9),
1965            last_checkpoint: Some(25),
1966            pipeline: vec![],
1967            task: TaskArgs::tasked("task".to_string(), 10),
1968        };
1969        let temp_dir = tempfile::tempdir().unwrap();
1970        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1971            ingestion_dir: temp_dir.path().to_owned(),
1972            starting_checkpoint: 9,
1973            num_checkpoints: 17,
1974            checkpoint_size: 2,
1975        })
1976        .await;
1977
1978        let client_args = ClientArgs {
1979            ingestion: IngestionClientArgs {
1980                local_ingestion_path: Some(temp_dir.path().to_owned()),
1981                ..Default::default()
1982            },
1983            ..Default::default()
1984        };
1985
1986        let ingestion_config = IngestionConfig::default();
1987
1988        let mut tasked_indexer = Indexer::new(
1989            store.clone(),
1990            indexer_args,
1991            client_args,
1992            ingestion_config,
1993            None,
1994            &registry,
1995        )
1996        .await
1997        .unwrap();
1998
1999        let _ = tasked_indexer
2000            .concurrent_pipeline(
2001                MockCheckpointSequenceNumberHandler,
2002                ConcurrentConfig::default(),
2003            )
2004            .await;
2005
2006        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2007        let metrics = tasked_indexer.indexer_metrics().clone();
2008
2009        tasked_indexer.run().await.unwrap().join().await.unwrap();
2010
2011        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2012        assert_eq!(
2013            metrics
2014                .total_watermarks_out_of_order
2015                .get_metric_with_label_values(&["test"])
2016                .unwrap()
2017                .get(),
2018            0
2019        );
2020        assert_eq!(
2021            metrics
2022                .total_collector_skipped_checkpoints
2023                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2024                .unwrap()
2025                .get(),
2026            0
2027        );
2028
2029        let data = store.data.get("test").unwrap();
2030        assert!(data.len() == 17);
2031        for i in 0..9 {
2032            assert!(data.get(&i).is_none());
2033        }
2034        for i in 9..26 {
2035            assert!(data.get(&i).is_some());
2036        }
2037        let main_pipeline_watermark = store.watermark("test").unwrap();
2038        // assert that the main pipeline's watermarks are not updated
2039        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2040        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2041        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2042        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2043        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2044    }
2045
2046    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
2047    /// committed, and any checkpoints inflight < X will be skipped.
2048    #[tokio::test]
2049    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2050        let registry = Registry::new();
2051        let store = MockStore::default();
2052        let mut conn = store.connect().await.unwrap();
2053        // Set the main pipeline watermark.
2054        conn.set_committer_watermark(
2055            ControllableHandler::NAME,
2056            CommitterWatermark {
2057                checkpoint_hi_inclusive: 11,
2058                ..Default::default()
2059            },
2060        )
2061        .await
2062        .unwrap();
2063
2064        // Generate 500 checkpoints upfront, for the indexer to process all at once.
2065        let temp_dir = tempfile::tempdir().unwrap();
2066        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2067            ingestion_dir: temp_dir.path().to_owned(),
2068            starting_checkpoint: 0,
2069            num_checkpoints: 501,
2070            checkpoint_size: 2,
2071        })
2072        .await;
2073        let indexer_args = IndexerArgs {
2074            first_checkpoint: Some(0),
2075            last_checkpoint: Some(500),
2076            pipeline: vec![],
2077            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2078        };
2079        let client_args = ClientArgs {
2080            ingestion: IngestionClientArgs {
2081                local_ingestion_path: Some(temp_dir.path().to_owned()),
2082                ..Default::default()
2083            },
2084            ..Default::default()
2085        };
2086        let ingestion_config = IngestionConfig::default();
2087        let mut tasked_indexer = Indexer::new(
2088            store.clone(),
2089            indexer_args,
2090            client_args,
2091            ingestion_config,
2092            None,
2093            &registry,
2094        )
2095        .await
2096        .unwrap();
2097        let mut allow_process = 10;
2098        // Limit the pipeline to process only checkpoints `[0, 10]`.
2099        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2100        let _ = tasked_indexer
2101            .concurrent_pipeline(
2102                controllable_handler,
2103                ConcurrentConfig {
2104                    committer: CommitterConfig {
2105                        collect_interval_ms: 10,
2106                        watermark_interval_ms: 10,
2107                        ..Default::default()
2108                    },
2109                    ..Default::default()
2110                },
2111            )
2112            .await;
2113        let metrics = tasked_indexer.indexer_metrics().clone();
2114
2115        let mut s_indexer = tasked_indexer.run().await.unwrap();
2116
2117        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
2118        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
2119        // committed.
2120        store
2121            .wait_for_watermark(
2122                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2123                10,
2124                std::time::Duration::from_secs(10),
2125            )
2126            .await;
2127
2128        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2129        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2130        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2131        // one at a time until the collector_reader_lo metric shows the new value.
2132        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2133            .await
2134            .unwrap();
2135
2136        let reader_lo = metrics
2137            .collector_reader_lo
2138            .with_label_values(&[ControllableHandler::NAME]);
2139
2140        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2141        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2142        // checkpoints have been processed.
2143        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2144        while reader_lo.get() != 250 {
2145            interval.tick().await;
2146            // allow_process is initialized to 11, bump to 11 for the next checkpoint
2147            allow_process += 1;
2148            assert!(
2149                allow_process <= 500,
2150                "Released all checkpoints but collector never observed new reader_lo"
2151            );
2152            process_below.send(allow_process).ok();
2153        }
2154
2155        // At this point, the collector has observed reader_lo = 250. Release all remaining
2156        // checkpoints. Guarantees:
2157        // - [0, 10]: committed (before reader_lo was set)
2158        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2159        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2160        // - [250, 500]: committed (>= reader_lo)
2161        process_below.send(500).ok();
2162
2163        s_indexer.join().await.unwrap();
2164
2165        let data = store.data.get(ControllableHandler::NAME).unwrap();
2166
2167        // Checkpoints (allow_process, 250) must be skipped.
2168        for chkpt in (allow_process + 1)..250 {
2169            assert!(
2170                data.get(&chkpt).is_none(),
2171                "Checkpoint {chkpt} should have been skipped"
2172            );
2173        }
2174
2175        // Checkpoints >= reader_lo must be committed.
2176        for chkpt in 250..=500 {
2177            assert!(
2178                data.get(&chkpt).is_some(),
2179                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2180            );
2181        }
2182
2183        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2184        for chkpt in 0..=10 {
2185            assert!(
2186                data.get(&chkpt).is_some(),
2187                "Checkpoint {chkpt} should have been committed (baseline)"
2188            );
2189        }
2190    }
2191}