sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeSet, sync::Arc, time::Duration};
5
6use anyhow::{Context, bail, ensure};
7use ingestion::{ClientArgs, IngestionConfig, IngestionService, ingestion_client::IngestionClient};
8use metrics::IndexerMetrics;
9use prometheus::Registry;
10use sui_indexer_alt_framework_store_traits::{
11    Connection, Store, TransactionalStore, pipeline_task,
12};
13use tracing::info;
14
15pub use anyhow::Result;
16pub use sui_field_count::FieldCount;
17pub use sui_futures::service;
18/// External users access the store trait through framework::store
19pub use sui_indexer_alt_framework_store_traits as store;
20pub use sui_types as types;
21
22use crate::metrics::IngestionMetrics;
23use crate::pipeline::{
24    Processor,
25    concurrent::{self, ConcurrentConfig},
26    sequential::{self, Handler, SequentialConfig},
27};
28use crate::service::Service;
29
30#[cfg(feature = "cluster")]
31pub mod cluster;
32pub mod ingestion;
33pub mod metrics;
34pub mod pipeline;
35#[cfg(feature = "postgres")]
36pub mod postgres;
37
38#[cfg(test)]
39pub mod mocks;
40
41/// Command-line arguments for the indexer
42#[derive(clap::Args, Default, Debug, Clone)]
43pub struct IndexerArgs {
44    /// Override the next checkpoint for all pipelines without a committer watermark to start
45    /// processing from, which is 0 by default. Pipelines with existing watermarks will ignore this
46    /// setting and always resume from their committer watermark + 1.
47    ///
48    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
49    /// is the minimum across all pipelines' next checkpoints.
50    #[arg(long)]
51    pub first_checkpoint: Option<u64>,
52
53    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
54    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
55    #[arg(long)]
56    pub last_checkpoint: Option<u64>,
57
58    /// Only run the following pipelines. If not provided, all pipelines found in the
59    /// configuration file will be run.
60    #[arg(long, action = clap::ArgAction::Append)]
61    pub pipeline: Vec<String>,
62
63    /// Additional configurations for running a tasked indexer.
64    #[clap(flatten)]
65    pub task: TaskArgs,
66}
67
68/// Command-line arguments for configuring a tasked indexer.
69#[derive(clap::Parser, Default, Debug, Clone)]
70pub struct TaskArgs {
71    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
72    /// delimiter defined on the store. This allows the same pipelines to run under multiple
73    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
74    /// entries in the database.
75    ///
76    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
77    ///
78    /// Sequential pipelines cannot be attached to a tasked indexer.
79    ///
80    /// The framework ensures that tasked pipelines never commit checkpoints below the main
81    /// pipeline’s pruner watermark. Requires `--reader-interval-ms`.
82    #[arg(long, requires = "reader_interval_ms")]
83    task: Option<String>,
84
85    /// The interval in milliseconds at which each of the pipelines on a tasked indexer should
86    /// refetch its main pipeline's reader watermark. This is required when `--task` is set.
87    #[arg(long, requires = "task")]
88    reader_interval_ms: Option<u64>,
89}
90
91pub struct Indexer<S: Store> {
92    /// The storage backend that the indexer uses to write and query indexed data. This
93    /// generic implementation allows for plugging in different storage solutions that implement the
94    /// `Store` trait.
95    store: S,
96
97    /// Prometheus Metrics.
98    metrics: Arc<IndexerMetrics>,
99
100    /// Service for downloading and disseminating checkpoint data.
101    ingestion_service: IngestionService,
102
103    /// The next checkpoint for a pipeline without a committer watermark to start processing from,
104    /// which will be 0 by default. Pipelines with existing watermarks will ignore this setting and
105    /// always resume from their committer watermark + 1.
106    ///
107    /// Setting this value indirectly affects ingestion, as the checkpoint to start ingesting from
108    /// is the minimum across all pipelines' next checkpoints.
109    default_next_checkpoint: u64,
110
111    /// Optional override of the checkpoint upperbound. When set, the indexer will stop ingestion at
112    /// this checkpoint.
113    last_checkpoint: Option<u64>,
114
115    /// An optional task name for this indexer. When set, pipelines will record watermarks using the
116    /// delimiter defined on the store. This allows the same pipelines to run under multiple
117    /// indexers (e.g. for backfills or temporary workflows) while maintaining separate watermark
118    /// entries in the database.
119    ///
120    /// By default there is no task name, and watermarks are keyed only by `pipeline`.
121    ///
122    /// Sequential pipelines cannot be attached to a tasked indexer.
123    ///
124    /// The framework ensures that tasked pipelines never commit checkpoints below the main
125    /// pipeline’s pruner watermark.
126    task: Option<Task>,
127
128    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
129    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
130    /// a warning when the indexer is run.
131    enabled_pipelines: Option<BTreeSet<String>>,
132
133    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
134    /// with the same name isn't added twice.
135    added_pipelines: BTreeSet<&'static str>,
136
137    /// The checkpoint for the indexer to start ingesting from. This is derived from the committer
138    /// watermarks of pipelines added to the indexer. Pipelines without watermarks default to 0,
139    /// unless overridden by [Self::default_next_checkpoint].
140    first_ingestion_checkpoint: u64,
141
142    /// The minimum next_checkpoint across all sequential pipelines. This is used to initialize
143    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
144    next_sequential_checkpoint: Option<u64>,
145
146    /// The service handles for every pipeline, used to manage lifetimes and graceful shutdown.
147    pipelines: Vec<Service>,
148}
149
/// Configuration for a tasked indexer.
#[derive(Clone, Debug)]
pub(crate) struct Task {
    /// Name of the tasked indexer, to be used with the delimiter defined on the indexer's store to
    /// record pipeline watermarks.
    task: String,
    /// The interval at which each of the pipelines on a tasked indexer should refetch its main
    /// pipeline's reader watermark.
    reader_interval: Duration,
}
160
161impl TaskArgs {
162    pub fn tasked(task: String, reader_interval_ms: u64) -> Self {
163        Self {
164            task: Some(task),
165            reader_interval_ms: Some(reader_interval_ms),
166        }
167    }
168
169    fn into_task(self) -> Option<Task> {
170        Some(Task {
171            task: self.task?,
172            reader_interval: Duration::from_millis(self.reader_interval_ms?),
173        })
174    }
175}
176
177impl<S: Store> Indexer<S> {
178    /// Create a new instance of the indexer framework from a store that implements the `Store`
179    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
180    /// arguments configure the following:
181    ///
182    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
183    ///   watermarks) and where to serve metrics from,
184    /// - Where to download checkpoints from,
185    /// - Concurrency and buffering parameters for downloading checkpoints.
186    ///
187    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
188    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
189    pub async fn new(
190        store: S,
191        indexer_args: IndexerArgs,
192        client_args: ClientArgs,
193        ingestion_config: IngestionConfig,
194        metrics_prefix: Option<&str>,
195        registry: &Registry,
196    ) -> Result<Self> {
197        let IndexerArgs {
198            first_checkpoint,
199            last_checkpoint,
200            pipeline,
201            task,
202        } = indexer_args;
203
204        let metrics = IndexerMetrics::new(metrics_prefix, registry);
205
206        let ingestion_service =
207            IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;
208
209        Ok(Self {
210            store,
211            metrics,
212            ingestion_service,
213            default_next_checkpoint: first_checkpoint.unwrap_or_default(),
214            last_checkpoint,
215            task: task.into_task(),
216            enabled_pipelines: if pipeline.is_empty() {
217                None
218            } else {
219                Some(pipeline.into_iter().collect())
220            },
221            added_pipelines: BTreeSet::new(),
222            first_ingestion_checkpoint: u64::MAX,
223            next_sequential_checkpoint: None,
224            pipelines: vec![],
225        })
226    }
227
228    /// The store used by the indexer.
229    pub fn store(&self) -> &S {
230        &self.store
231    }
232
233    /// The ingestion client used by the indexer to fetch checkpoints.
234    pub fn ingestion_client(&self) -> &IngestionClient {
235        self.ingestion_service.ingestion_client()
236    }
237
238    /// The indexer's metrics.
239    pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
240        &self.metrics
241    }
242
243    /// The ingestion service's metrics.
244    pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
245        self.ingestion_service.metrics()
246    }
247
248    /// The pipelines that this indexer will run.
249    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
250        self.added_pipelines.iter().copied().filter(|p| {
251            self.enabled_pipelines
252                .as_ref()
253                .is_none_or(|e| e.contains(*p))
254        })
255    }
256
257    /// The minimum next checkpoint across all sequential pipelines. This value is used to
258    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
259    /// too far ahead of sequential pipelines.
260    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
261        self.next_sequential_checkpoint
262    }
263
264    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
265    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
266    ///
267    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
268    /// keep the watermark table up-to-date with the highest point they can guarantee all data
269    /// exists for, for their pipeline.
270    pub async fn concurrent_pipeline<H>(
271        &mut self,
272        handler: H,
273        config: ConcurrentConfig,
274    ) -> Result<()>
275    where
276        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
277    {
278        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
279            return Ok(());
280        };
281
282        self.pipelines.push(concurrent::pipeline::<H>(
283            handler,
284            next_checkpoint,
285            config,
286            self.store.clone(),
287            self.task.clone(),
288            self.ingestion_service.subscribe().0,
289            self.metrics.clone(),
290        ));
291
292        Ok(())
293    }
294
295    /// Start ingesting checkpoints from `first_ingestion_checkpoint`. Individual pipelines
296    /// will start processing and committing once the ingestion service has caught up to their
297    /// respective watermarks.
298    ///
299    /// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
300    pub async fn run(self) -> Result<Service> {
301        if let Some(enabled_pipelines) = self.enabled_pipelines {
302            ensure!(
303                enabled_pipelines.is_empty(),
304                "Tried to enable pipelines that this indexer does not know about: \
305                {enabled_pipelines:#?}",
306            );
307        }
308
309        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
310
311        info!(self.first_ingestion_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
312
313        let mut service = self
314            .ingestion_service
315            .run(
316                self.first_ingestion_checkpoint..=last_checkpoint,
317                self.next_sequential_checkpoint,
318            )
319            .await
320            .context("Failed to start ingestion service")?;
321
322        for pipeline in self.pipelines {
323            service = service.merge(pipeline);
324        }
325
326        Ok(service)
327    }
328
329    /// Determine the checkpoint for the pipeline to resume processing from. This is either the
330    /// checkpoint after its watermark, or if that doesn't exist, then the provided
331    /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
332    ///
333    /// Update the starting ingestion checkpoint as the minimum across all the next checkpoints
334    /// calculated above.
335    ///
336    /// Returns `Ok(None)` if the pipeline is disabled.
337    async fn add_pipeline<P: Processor + 'static>(&mut self) -> Result<Option<u64>> {
338        ensure!(
339            self.added_pipelines.insert(P::NAME),
340            "Pipeline {:?} already added",
341            P::NAME,
342        );
343
344        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
345            && !enabled_pipelines.remove(P::NAME)
346        {
347            info!(pipeline = P::NAME, "Skipping");
348            return Ok(None);
349        }
350
351        let mut conn = self
352            .store
353            .connect()
354            .await
355            .context("Failed to establish connection to store")?;
356
357        let pipeline_task =
358            pipeline_task::<S>(P::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
359
360        let checkpoint_hi_inclusive = conn
361            .init_watermark(&pipeline_task, self.default_next_checkpoint)
362            .await
363            .with_context(|| format!("Failed to init watermark for {pipeline_task}"))?;
364
365        let next_checkpoint =
366            checkpoint_hi_inclusive.map_or(self.default_next_checkpoint, |c| c + 1);
367
368        self.first_ingestion_checkpoint = next_checkpoint.min(self.first_ingestion_checkpoint);
369
370        Ok(Some(next_checkpoint))
371    }
372}
373
374impl<T: TransactionalStore> Indexer<T> {
375    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
376    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
377    ///
378    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may be
379    /// required to handle pipelines that modify data in-place (where each update is not an insert,
380    /// but could be a modification of an existing row, where ordering between updates is
381    /// important).
382    ///
383    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
384    /// number of checkpoints (configured by `checkpoint_lag`).
385    pub async fn sequential_pipeline<H>(
386        &mut self,
387        handler: H,
388        config: SequentialConfig,
389    ) -> Result<()>
390    where
391        H: Handler<Store = T> + Send + Sync + 'static,
392    {
393        let Some(next_checkpoint) = self.add_pipeline::<H>().await? else {
394            return Ok(());
395        };
396
397        if self.task.is_some() {
398            bail!(
399                "Sequential pipelines do not support pipeline tasks. \
400                These pipelines guarantee that each checkpoint is committed exactly once and in order. \
401                Running the same pipeline under a different task would violate these guarantees."
402            );
403        }
404
405        // Track the minimum next_checkpoint across all sequential pipelines
406        self.next_sequential_checkpoint = Some(
407            self.next_sequential_checkpoint
408                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
409        );
410
411        let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
412
413        self.pipelines.push(sequential::pipeline::<H>(
414            handler,
415            next_checkpoint,
416            config,
417            self.store.clone(),
418            checkpoint_rx,
419            watermark_tx,
420            self.metrics.clone(),
421        ));
422
423        Ok(())
424    }
425}
426
427#[cfg(test)]
428mod tests {
429    use std::sync::Arc;
430
431    use async_trait::async_trait;
432    use clap::Parser;
433    use sui_indexer_alt_framework_store_traits::PrunerWatermark;
434    use sui_synthetic_ingestion::synthetic_ingestion;
435    use tokio::sync::watch;
436
437    use crate::FieldCount;
438    use crate::ingestion::ingestion_client::IngestionClientArgs;
439    use crate::mocks::store::MockStore;
440    use crate::pipeline::CommitterConfig;
441    use crate::pipeline::{Processor, concurrent::ConcurrentConfig};
442    use crate::store::CommitterWatermark;
443
444    use super::*;
445
446    #[allow(dead_code)]
447    #[derive(Clone, FieldCount)]
448    struct MockValue(u64);
449
450    /// A handler that can be controlled externally to block checkpoint processing.
451    struct ControllableHandler {
452        /// Process checkpoints less than or equal to this value.
453        process_below: watch::Receiver<u64>,
454    }
455
456    impl ControllableHandler {
457        fn with_limit(limit: u64) -> (Self, watch::Sender<u64>) {
458            let (tx, rx) = watch::channel(limit);
459            (Self { process_below: rx }, tx)
460        }
461    }
462
463    #[async_trait]
464    impl Processor for ControllableHandler {
465        const NAME: &'static str = "controllable";
466        /// The checkpoints to process come out of order. To account for potential flakiness in test
467        /// `test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo`, we set the FANOUT to
468        /// the total number of checkpoints to process, so we can ensure the correct checkpoints are
469        /// processed.
470        const FANOUT: usize = 501;
471        type Value = MockValue;
472
473        async fn process(
474            &self,
475            checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
476        ) -> anyhow::Result<Vec<Self::Value>> {
477            let cp_num = checkpoint.summary.sequence_number;
478
479            // Wait until the checkpoint is allowed to be processed
480            self.process_below
481                .clone()
482                .wait_for(|&limit| cp_num <= limit)
483                .await
484                .ok();
485
486            Ok(vec![MockValue(cp_num)])
487        }
488    }
489
490    #[async_trait]
491    impl crate::pipeline::concurrent::Handler for ControllableHandler {
492        type Store = MockStore;
493        type Batch = Vec<MockValue>;
494
495        fn batch(
496            &self,
497            batch: &mut Self::Batch,
498            values: &mut std::vec::IntoIter<Self::Value>,
499        ) -> crate::pipeline::concurrent::BatchStatus {
500            batch.extend(values);
501            crate::pipeline::concurrent::BatchStatus::Ready
502        }
503
504        async fn commit<'a>(
505            &self,
506            batch: &Self::Batch,
507            conn: &mut <Self::Store as Store>::Connection<'a>,
508        ) -> anyhow::Result<usize> {
509            for value in batch {
510                conn.0
511                    .commit_data(Self::NAME, value.0, vec![value.0])
512                    .await?;
513            }
514            Ok(batch.len())
515        }
516    }
517
518    macro_rules! test_pipeline {
519        ($handler:ident, $name:literal) => {
520            struct $handler;
521
522            #[async_trait]
523            impl Processor for $handler {
524                const NAME: &'static str = $name;
525                type Value = MockValue;
526                async fn process(
527                    &self,
528                    checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
529                ) -> anyhow::Result<Vec<Self::Value>> {
530                    Ok(vec![MockValue(checkpoint.summary.sequence_number)])
531                }
532            }
533
534            #[async_trait]
535            impl crate::pipeline::concurrent::Handler for $handler {
536                type Store = MockStore;
537                type Batch = Vec<Self::Value>;
538
539                fn batch(
540                    &self,
541                    batch: &mut Self::Batch,
542                    values: &mut std::vec::IntoIter<Self::Value>,
543                ) -> crate::pipeline::concurrent::BatchStatus {
544                    batch.extend(values);
545                    crate::pipeline::concurrent::BatchStatus::Pending
546                }
547
548                async fn commit<'a>(
549                    &self,
550                    batch: &Self::Batch,
551                    conn: &mut <Self::Store as Store>::Connection<'a>,
552                ) -> anyhow::Result<usize> {
553                    for value in batch {
554                        conn.0
555                            .commit_data(Self::NAME, value.0, vec![value.0])
556                            .await?;
557                    }
558                    Ok(batch.len())
559                }
560            }
561
562            #[async_trait]
563            impl crate::pipeline::sequential::Handler for $handler {
564                type Store = MockStore;
565                type Batch = Vec<Self::Value>;
566
567                fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
568                    batch.extend(values);
569                }
570
571                async fn commit<'a>(
572                    &self,
573                    _batch: &Self::Batch,
574                    _conn: &mut <Self::Store as Store>::Connection<'a>,
575                ) -> anyhow::Result<usize> {
576                    Ok(1)
577                }
578            }
579        };
580    }
581
582    test_pipeline!(MockHandler, "test_processor");
583    test_pipeline!(SequentialHandler, "sequential_handler");
584    test_pipeline!(MockCheckpointSequenceNumberHandler, "test");
585
586    async fn test_init_watermark(
587        first_checkpoint: Option<u64>,
588        is_concurrent: bool,
589    ) -> (Option<CommitterWatermark>, Option<PrunerWatermark>) {
590        let registry = Registry::new();
591        let store = MockStore::default();
592
593        test_pipeline!(A, "pipeline_name");
594
595        let mut conn = store.connect().await.unwrap();
596
597        let indexer_args = IndexerArgs {
598            first_checkpoint,
599            ..IndexerArgs::default()
600        };
601        let temp_dir = tempfile::tempdir().unwrap();
602        let client_args = ClientArgs {
603            ingestion: IngestionClientArgs {
604                local_ingestion_path: Some(temp_dir.path().to_owned()),
605                ..Default::default()
606            },
607            ..Default::default()
608        };
609        let ingestion_config = IngestionConfig::default();
610
611        let mut indexer = Indexer::new(
612            store.clone(),
613            indexer_args,
614            client_args,
615            ingestion_config,
616            None,
617            &registry,
618        )
619        .await
620        .unwrap();
621
622        if is_concurrent {
623            indexer
624                .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
625                .await
626                .unwrap();
627        } else {
628            indexer
629                .sequential_pipeline::<A>(A, SequentialConfig::default())
630                .await
631                .unwrap();
632        }
633
634        (
635            conn.committer_watermark(A::NAME).await.unwrap(),
636            conn.pruner_watermark(A::NAME, Duration::ZERO)
637                .await
638                .unwrap(),
639        )
640    }
641
642    #[test]
643    fn test_arg_parsing() {
644        #[derive(Parser)]
645        struct Args {
646            #[clap(flatten)]
647            indexer: IndexerArgs,
648        }
649
650        let args = Args::try_parse_from([
651            "cmd",
652            "--first-checkpoint",
653            "10",
654            "--last-checkpoint",
655            "100",
656            "--pipeline",
657            "a",
658            "--pipeline",
659            "b",
660            "--task",
661            "t",
662            "--reader-interval-ms",
663            "5000",
664        ])
665        .unwrap();
666
667        assert_eq!(args.indexer.first_checkpoint, Some(10));
668        assert_eq!(args.indexer.last_checkpoint, Some(100));
669        assert_eq!(args.indexer.pipeline, vec!["a", "b"]);
670        assert_eq!(args.indexer.task.task, Some("t".to_owned()));
671        assert_eq!(args.indexer.task.reader_interval_ms, Some(5000));
672    }
673
674    /// first_ingestion_checkpoint is smallest among existing watermarks + 1.
675    #[tokio::test]
676    async fn test_first_ingestion_checkpoint_all_pipelines_have_watermarks() {
677        let registry = Registry::new();
678        let store = MockStore::default();
679
680        test_pipeline!(A, "concurrent_a");
681        test_pipeline!(B, "concurrent_b");
682        test_pipeline!(C, "sequential_c");
683        test_pipeline!(D, "sequential_d");
684
685        let mut conn = store.connect().await.unwrap();
686        conn.set_committer_watermark(
687            A::NAME,
688            CommitterWatermark {
689                checkpoint_hi_inclusive: 100,
690                ..Default::default()
691            },
692        )
693        .await
694        .unwrap();
695        conn.set_committer_watermark(
696            B::NAME,
697            CommitterWatermark {
698                checkpoint_hi_inclusive: 10,
699                ..Default::default()
700            },
701        )
702        .await
703        .unwrap();
704        conn.set_committer_watermark(
705            C::NAME,
706            CommitterWatermark {
707                checkpoint_hi_inclusive: 1,
708                ..Default::default()
709            },
710        )
711        .await
712        .unwrap();
713        conn.set_committer_watermark(
714            D::NAME,
715            CommitterWatermark {
716                checkpoint_hi_inclusive: 50,
717                ..Default::default()
718            },
719        )
720        .await
721        .unwrap();
722
723        let indexer_args = IndexerArgs::default();
724        let temp_dir = tempfile::tempdir().unwrap();
725        let client_args = ClientArgs {
726            ingestion: IngestionClientArgs {
727                local_ingestion_path: Some(temp_dir.path().to_owned()),
728                ..Default::default()
729            },
730            ..Default::default()
731        };
732        let ingestion_config = IngestionConfig::default();
733
734        let mut indexer = Indexer::new(
735            store,
736            indexer_args,
737            client_args,
738            ingestion_config,
739            None,
740            &registry,
741        )
742        .await
743        .unwrap();
744
745        indexer
746            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
747            .await
748            .unwrap();
749        indexer
750            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
751            .await
752            .unwrap();
753        indexer
754            .sequential_pipeline::<C>(C, SequentialConfig::default())
755            .await
756            .unwrap();
757        indexer
758            .sequential_pipeline::<D>(D, SequentialConfig::default())
759            .await
760            .unwrap();
761
762        assert_eq!(indexer.first_ingestion_checkpoint, 2);
763    }
764
765    /// first_ingestion_checkpoint is 0 when at least one pipeline has no watermark.
766    #[tokio::test]
767    async fn test_first_ingestion_checkpoint_not_all_pipelines_have_watermarks() {
768        let registry = Registry::new();
769        let store = MockStore::default();
770
771        test_pipeline!(A, "concurrent_a");
772        test_pipeline!(B, "concurrent_b");
773        test_pipeline!(C, "sequential_c");
774        test_pipeline!(D, "sequential_d");
775
776        let mut conn = store.connect().await.unwrap();
777        conn.set_committer_watermark(
778            B::NAME,
779            CommitterWatermark {
780                checkpoint_hi_inclusive: 10,
781                ..Default::default()
782            },
783        )
784        .await
785        .unwrap();
786        conn.set_committer_watermark(
787            C::NAME,
788            CommitterWatermark {
789                checkpoint_hi_inclusive: 1,
790                ..Default::default()
791            },
792        )
793        .await
794        .unwrap();
795
796        let indexer_args = IndexerArgs::default();
797        let temp_dir = tempfile::tempdir().unwrap();
798        let client_args = ClientArgs {
799            ingestion: IngestionClientArgs {
800                local_ingestion_path: Some(temp_dir.path().to_owned()),
801                ..Default::default()
802            },
803            ..Default::default()
804        };
805        let ingestion_config = IngestionConfig::default();
806
807        let mut indexer = Indexer::new(
808            store,
809            indexer_args,
810            client_args,
811            ingestion_config,
812            None,
813            &registry,
814        )
815        .await
816        .unwrap();
817
818        indexer
819            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
820            .await
821            .unwrap();
822        indexer
823            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
824            .await
825            .unwrap();
826        indexer
827            .sequential_pipeline::<C>(C, SequentialConfig::default())
828            .await
829            .unwrap();
830        indexer
831            .sequential_pipeline::<D>(D, SequentialConfig::default())
832            .await
833            .unwrap();
834
835        assert_eq!(indexer.first_ingestion_checkpoint, 0);
836    }
837
838    /// first_ingestion_checkpoint is 1 when smallest committer watermark is 0.
839    #[tokio::test]
840    async fn test_first_ingestion_checkpoint_smallest_is_0() {
841        let registry = Registry::new();
842        let store = MockStore::default();
843
844        test_pipeline!(A, "concurrent_a");
845        test_pipeline!(B, "concurrent_b");
846        test_pipeline!(C, "sequential_c");
847        test_pipeline!(D, "sequential_d");
848
849        let mut conn = store.connect().await.unwrap();
850        conn.set_committer_watermark(
851            A::NAME,
852            CommitterWatermark {
853                checkpoint_hi_inclusive: 100,
854                ..Default::default()
855            },
856        )
857        .await
858        .unwrap();
859        conn.set_committer_watermark(
860            B::NAME,
861            CommitterWatermark {
862                checkpoint_hi_inclusive: 10,
863                ..Default::default()
864            },
865        )
866        .await
867        .unwrap();
868        conn.set_committer_watermark(
869            C::NAME,
870            CommitterWatermark {
871                checkpoint_hi_inclusive: 1,
872                ..Default::default()
873            },
874        )
875        .await
876        .unwrap();
877        conn.set_committer_watermark(D::NAME, CommitterWatermark::default())
878            .await
879            .unwrap();
880
881        let indexer_args = IndexerArgs::default();
882        let temp_dir = tempfile::tempdir().unwrap();
883        let client_args = ClientArgs {
884            ingestion: IngestionClientArgs {
885                local_ingestion_path: Some(temp_dir.path().to_owned()),
886                ..Default::default()
887            },
888            ..Default::default()
889        };
890        let ingestion_config = IngestionConfig::default();
891
892        let mut indexer = Indexer::new(
893            store,
894            indexer_args,
895            client_args,
896            ingestion_config,
897            None,
898            &registry,
899        )
900        .await
901        .unwrap();
902
903        indexer
904            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
905            .await
906            .unwrap();
907        indexer
908            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
909            .await
910            .unwrap();
911        indexer
912            .sequential_pipeline::<C>(C, SequentialConfig::default())
913            .await
914            .unwrap();
915        indexer
916            .sequential_pipeline::<D>(D, SequentialConfig::default())
917            .await
918            .unwrap();
919
920        assert_eq!(indexer.first_ingestion_checkpoint, 1);
921    }
922
923    /// first_ingestion_checkpoint is first_checkpoint when at least one pipeline has no
924    /// watermark, and first_checkpoint is smallest.
925    #[tokio::test]
926    async fn test_first_ingestion_checkpoint_first_checkpoint_and_no_watermark() {
927        let registry = Registry::new();
928        let store = MockStore::default();
929
930        test_pipeline!(A, "concurrent_a");
931        test_pipeline!(B, "concurrent_b");
932        test_pipeline!(C, "sequential_c");
933        test_pipeline!(D, "sequential_d");
934
935        let mut conn = store.connect().await.unwrap();
936        conn.set_committer_watermark(
937            B::NAME,
938            CommitterWatermark {
939                checkpoint_hi_inclusive: 50,
940                ..Default::default()
941            },
942        )
943        .await
944        .unwrap();
945        conn.set_committer_watermark(
946            C::NAME,
947            CommitterWatermark {
948                checkpoint_hi_inclusive: 10,
949                ..Default::default()
950            },
951        )
952        .await
953        .unwrap();
954
955        let indexer_args = IndexerArgs {
956            first_checkpoint: Some(5),
957            ..Default::default()
958        };
959        let temp_dir = tempfile::tempdir().unwrap();
960        let client_args = ClientArgs {
961            ingestion: IngestionClientArgs {
962                local_ingestion_path: Some(temp_dir.path().to_owned()),
963                ..Default::default()
964            },
965            ..Default::default()
966        };
967        let ingestion_config = IngestionConfig::default();
968
969        let mut indexer = Indexer::new(
970            store,
971            indexer_args,
972            client_args,
973            ingestion_config,
974            None,
975            &registry,
976        )
977        .await
978        .unwrap();
979
980        indexer
981            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
982            .await
983            .unwrap();
984        indexer
985            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
986            .await
987            .unwrap();
988        indexer
989            .sequential_pipeline::<C>(C, SequentialConfig::default())
990            .await
991            .unwrap();
992        indexer
993            .sequential_pipeline::<D>(D, SequentialConfig::default())
994            .await
995            .unwrap();
996
997        assert_eq!(indexer.first_ingestion_checkpoint, 5);
998    }
999
1000    /// first_ingestion_checkpoint is smallest among existing watermarks + 1 if
1001    /// first_checkpoint but all pipelines have watermarks (ignores first_checkpoint).
1002    #[tokio::test]
1003    async fn test_first_ingestion_checkpoint_ignore_first_checkpoint() {
1004        let registry = Registry::new();
1005        let store = MockStore::default();
1006
1007        test_pipeline!(B, "concurrent_b");
1008        test_pipeline!(C, "sequential_c");
1009
1010        let mut conn = store.connect().await.unwrap();
1011        conn.set_committer_watermark(
1012            B::NAME,
1013            CommitterWatermark {
1014                checkpoint_hi_inclusive: 50,
1015                ..Default::default()
1016            },
1017        )
1018        .await
1019        .unwrap();
1020        conn.set_committer_watermark(
1021            C::NAME,
1022            CommitterWatermark {
1023                checkpoint_hi_inclusive: 10,
1024                ..Default::default()
1025            },
1026        )
1027        .await
1028        .unwrap();
1029
1030        let indexer_args = IndexerArgs {
1031            first_checkpoint: Some(5),
1032            ..Default::default()
1033        };
1034        let temp_dir = tempfile::tempdir().unwrap();
1035        let client_args = ClientArgs {
1036            ingestion: IngestionClientArgs {
1037                local_ingestion_path: Some(temp_dir.path().to_owned()),
1038                ..Default::default()
1039            },
1040            ..Default::default()
1041        };
1042        let ingestion_config = IngestionConfig::default();
1043
1044        let mut indexer = Indexer::new(
1045            store,
1046            indexer_args,
1047            client_args,
1048            ingestion_config,
1049            None,
1050            &registry,
1051        )
1052        .await
1053        .unwrap();
1054
1055        indexer
1056            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1057            .await
1058            .unwrap();
1059        indexer
1060            .sequential_pipeline::<C>(C, SequentialConfig::default())
1061            .await
1062            .unwrap();
1063
1064        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1065    }
1066
1067    /// If the first_checkpoint is being considered, because pipelines are missing watermarks, it
1068    /// will not be used as the starting point if it is not the smallest valid committer watermark
1069    /// to resume ingesting from.
1070    #[tokio::test]
1071    async fn test_first_ingestion_checkpoint_large_first_checkpoint() {
1072        let registry = Registry::new();
1073        let store = MockStore::default();
1074
1075        test_pipeline!(A, "concurrent_a");
1076        test_pipeline!(B, "concurrent_b");
1077        test_pipeline!(C, "sequential_c");
1078
1079        let mut conn = store.connect().await.unwrap();
1080        conn.set_committer_watermark(
1081            B::NAME,
1082            CommitterWatermark {
1083                checkpoint_hi_inclusive: 50,
1084                ..Default::default()
1085            },
1086        )
1087        .await
1088        .unwrap();
1089        conn.set_committer_watermark(
1090            C::NAME,
1091            CommitterWatermark {
1092                checkpoint_hi_inclusive: 10,
1093                ..Default::default()
1094            },
1095        )
1096        .await
1097        .unwrap();
1098
1099        let indexer_args = IndexerArgs {
1100            first_checkpoint: Some(24),
1101            ..Default::default()
1102        };
1103        let temp_dir = tempfile::tempdir().unwrap();
1104        let client_args = ClientArgs {
1105            ingestion: IngestionClientArgs {
1106                local_ingestion_path: Some(temp_dir.path().to_owned()),
1107                ..Default::default()
1108            },
1109            ..Default::default()
1110        };
1111        let ingestion_config = IngestionConfig::default();
1112
1113        let mut indexer = Indexer::new(
1114            store,
1115            indexer_args,
1116            client_args,
1117            ingestion_config,
1118            None,
1119            &registry,
1120        )
1121        .await
1122        .unwrap();
1123
1124        indexer
1125            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1126            .await
1127            .unwrap();
1128
1129        indexer
1130            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1131            .await
1132            .unwrap();
1133
1134        indexer
1135            .sequential_pipeline::<C>(C, SequentialConfig::default())
1136            .await
1137            .unwrap();
1138
1139        assert_eq!(indexer.first_ingestion_checkpoint, 11);
1140    }
1141
1142    // test ingestion, all pipelines have watermarks, no first_checkpoint provided
1143    #[tokio::test]
1144    async fn test_indexer_ingestion_existing_watermarks_no_first_checkpoint() {
1145        let registry = Registry::new();
1146        let store = MockStore::default();
1147
1148        test_pipeline!(A, "concurrent_a");
1149        test_pipeline!(B, "concurrent_b");
1150        test_pipeline!(C, "sequential_c");
1151        test_pipeline!(D, "sequential_d");
1152
1153        let mut conn = store.connect().await.unwrap();
1154        conn.set_committer_watermark(
1155            A::NAME,
1156            CommitterWatermark {
1157                checkpoint_hi_inclusive: 5,
1158                ..Default::default()
1159            },
1160        )
1161        .await
1162        .unwrap();
1163        conn.set_committer_watermark(
1164            B::NAME,
1165            CommitterWatermark {
1166                checkpoint_hi_inclusive: 10,
1167                ..Default::default()
1168            },
1169        )
1170        .await
1171        .unwrap();
1172        conn.set_committer_watermark(
1173            C::NAME,
1174            CommitterWatermark {
1175                checkpoint_hi_inclusive: 15,
1176                ..Default::default()
1177            },
1178        )
1179        .await
1180        .unwrap();
1181        conn.set_committer_watermark(
1182            D::NAME,
1183            CommitterWatermark {
1184                checkpoint_hi_inclusive: 20,
1185                ..Default::default()
1186            },
1187        )
1188        .await
1189        .unwrap();
1190
1191        // Create synthetic ingestion data
1192        let temp_dir = tempfile::tempdir().unwrap();
1193        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1194            ingestion_dir: temp_dir.path().to_owned(),
1195            starting_checkpoint: 5,
1196            num_checkpoints: 25,
1197            checkpoint_size: 1,
1198        })
1199        .await;
1200
1201        let indexer_args = IndexerArgs {
1202            last_checkpoint: Some(29),
1203            ..Default::default()
1204        };
1205
1206        let client_args = ClientArgs {
1207            ingestion: IngestionClientArgs {
1208                local_ingestion_path: Some(temp_dir.path().to_owned()),
1209                ..Default::default()
1210            },
1211            ..Default::default()
1212        };
1213
1214        let ingestion_config = IngestionConfig::default();
1215
1216        let mut indexer = Indexer::new(
1217            store.clone(),
1218            indexer_args,
1219            client_args,
1220            ingestion_config,
1221            None,
1222            &registry,
1223        )
1224        .await
1225        .unwrap();
1226
1227        indexer
1228            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1229            .await
1230            .unwrap();
1231        indexer
1232            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1233            .await
1234            .unwrap();
1235        indexer
1236            .sequential_pipeline::<C>(C, SequentialConfig::default())
1237            .await
1238            .unwrap();
1239        indexer
1240            .sequential_pipeline::<D>(D, SequentialConfig::default())
1241            .await
1242            .unwrap();
1243
1244        let ingestion_metrics = indexer.ingestion_metrics().clone();
1245        let indexer_metrics = indexer.indexer_metrics().clone();
1246
1247        indexer.run().await.unwrap().join().await.unwrap();
1248
1249        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1250        assert_eq!(
1251            indexer_metrics
1252                .total_watermarks_out_of_order
1253                .get_metric_with_label_values(&[A::NAME])
1254                .unwrap()
1255                .get(),
1256            0
1257        );
1258        assert_eq!(
1259            indexer_metrics
1260                .total_watermarks_out_of_order
1261                .get_metric_with_label_values(&[B::NAME])
1262                .unwrap()
1263                .get(),
1264            5
1265        );
1266        assert_eq!(
1267            indexer_metrics
1268                .total_watermarks_out_of_order
1269                .get_metric_with_label_values(&[C::NAME])
1270                .unwrap()
1271                .get(),
1272            10
1273        );
1274        assert_eq!(
1275            indexer_metrics
1276                .total_watermarks_out_of_order
1277                .get_metric_with_label_values(&[D::NAME])
1278                .unwrap()
1279                .get(),
1280            15
1281        );
1282    }
1283
1284    // test ingestion, no pipelines missing watermarks, first_checkpoint provided
1285    #[tokio::test]
1286    async fn test_indexer_ingestion_existing_watermarks_ignore_first_checkpoint() {
1287        let registry = Registry::new();
1288        let store = MockStore::default();
1289
1290        test_pipeline!(A, "concurrent_a");
1291        test_pipeline!(B, "concurrent_b");
1292        test_pipeline!(C, "sequential_c");
1293        test_pipeline!(D, "sequential_d");
1294
1295        let mut conn = store.connect().await.unwrap();
1296        conn.set_committer_watermark(
1297            A::NAME,
1298            CommitterWatermark {
1299                checkpoint_hi_inclusive: 5,
1300                ..Default::default()
1301            },
1302        )
1303        .await
1304        .unwrap();
1305        conn.set_committer_watermark(
1306            B::NAME,
1307            CommitterWatermark {
1308                checkpoint_hi_inclusive: 10,
1309                ..Default::default()
1310            },
1311        )
1312        .await
1313        .unwrap();
1314        conn.set_committer_watermark(
1315            C::NAME,
1316            CommitterWatermark {
1317                checkpoint_hi_inclusive: 15,
1318                ..Default::default()
1319            },
1320        )
1321        .await
1322        .unwrap();
1323        conn.set_committer_watermark(
1324            D::NAME,
1325            CommitterWatermark {
1326                checkpoint_hi_inclusive: 20,
1327                ..Default::default()
1328            },
1329        )
1330        .await
1331        .unwrap();
1332
1333        // Create synthetic ingestion data
1334        let temp_dir = tempfile::tempdir().unwrap();
1335        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1336            ingestion_dir: temp_dir.path().to_owned(),
1337            starting_checkpoint: 5,
1338            num_checkpoints: 25,
1339            checkpoint_size: 1,
1340        })
1341        .await;
1342
1343        let indexer_args = IndexerArgs {
1344            first_checkpoint: Some(3),
1345            last_checkpoint: Some(29),
1346            ..Default::default()
1347        };
1348
1349        let client_args = ClientArgs {
1350            ingestion: IngestionClientArgs {
1351                local_ingestion_path: Some(temp_dir.path().to_owned()),
1352                ..Default::default()
1353            },
1354            ..Default::default()
1355        };
1356
1357        let ingestion_config = IngestionConfig::default();
1358
1359        let mut indexer = Indexer::new(
1360            store.clone(),
1361            indexer_args,
1362            client_args,
1363            ingestion_config,
1364            None,
1365            &registry,
1366        )
1367        .await
1368        .unwrap();
1369
1370        indexer
1371            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1372            .await
1373            .unwrap();
1374        indexer
1375            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1376            .await
1377            .unwrap();
1378        indexer
1379            .sequential_pipeline::<C>(C, SequentialConfig::default())
1380            .await
1381            .unwrap();
1382        indexer
1383            .sequential_pipeline::<D>(D, SequentialConfig::default())
1384            .await
1385            .unwrap();
1386
1387        let ingestion_metrics = indexer.ingestion_metrics().clone();
1388        let metrics = indexer.indexer_metrics().clone();
1389        indexer.run().await.unwrap().join().await.unwrap();
1390
1391        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 24);
1392        assert_eq!(
1393            metrics
1394                .total_watermarks_out_of_order
1395                .get_metric_with_label_values(&[A::NAME])
1396                .unwrap()
1397                .get(),
1398            0
1399        );
1400        assert_eq!(
1401            metrics
1402                .total_watermarks_out_of_order
1403                .get_metric_with_label_values(&[B::NAME])
1404                .unwrap()
1405                .get(),
1406            5
1407        );
1408        assert_eq!(
1409            metrics
1410                .total_watermarks_out_of_order
1411                .get_metric_with_label_values(&[C::NAME])
1412                .unwrap()
1413                .get(),
1414            10
1415        );
1416        assert_eq!(
1417            metrics
1418                .total_watermarks_out_of_order
1419                .get_metric_with_label_values(&[D::NAME])
1420                .unwrap()
1421                .get(),
1422            15
1423        );
1424    }
1425
1426    // test ingestion, some pipelines missing watermarks, no first_checkpoint provided
1427    #[tokio::test]
1428    async fn test_indexer_ingestion_missing_watermarks_no_first_checkpoint() {
1429        let registry = Registry::new();
1430        let store = MockStore::default();
1431
1432        test_pipeline!(A, "concurrent_a");
1433        test_pipeline!(B, "concurrent_b");
1434        test_pipeline!(C, "sequential_c");
1435        test_pipeline!(D, "sequential_d");
1436
1437        let mut conn = store.connect().await.unwrap();
1438        conn.set_committer_watermark(
1439            B::NAME,
1440            CommitterWatermark {
1441                checkpoint_hi_inclusive: 10,
1442                ..Default::default()
1443            },
1444        )
1445        .await
1446        .unwrap();
1447        conn.set_committer_watermark(
1448            C::NAME,
1449            CommitterWatermark {
1450                checkpoint_hi_inclusive: 15,
1451                ..Default::default()
1452            },
1453        )
1454        .await
1455        .unwrap();
1456        conn.set_committer_watermark(
1457            D::NAME,
1458            CommitterWatermark {
1459                checkpoint_hi_inclusive: 20,
1460                ..Default::default()
1461            },
1462        )
1463        .await
1464        .unwrap();
1465
1466        // Create synthetic ingestion data
1467        let temp_dir = tempfile::tempdir().unwrap();
1468        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1469            ingestion_dir: temp_dir.path().to_owned(),
1470            starting_checkpoint: 0,
1471            num_checkpoints: 30,
1472            checkpoint_size: 1,
1473        })
1474        .await;
1475
1476        let indexer_args = IndexerArgs {
1477            last_checkpoint: Some(29),
1478            ..Default::default()
1479        };
1480
1481        let client_args = ClientArgs {
1482            ingestion: IngestionClientArgs {
1483                local_ingestion_path: Some(temp_dir.path().to_owned()),
1484                ..Default::default()
1485            },
1486            ..Default::default()
1487        };
1488
1489        let ingestion_config = IngestionConfig::default();
1490
1491        let mut indexer = Indexer::new(
1492            store.clone(),
1493            indexer_args,
1494            client_args,
1495            ingestion_config,
1496            None,
1497            &registry,
1498        )
1499        .await
1500        .unwrap();
1501
1502        indexer
1503            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1504            .await
1505            .unwrap();
1506        indexer
1507            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1508            .await
1509            .unwrap();
1510        indexer
1511            .sequential_pipeline::<C>(C, SequentialConfig::default())
1512            .await
1513            .unwrap();
1514        indexer
1515            .sequential_pipeline::<D>(D, SequentialConfig::default())
1516            .await
1517            .unwrap();
1518
1519        let ingestion_metrics = indexer.ingestion_metrics().clone();
1520        let metrics = indexer.indexer_metrics().clone();
1521        indexer.run().await.unwrap().join().await.unwrap();
1522
1523        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 30);
1524        assert_eq!(
1525            metrics
1526                .total_watermarks_out_of_order
1527                .get_metric_with_label_values(&[A::NAME])
1528                .unwrap()
1529                .get(),
1530            0
1531        );
1532        assert_eq!(
1533            metrics
1534                .total_watermarks_out_of_order
1535                .get_metric_with_label_values(&[B::NAME])
1536                .unwrap()
1537                .get(),
1538            11
1539        );
1540        assert_eq!(
1541            metrics
1542                .total_watermarks_out_of_order
1543                .get_metric_with_label_values(&[C::NAME])
1544                .unwrap()
1545                .get(),
1546            16
1547        );
1548        assert_eq!(
1549            metrics
1550                .total_watermarks_out_of_order
1551                .get_metric_with_label_values(&[D::NAME])
1552                .unwrap()
1553                .get(),
1554            21
1555        );
1556    }
1557
1558    // test ingestion, some pipelines missing watermarks, use first_checkpoint
1559    #[tokio::test]
1560    async fn test_indexer_ingestion_use_first_checkpoint() {
1561        let registry = Registry::new();
1562        let store = MockStore::default();
1563
1564        test_pipeline!(A, "concurrent_a");
1565        test_pipeline!(B, "concurrent_b");
1566        test_pipeline!(C, "sequential_c");
1567        test_pipeline!(D, "sequential_d");
1568
1569        let mut conn = store.connect().await.unwrap();
1570        conn.set_committer_watermark(
1571            B::NAME,
1572            CommitterWatermark {
1573                checkpoint_hi_inclusive: 10,
1574                ..Default::default()
1575            },
1576        )
1577        .await
1578        .unwrap();
1579        conn.set_committer_watermark(
1580            C::NAME,
1581            CommitterWatermark {
1582                checkpoint_hi_inclusive: 15,
1583                ..Default::default()
1584            },
1585        )
1586        .await
1587        .unwrap();
1588        conn.set_committer_watermark(
1589            D::NAME,
1590            CommitterWatermark {
1591                checkpoint_hi_inclusive: 20,
1592                ..Default::default()
1593            },
1594        )
1595        .await
1596        .unwrap();
1597
1598        // Create synthetic ingestion data
1599        let temp_dir = tempfile::tempdir().unwrap();
1600        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1601            ingestion_dir: temp_dir.path().to_owned(),
1602            starting_checkpoint: 5,
1603            num_checkpoints: 25,
1604            checkpoint_size: 1,
1605        })
1606        .await;
1607
1608        let indexer_args = IndexerArgs {
1609            first_checkpoint: Some(10),
1610            last_checkpoint: Some(29),
1611            ..Default::default()
1612        };
1613
1614        let client_args = ClientArgs {
1615            ingestion: IngestionClientArgs {
1616                local_ingestion_path: Some(temp_dir.path().to_owned()),
1617                ..Default::default()
1618            },
1619            ..Default::default()
1620        };
1621
1622        let ingestion_config = IngestionConfig::default();
1623
1624        let mut indexer = Indexer::new(
1625            store.clone(),
1626            indexer_args,
1627            client_args,
1628            ingestion_config,
1629            None,
1630            &registry,
1631        )
1632        .await
1633        .unwrap();
1634
1635        indexer
1636            .concurrent_pipeline::<A>(A, ConcurrentConfig::default())
1637            .await
1638            .unwrap();
1639        indexer
1640            .concurrent_pipeline::<B>(B, ConcurrentConfig::default())
1641            .await
1642            .unwrap();
1643        indexer
1644            .sequential_pipeline::<C>(C, SequentialConfig::default())
1645            .await
1646            .unwrap();
1647        indexer
1648            .sequential_pipeline::<D>(D, SequentialConfig::default())
1649            .await
1650            .unwrap();
1651
1652        let ingestion_metrics = indexer.ingestion_metrics().clone();
1653        let metrics = indexer.indexer_metrics().clone();
1654        indexer.run().await.unwrap().join().await.unwrap();
1655
1656        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 20);
1657        assert_eq!(
1658            metrics
1659                .total_watermarks_out_of_order
1660                .get_metric_with_label_values(&[A::NAME])
1661                .unwrap()
1662                .get(),
1663            0
1664        );
1665        assert_eq!(
1666            metrics
1667                .total_watermarks_out_of_order
1668                .get_metric_with_label_values(&[B::NAME])
1669                .unwrap()
1670                .get(),
1671            1
1672        );
1673        assert_eq!(
1674            metrics
1675                .total_watermarks_out_of_order
1676                .get_metric_with_label_values(&[C::NAME])
1677                .unwrap()
1678                .get(),
1679            6
1680        );
1681        assert_eq!(
1682            metrics
1683                .total_watermarks_out_of_order
1684                .get_metric_with_label_values(&[D::NAME])
1685                .unwrap()
1686                .get(),
1687            11
1688        );
1689    }
1690
1691    #[tokio::test]
1692    async fn test_init_watermark_concurrent_no_first_checkpoint() {
1693        let (committer_watermark, pruner_watermark) = test_init_watermark(None, true).await;
1694        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1695        assert_eq!(committer_watermark, None);
1696        assert_eq!(pruner_watermark, None);
1697    }
1698
1699    #[tokio::test]
1700    async fn test_init_watermark_concurrent_first_checkpoint_0() {
1701        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(0), true).await;
1702        // Indexer will not init the watermark, pipeline tasks will write commit watermarks as normal.
1703        assert_eq!(committer_watermark, None);
1704        assert_eq!(pruner_watermark, None);
1705    }
1706
1707    #[tokio::test]
1708    async fn test_init_watermark_concurrent_first_checkpoint_1() {
1709        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), true).await;
1710
1711        let committer_watermark = committer_watermark.unwrap();
1712        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1713
1714        let pruner_watermark = pruner_watermark.unwrap();
1715        assert_eq!(pruner_watermark.reader_lo, 1);
1716        assert_eq!(pruner_watermark.pruner_hi, 1);
1717    }
1718
1719    #[tokio::test]
1720    async fn test_init_watermark_sequential() {
1721        let (committer_watermark, pruner_watermark) = test_init_watermark(Some(1), false).await;
1722
1723        let committer_watermark = committer_watermark.unwrap();
1724        assert_eq!(committer_watermark.checkpoint_hi_inclusive, 0);
1725
1726        let pruner_watermark = pruner_watermark.unwrap();
1727        assert_eq!(pruner_watermark.reader_lo, 1);
1728        assert_eq!(pruner_watermark.pruner_hi, 1);
1729    }
1730
1731    #[tokio::test]
1732    async fn test_multiple_sequential_pipelines_next_checkpoint() {
1733        let registry = Registry::new();
1734        let store = MockStore::default();
1735
1736        // Set up different watermarks for three different sequential pipelines
1737        let mut conn = store.connect().await.unwrap();
1738
1739        // First handler at checkpoint 10
1740        conn.set_committer_watermark(
1741            MockHandler::NAME,
1742            CommitterWatermark {
1743                epoch_hi_inclusive: 0,
1744                checkpoint_hi_inclusive: 10,
1745                tx_hi: 20,
1746                timestamp_ms_hi_inclusive: 10000,
1747            },
1748        )
1749        .await
1750        .unwrap();
1751
1752        // SequentialHandler at checkpoint 5
1753        conn.set_committer_watermark(
1754            SequentialHandler::NAME,
1755            CommitterWatermark {
1756                epoch_hi_inclusive: 0,
1757                checkpoint_hi_inclusive: 5,
1758                tx_hi: 10,
1759                timestamp_ms_hi_inclusive: 5000,
1760            },
1761        )
1762        .await
1763        .unwrap();
1764
1765        // Create synthetic ingestion data
1766        let temp_dir = tempfile::tempdir().unwrap();
1767        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1768            ingestion_dir: temp_dir.path().to_owned(),
1769            starting_checkpoint: 0,
1770            num_checkpoints: 20,
1771            checkpoint_size: 2,
1772        })
1773        .await;
1774
1775        let indexer_args = IndexerArgs {
1776            first_checkpoint: None,
1777            last_checkpoint: Some(19),
1778            pipeline: vec![],
1779            ..Default::default()
1780        };
1781
1782        let client_args = ClientArgs {
1783            ingestion: IngestionClientArgs {
1784                local_ingestion_path: Some(temp_dir.path().to_owned()),
1785                ..Default::default()
1786            },
1787            ..Default::default()
1788        };
1789
1790        let ingestion_config = IngestionConfig::default();
1791
1792        let mut indexer = Indexer::new(
1793            store.clone(),
1794            indexer_args,
1795            client_args,
1796            ingestion_config,
1797            None,
1798            &registry,
1799        )
1800        .await
1801        .unwrap();
1802
1803        // Add first sequential pipeline
1804        indexer
1805            .sequential_pipeline(
1806                MockHandler,
1807                pipeline::sequential::SequentialConfig::default(),
1808            )
1809            .await
1810            .unwrap();
1811
1812        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1813        assert_eq!(
1814            indexer.next_sequential_checkpoint(),
1815            Some(11),
1816            "next_sequential_checkpoint should be 11"
1817        );
1818
1819        // Add second sequential pipeline
1820        indexer
1821            .sequential_pipeline(
1822                SequentialHandler,
1823                pipeline::sequential::SequentialConfig::default(),
1824            )
1825            .await
1826            .unwrap();
1827
1828        // Should change to 6 (minimum of 6 and 11)
1829        assert_eq!(
1830            indexer.next_sequential_checkpoint(),
1831            Some(6),
1832            "next_sequential_checkpoint should still be 6"
1833        );
1834
1835        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1836        indexer.run().await.unwrap().join().await.unwrap();
1837
1838        // Verify each pipeline made some progress independently
1839        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1840        let watermark2 = conn
1841            .committer_watermark(SequentialHandler::NAME)
1842            .await
1843            .unwrap();
1844
1845        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1846        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1847    }
1848
1849    /// When a tasked indexer is initialized such that a tasked pipeline is run with a
1850    /// `first_checkpoint` less than the main pipeline's reader_lo, the indexer will correctly skip
1851    /// committing checkpoints less than the main pipeline's reader watermark.
1852    #[tokio::test]
1853    async fn test_tasked_pipelines_ignore_below_main_reader_lo() {
1854        let registry = Registry::new();
1855        let store = MockStore::default();
1856
1857        // Mock the store as if we have a main pipeline with a committer watermark at `10` and a
1858        // reader watermark at `7`.
1859        let mut conn = store.connect().await.unwrap();
1860        conn.set_committer_watermark(
1861            MockCheckpointSequenceNumberHandler::NAME,
1862            CommitterWatermark {
1863                checkpoint_hi_inclusive: 10,
1864                ..Default::default()
1865            },
1866        )
1867        .await
1868        .unwrap();
1869        conn.set_reader_watermark(MockCheckpointSequenceNumberHandler::NAME, 7)
1870            .await
1871            .unwrap();
1872
1873        // Start a tasked indexer that will ingest from checkpoint 0. Checkpoints 0 through 6 should
1874        // be ignored by the tasked indexer.
1875        let indexer_args = IndexerArgs {
1876            first_checkpoint: Some(0),
1877            last_checkpoint: Some(15),
1878            pipeline: vec![],
1879            task: TaskArgs::tasked("task".to_string(), 10),
1880        };
1881        let temp_dir = tempfile::tempdir().unwrap();
1882        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1883            ingestion_dir: temp_dir.path().to_owned(),
1884            starting_checkpoint: 0,
1885            num_checkpoints: 16,
1886            checkpoint_size: 2,
1887        })
1888        .await;
1889
1890        let client_args = ClientArgs {
1891            ingestion: IngestionClientArgs {
1892                local_ingestion_path: Some(temp_dir.path().to_owned()),
1893                ..Default::default()
1894            },
1895            ..Default::default()
1896        };
1897
1898        let ingestion_config = IngestionConfig::default();
1899
1900        let mut tasked_indexer = Indexer::new(
1901            store.clone(),
1902            indexer_args,
1903            client_args,
1904            ingestion_config,
1905            None,
1906            &registry,
1907        )
1908        .await
1909        .unwrap();
1910
1911        let _ = tasked_indexer
1912            .concurrent_pipeline(
1913                MockCheckpointSequenceNumberHandler,
1914                ConcurrentConfig::default(),
1915            )
1916            .await;
1917
1918        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
1919        let metrics = tasked_indexer.indexer_metrics().clone();
1920
1921        tasked_indexer.run().await.unwrap().join().await.unwrap();
1922
1923        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 16);
1924        assert_eq!(
1925            metrics
1926                .total_collector_skipped_checkpoints
1927                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
1928                .unwrap()
1929                .get(),
1930            7
1931        );
1932        let data = store
1933            .data
1934            .get(MockCheckpointSequenceNumberHandler::NAME)
1935            .unwrap();
1936        assert_eq!(data.len(), 9);
1937        for i in 0..7 {
1938            assert!(data.get(&i).is_none());
1939        }
1940        for i in 7..16 {
1941            assert!(data.get(&i).is_some());
1942        }
1943    }
1944
1945    /// Tasked pipelines can run ahead of the main pipeline's committer watermark.
1946    #[tokio::test]
1947    async fn test_tasked_pipelines_surpass_main_pipeline_committer_hi() {
1948        let registry = Registry::new();
1949        let store = MockStore::default();
1950
1951        let mut conn = store.connect().await.unwrap();
1952        conn.set_committer_watermark(
1953            "test",
1954            CommitterWatermark {
1955                checkpoint_hi_inclusive: 10,
1956                ..Default::default()
1957            },
1958        )
1959        .await
1960        .unwrap();
1961        conn.set_reader_watermark("test", 5).await.unwrap();
1962
1963        // Start a tasked indexer that will ingest from checkpoint 9 and go past the main pipeline's
1964        // watermarks.
1965        let indexer_args = IndexerArgs {
1966            first_checkpoint: Some(9),
1967            last_checkpoint: Some(25),
1968            pipeline: vec![],
1969            task: TaskArgs::tasked("task".to_string(), 10),
1970        };
1971        let temp_dir = tempfile::tempdir().unwrap();
1972        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1973            ingestion_dir: temp_dir.path().to_owned(),
1974            starting_checkpoint: 9,
1975            num_checkpoints: 17,
1976            checkpoint_size: 2,
1977        })
1978        .await;
1979
1980        let client_args = ClientArgs {
1981            ingestion: IngestionClientArgs {
1982                local_ingestion_path: Some(temp_dir.path().to_owned()),
1983                ..Default::default()
1984            },
1985            ..Default::default()
1986        };
1987
1988        let ingestion_config = IngestionConfig::default();
1989
1990        let mut tasked_indexer = Indexer::new(
1991            store.clone(),
1992            indexer_args,
1993            client_args,
1994            ingestion_config,
1995            None,
1996            &registry,
1997        )
1998        .await
1999        .unwrap();
2000
2001        let _ = tasked_indexer
2002            .concurrent_pipeline(
2003                MockCheckpointSequenceNumberHandler,
2004                ConcurrentConfig::default(),
2005            )
2006            .await;
2007
2008        let ingestion_metrics = tasked_indexer.ingestion_metrics().clone();
2009        let metrics = tasked_indexer.indexer_metrics().clone();
2010
2011        tasked_indexer.run().await.unwrap().join().await.unwrap();
2012
2013        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 17);
2014        assert_eq!(
2015            metrics
2016                .total_watermarks_out_of_order
2017                .get_metric_with_label_values(&["test"])
2018                .unwrap()
2019                .get(),
2020            0
2021        );
2022        assert_eq!(
2023            metrics
2024                .total_collector_skipped_checkpoints
2025                .get_metric_with_label_values(&[MockCheckpointSequenceNumberHandler::NAME])
2026                .unwrap()
2027                .get(),
2028            0
2029        );
2030
2031        let data = store.data.get("test").unwrap();
2032        assert!(data.len() == 17);
2033        for i in 0..9 {
2034            assert!(data.get(&i).is_none());
2035        }
2036        for i in 9..26 {
2037            assert!(data.get(&i).is_some());
2038        }
2039        let main_pipeline_watermark = store.watermark("test").unwrap();
2040        // assert that the main pipeline's watermarks are not updated
2041        assert_eq!(main_pipeline_watermark.checkpoint_hi_inclusive, 10);
2042        assert_eq!(main_pipeline_watermark.reader_lo, 5);
2043        let tasked_pipeline_watermark = store.watermark("test@task").unwrap();
2044        assert_eq!(tasked_pipeline_watermark.checkpoint_hi_inclusive, 25);
2045        assert_eq!(tasked_pipeline_watermark.reader_lo, 9);
2046    }
2047
2048    /// Test that when the collector observes `reader_lo = X`, that all checkpoints >= X will be
2049    /// committed, and any checkpoints inflight < X will be skipped.
2050    #[tokio::test]
2051    async fn test_tasked_pipelines_skip_checkpoints_trailing_main_reader_lo() {
2052        let registry = Registry::new();
2053        let store = MockStore::default();
2054        let mut conn = store.connect().await.unwrap();
2055        // Set the main pipeline watermark.
2056        conn.set_committer_watermark(
2057            ControllableHandler::NAME,
2058            CommitterWatermark {
2059                checkpoint_hi_inclusive: 11,
2060                ..Default::default()
2061            },
2062        )
2063        .await
2064        .unwrap();
2065
2066        // Generate 500 checkpoints upfront, for the indexer to process all at once.
2067        let temp_dir = tempfile::tempdir().unwrap();
2068        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
2069            ingestion_dir: temp_dir.path().to_owned(),
2070            starting_checkpoint: 0,
2071            num_checkpoints: 501,
2072            checkpoint_size: 2,
2073        })
2074        .await;
2075        let indexer_args = IndexerArgs {
2076            first_checkpoint: Some(0),
2077            last_checkpoint: Some(500),
2078            pipeline: vec![],
2079            task: TaskArgs::tasked("task".to_string(), 10 /* reader_interval_ms */),
2080        };
2081        let client_args = ClientArgs {
2082            ingestion: IngestionClientArgs {
2083                local_ingestion_path: Some(temp_dir.path().to_owned()),
2084                ..Default::default()
2085            },
2086            ..Default::default()
2087        };
2088        let ingestion_config = IngestionConfig::default();
2089        let mut tasked_indexer = Indexer::new(
2090            store.clone(),
2091            indexer_args,
2092            client_args,
2093            ingestion_config,
2094            None,
2095            &registry,
2096        )
2097        .await
2098        .unwrap();
2099        let mut allow_process = 10;
2100        // Limit the pipeline to process only checkpoints `[0, 10]`.
2101        let (controllable_handler, process_below) = ControllableHandler::with_limit(allow_process);
2102        let _ = tasked_indexer
2103            .concurrent_pipeline(
2104                controllable_handler,
2105                ConcurrentConfig {
2106                    committer: CommitterConfig {
2107                        collect_interval_ms: 10,
2108                        watermark_interval_ms: 10,
2109                        ..Default::default()
2110                    },
2111                    ..Default::default()
2112                },
2113            )
2114            .await;
2115        let metrics = tasked_indexer.indexer_metrics().clone();
2116
2117        let mut s_indexer = tasked_indexer.run().await.unwrap();
2118
2119        // Wait for pipeline to commit up to configured checkpoint 10 inclusive. With the main
2120        // pipeline `reader_lo` currently unset, all checkpoints are allowed and should be
2121        // committed.
2122        store
2123            .wait_for_watermark(
2124                &pipeline_task::<MockStore>(ControllableHandler::NAME, Some("task")).unwrap(),
2125                10,
2126                std::time::Duration::from_secs(10),
2127            )
2128            .await;
2129
2130        // Set the reader_lo to 250, simulating the main pipeline getting ahead. The
2131        // track_main_reader_lo task will eventually pick this up and update the atomic. The
2132        // collector reads from the atomic when it receives checkpoints, so we release checkpoints
2133        // one at a time until the collector_reader_lo metric shows the new value.
2134        conn.set_reader_watermark(ControllableHandler::NAME, 250)
2135            .await
2136            .unwrap();
2137
2138        let reader_lo = metrics
2139            .collector_reader_lo
2140            .with_label_values(&[ControllableHandler::NAME]);
2141
2142        // Send checkpoints one at a time at 10ms intervals. The tasked indexer has a reader refresh
2143        // interval of 10ms as well, so the collector should pick up the new reader_lo after a few
2144        // checkpoints have been processed.
2145        let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
2146        while reader_lo.get() != 250 {
2147            interval.tick().await;
2148            // allow_process is initialized to 11, bump to 11 for the next checkpoint
2149            allow_process += 1;
2150            assert!(
2151                allow_process <= 500,
2152                "Released all checkpoints but collector never observed new reader_lo"
2153            );
2154            process_below.send(allow_process).ok();
2155        }
2156
2157        // At this point, the collector has observed reader_lo = 250. Release all remaining
2158        // checkpoints. Guarantees:
2159        // - [0, 10]: committed (before reader_lo was set)
2160        // - [11, allow_process]: some committed, some skipped (timing-dependent during detection)
2161        // - (allow_process, 250): skipped (in-flight, filtered by collector)
2162        // - [250, 500]: committed (>= reader_lo)
2163        process_below.send(500).ok();
2164
2165        s_indexer.join().await.unwrap();
2166
2167        let data = store.data.get(ControllableHandler::NAME).unwrap();
2168
2169        // Checkpoints (allow_process, 250) must be skipped.
2170        for chkpt in (allow_process + 1)..250 {
2171            assert!(
2172                data.get(&chkpt).is_none(),
2173                "Checkpoint {chkpt} should have been skipped"
2174            );
2175        }
2176
2177        // Checkpoints >= reader_lo must be committed.
2178        for chkpt in 250..=500 {
2179            assert!(
2180                data.get(&chkpt).is_some(),
2181                "Checkpoint {chkpt} should have been committed (>= reader_lo)"
2182            );
2183        }
2184
2185        // Baseline: checkpoints [0, 10] were committed before reader_lo was set.
2186        for chkpt in 0..=10 {
2187            assert!(
2188                data.get(&chkpt).is_some(),
2189                "Checkpoint {chkpt} should have been committed (baseline)"
2190            );
2191        }
2192    }
2193}