sui_indexer_alt_framework/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeSet, sync::Arc};
5
6use anyhow::{Context, ensure};
7use futures::future;
8use ingestion::{ClientArgs, IngestionConfig, IngestionService, client::IngestionClient};
9use metrics::IndexerMetrics;
10use pipeline::{
11    Processor,
12    concurrent::{self, ConcurrentConfig},
13    sequential::{self, Handler, SequentialConfig},
14};
15use prometheus::Registry;
16use sui_indexer_alt_framework_store_traits::{
17    CommitterWatermark, Connection, Store, TransactionalStore,
18};
19use tokio::task::JoinHandle;
20use tokio_util::sync::CancellationToken;
21use tracing::{info, warn};
22
23pub use anyhow::Result;
24pub use sui_field_count::FieldCount;
25/// External users access the store trait through framework::store
26pub use sui_indexer_alt_framework_store_traits as store;
27pub use sui_types as types;
28
29#[cfg(feature = "cluster")]
30pub mod cluster;
31pub mod ingestion;
32pub mod metrics;
33pub mod pipeline;
34#[cfg(feature = "postgres")]
35pub mod postgres;
36pub mod task;
37
38#[cfg(test)]
39pub mod mocks;
40
41/// Command-line arguments for the indexer
42#[derive(clap::Args, Default, Debug, Clone)]
43pub struct IndexerArgs {
44    /// Override for the checkpoint to start ingestion from -- useful for backfills. Otherwise, by
45    /// default, ingestion will start just after the lowest checkpoint watermark across all active
46    /// pipelines.
47    ///
48    /// For both concurrent and sequential pipelines, if a first checkpoint is configured, and a
49    /// watermark does not exist for the pipeline, the indexer will also tell the pipeline to start
50    /// from this value.
51    ///
52    /// Unless `--skip-watermark` is set, this value must be less than or equal to the global high
53    /// watermark (preventing the indexer from introducing a gap in the data). This exception only
54    /// applies to concurrent pipelines, and these pipelines will also not report watermark updates.
55    ///
56    /// Sequential pipelines will always start committing from the next checkpoint after its
57    /// watermark.
58    ///
59    /// Concurrent pipelines will always start committing from `first_checkpoint`. These pipelines
60    /// will not report watermark updates if `skip_watermark` is set.
61    #[arg(long)]
62    pub first_checkpoint: Option<u64>,
63
64    /// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
65    /// default, ingestion will not stop, and will continue to poll for new checkpoints.
66    #[arg(long)]
67    pub last_checkpoint: Option<u64>,
68
69    /// Only run the following pipelines. If not provided, all pipelines found in the
70    /// configuration file will be run.
71    #[arg(long, action = clap::ArgAction::Append)]
72    pub pipeline: Vec<String>,
73
74    /// Don't write to the watermark tables for concurrent pipelines.
75    #[arg(long)]
76    pub skip_watermark: bool,
77}
78
79pub struct Indexer<S: Store> {
80    /// The storage backend that the indexer uses to write and query indexed data. This
81    /// generic implementation allows for plugging in different storage solutions that implement the
82    /// `Store` trait.
83    store: S,
84
85    /// Prometheus Metrics.
86    metrics: Arc<IndexerMetrics>,
87
88    /// Service for downloading and disseminating checkpoint data.
89    ingestion_service: IngestionService,
90
91    /// Optional override of the checkpoint lowerbound.
92    first_checkpoint: Option<u64>,
93
94    /// Optional override of the checkpoint upperbound.
95    last_checkpoint: Option<u64>,
96
97    /// Don't write to the watermark tables for concurrent pipelines.
98    skip_watermark: bool,
99
100    /// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
101    /// run. Any pipelines that are present in this filter but not added to the indexer will yield
102    /// a warning when the indexer is run.
103    enabled_pipelines: Option<BTreeSet<String>>,
104
105    /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
106    /// with the same name isn't added twice.
107    added_pipelines: BTreeSet<&'static str>,
108
109    /// Cancellation token shared among all continuous tasks in the service.
110    cancel: CancellationToken,
111
112    /// The checkpoint lowerbound derived from watermarks of pipelines added to the indexer. When
113    /// the indexer runs, it will start from this point, unless this has been overridden by
114    /// [Self::first_checkpoint].
115    first_checkpoint_from_watermark: u64,
116
117    /// The minimum next_checkpoint across all sequential pipelines. This is used to initialize
118    /// the regulator to prevent ingestion from running too far ahead of sequential pipelines.
119    next_sequential_checkpoint: Option<u64>,
120
121    /// The handles for every task spawned by this indexer, used to manage graceful shutdown.
122    handles: Vec<JoinHandle<()>>,
123}
124
125impl<S: Store> Indexer<S> {
126    /// Create a new instance of the indexer framework from a store that implements the `Store`
127    /// trait, along with `indexer_args`, `client_args`, and `ingestion_config`. Together, these
128    /// arguments configure the following:
129    ///
130    /// - What is indexed (which checkpoints, which pipelines, whether to track and update
131    ///   watermarks) and where to serve metrics from,
132    /// - Where to download checkpoints from,
133    /// - Concurrency and buffering parameters for downloading checkpoints.
134    ///
135    /// After initialization, at least one pipeline must be added using [Self::concurrent_pipeline]
136    /// or [Self::sequential_pipeline], before the indexer is started using [Self::run].
137    pub async fn new(
138        store: S,
139        indexer_args: IndexerArgs,
140        client_args: ClientArgs,
141        ingestion_config: IngestionConfig,
142        metrics_prefix: Option<&str>,
143        registry: &Registry,
144        cancel: CancellationToken,
145    ) -> Result<Self> {
146        let IndexerArgs {
147            first_checkpoint,
148            last_checkpoint,
149            pipeline,
150            skip_watermark,
151        } = indexer_args;
152
153        let metrics = IndexerMetrics::new(metrics_prefix, registry);
154
155        let ingestion_service = IngestionService::new(
156            client_args,
157            ingestion_config,
158            metrics.clone(),
159            cancel.clone(),
160        )?;
161
162        Ok(Self {
163            store,
164            metrics,
165            ingestion_service,
166            first_checkpoint,
167            last_checkpoint,
168            skip_watermark,
169            enabled_pipelines: if pipeline.is_empty() {
170                None
171            } else {
172                Some(pipeline.into_iter().collect())
173            },
174            added_pipelines: BTreeSet::new(),
175            cancel,
176            first_checkpoint_from_watermark: u64::MAX,
177            next_sequential_checkpoint: None,
178            handles: vec![],
179        })
180    }
181
182    /// The store used by the indexer.
183    pub fn store(&self) -> &S {
184        &self.store
185    }
186
187    /// The ingestion client used by the indexer to fetch checkpoints.
188    pub fn ingestion_client(&self) -> &IngestionClient {
189        self.ingestion_service.ingestion_client()
190    }
191
192    /// The indexer's metrics.
193    pub fn metrics(&self) -> &Arc<IndexerMetrics> {
194        &self.metrics
195    }
196
197    /// The pipelines that this indexer will run.
198    pub fn pipelines(&self) -> impl Iterator<Item = &'static str> + '_ {
199        self.added_pipelines.iter().copied().filter(|p| {
200            self.enabled_pipelines
201                .as_ref()
202                .is_none_or(|e| e.contains(*p))
203        })
204    }
205
206    /// The minimum next checkpoint across all sequential pipelines. This value is used to
207    /// initialize the ingestion regulator's high watermark to prevent ingestion from running
208    /// too far ahead of sequential pipelines.
209    pub fn next_sequential_checkpoint(&self) -> Option<u64> {
210        self.next_sequential_checkpoint
211    }
212
213    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
214    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
215    ///
216    /// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
217    /// keep the watermark table up-to-date with the highest point they can guarantee all data
218    /// exists for, for their pipeline.
219    pub async fn concurrent_pipeline<H>(
220        &mut self,
221        handler: H,
222        config: ConcurrentConfig,
223    ) -> Result<()>
224    where
225        H: concurrent::Handler<Store = S> + Send + Sync + 'static,
226    {
227        let Some(watermark) = self.add_pipeline::<H>().await? else {
228            return Ok(());
229        };
230
231        // If `first_checkpoint` does not violate the consistency check, concurrent pipelines will
232        // prefer to resume from the `first_checkpoint` if configured.
233        let next_checkpoint = match (watermark, self.first_checkpoint) {
234            (Some(watermark), Some(first_checkpoint)) => {
235                // Setting `skip_watermark` allows concurrent pipelines to not be considered in the
236                // consistency check. The indexer will still fail to start if `first_checkpoint`
237                // fails for a sequential pipeline in the indexer.
238                if !self.skip_watermark {
239                    ensure!(
240                        first_checkpoint <= watermark.checkpoint_hi_inclusive + 1,
241                        "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. \
242                        This could create gaps in the data.",
243                        H::NAME,
244                        first_checkpoint,
245                        watermark.checkpoint_hi_inclusive,
246                    );
247                }
248                first_checkpoint
249            }
250            (Some(watermark), _) => watermark.checkpoint_hi_inclusive + 1,
251            (_, Some(first_checkpoint)) => first_checkpoint,
252            (None, None) => 0,
253        };
254
255        self.handles.push(concurrent::pipeline::<H>(
256            handler,
257            next_checkpoint,
258            config,
259            self.skip_watermark,
260            self.store.clone(),
261            self.ingestion_service.subscribe().0,
262            self.metrics.clone(),
263            self.cancel.clone(),
264        ));
265
266        Ok(())
267    }
268
269    /// Start ingesting checkpoints. Ingestion either starts from the
270    /// `first_checkpoint_from_watermark` calculated based on the smallest watermark of all active
271    /// pipelines or `first_checkpoint` if configured. Individual pipelines will start processing
272    /// and committing once the ingestion service has caught up to their respective watermarks.
273    ///
274    /// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided, or
275    /// will continue until it tracks the tip of the network.
276    pub async fn run(mut self) -> Result<JoinHandle<()>> {
277        if let Some(enabled_pipelines) = self.enabled_pipelines {
278            ensure!(
279                enabled_pipelines.is_empty(),
280                "Tried to enable pipelines that this indexer does not know about: \
281                {enabled_pipelines:#?}",
282            );
283        }
284
285        // If an override has been provided, start ingestion from there, otherwise start ingestion
286        // from just after the lowest committer watermark across all enabled pipelines.
287        let first_checkpoint = self
288            .first_checkpoint
289            .unwrap_or(self.first_checkpoint_from_watermark);
290
291        let last_checkpoint = self.last_checkpoint.unwrap_or(u64::MAX);
292
293        info!(first_checkpoint, last_checkpoint = ?self.last_checkpoint, "Ingestion range");
294
295        let broadcaster_handle = self
296            .ingestion_service
297            .run(
298                first_checkpoint..=last_checkpoint,
299                self.next_sequential_checkpoint,
300            )
301            .await
302            .context("Failed to start ingestion service")?;
303
304        self.handles.push(broadcaster_handle);
305
306        Ok(tokio::spawn(async move {
307            // Wait for the ingestion service and all its related tasks to wind down gracefully:
308            // If ingestion has been configured to only handle a specific range of checkpoints, we
309            // want to make sure that tasks are allowed to run to completion before shutting them
310            // down.
311            future::join_all(self.handles).await;
312            info!("Indexing pipeline gracefully shut down");
313        }))
314    }
315
316    /// Update the indexer's starting ingestion checkpoint based on the watermark for the pipeline
317    /// by adding for handler `H` (as long as it's enabled). Returns `Ok(None)` if the pipeline is
318    /// disabled, `Ok(Some(None))` if the pipeline is enabled but its watermark is not found, and
319    /// `Ok(Some(Some(watermark)))` if the pipeline is enabled and the watermark is found.
320    async fn add_pipeline<P: Processor + 'static>(
321        &mut self,
322    ) -> Result<Option<Option<CommitterWatermark>>> {
323        ensure!(
324            self.added_pipelines.insert(P::NAME),
325            "Pipeline {:?} already added",
326            P::NAME,
327        );
328
329        if let Some(enabled_pipelines) = &mut self.enabled_pipelines
330            && !enabled_pipelines.remove(P::NAME)
331        {
332            info!(pipeline = P::NAME, "Skipping");
333            return Ok(None);
334        }
335
336        let mut conn = self
337            .store
338            .connect()
339            .await
340            .context("Failed to establish connection to store")?;
341
342        let watermark = conn
343            .committer_watermark(P::NAME)
344            .await
345            .with_context(|| format!("Failed to get watermark for {}", P::NAME))?;
346
347        let expected_first_checkpoint = watermark
348            .as_ref()
349            .map(|w| w.checkpoint_hi_inclusive + 1)
350            .unwrap_or_default();
351
352        self.first_checkpoint_from_watermark =
353            expected_first_checkpoint.min(self.first_checkpoint_from_watermark);
354
355        Ok(Some(watermark))
356    }
357}
358
359impl<T: TransactionalStore> Indexer<T> {
360    /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started,
361    /// they will be idle until the ingestion service starts, and serves it checkpoint data.
362    ///
363    /// Sequential pipelines commit checkpoint data in-order which sacrifices throughput, but may
364    /// be required to handle pipelines that modify data in-place (where each update is not an
365    /// insert, but could be a modification of an existing row, where ordering between updates is
366    /// important).
367    ///
368    /// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
369    /// number of checkpoints (configured by `checkpoint_lag`).
370    pub async fn sequential_pipeline<H>(
371        &mut self,
372        handler: H,
373        config: SequentialConfig,
374    ) -> Result<()>
375    where
376        H: Handler<Store = T> + Send + Sync + 'static,
377    {
378        let Some(watermark) = self.add_pipeline::<H>().await? else {
379            return Ok(());
380        };
381
382        if self.skip_watermark {
383            warn!(
384                pipeline = H::NAME,
385                "--skip-watermarks enabled and ignored for sequential pipeline"
386            );
387        }
388
389        // If `first_checkpoint` does not violate the consistency check, sequential pipelines will
390        // prefer to resume from the existing watermark unless no watermark exists.
391        let next_checkpoint = match (watermark, self.first_checkpoint) {
392            (Some(watermark), Some(first_checkpoint)) => {
393                // Sequential pipelines must write data in the order of checkpoints. If there is a
394                // gap, this violates the property.
395                ensure!(
396                    first_checkpoint <= watermark.checkpoint_hi_inclusive + 1,
397                    "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. \
398                     This could create gaps in the data.",
399                    H::NAME,
400                    first_checkpoint,
401                    watermark.checkpoint_hi_inclusive,
402                );
403                // Otherwise, sequential pipelines will wait until the processed checkpoint next
404                // after its current watermark.
405                warn!(
406                    pipeline = H::NAME,
407                    first_checkpoint,
408                    committer_hi = watermark.checkpoint_hi_inclusive,
409                    "Ignoring --first-checkpoint and will resume from committer_hi",
410                );
411                watermark.checkpoint_hi_inclusive + 1
412            }
413            // If a watermark exists, the pipeline will wait for the processed checkpoint next after
414            // its watermark.
415            (Some(watermark), _) => watermark.checkpoint_hi_inclusive + 1,
416            // If no watermark exists, the first checkpoint can be anything.
417            (_, Some(first_checkpoint)) => first_checkpoint,
418            (None, None) => 0,
419        };
420
421        // Track the minimum next_checkpoint across all sequential pipelines
422        self.next_sequential_checkpoint = Some(
423            self.next_sequential_checkpoint
424                .map_or(next_checkpoint, |n| n.min(next_checkpoint)),
425        );
426
427        let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();
428
429        self.handles.push(sequential::pipeline::<H>(
430            handler,
431            next_checkpoint,
432            config,
433            self.store.clone(),
434            checkpoint_rx,
435            watermark_tx,
436            self.metrics.clone(),
437            self.cancel.clone(),
438        ));
439
440        Ok(())
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447    use crate::FieldCount;
448    use crate::mocks::store::MockStore;
449    use crate::pipeline::{Processor, concurrent::ConcurrentConfig};
450    use crate::store::CommitterWatermark;
451    use async_trait::async_trait;
452    use std::sync::Arc;
453    use sui_synthetic_ingestion::synthetic_ingestion;
454    use tokio_util::sync::CancellationToken;
455
456    #[async_trait]
457    impl Processor for MockHandler {
458        const NAME: &'static str = "test_processor";
459        type Value = MockValue;
460        async fn process(
461            &self,
462            _checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
463        ) -> anyhow::Result<Vec<Self::Value>> {
464            Ok(vec![MockValue(1)])
465        }
466    }
467
468    #[allow(dead_code)]
469    #[derive(Clone, FieldCount)]
470    struct MockValue(u64);
471
472    struct MockHandler;
473
474    #[async_trait]
475    impl crate::pipeline::concurrent::Handler for MockHandler {
476        type Store = MockStore;
477        type Batch = Vec<MockValue>;
478
479        fn batch(
480            &self,
481            batch: &mut Self::Batch,
482            values: &mut std::vec::IntoIter<Self::Value>,
483        ) -> crate::pipeline::concurrent::BatchStatus {
484            batch.extend(values);
485            crate::pipeline::concurrent::BatchStatus::Pending
486        }
487
488        async fn commit<'a>(
489            &self,
490            _batch: &Self::Batch,
491            _conn: &mut <Self::Store as Store>::Connection<'a>,
492        ) -> anyhow::Result<usize> {
493            Ok(1)
494        }
495    }
496
497    #[async_trait]
498    impl crate::pipeline::sequential::Handler for MockHandler {
499        type Store = MockStore;
500        type Batch = Vec<Self::Value>;
501
502        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
503            batch.extend(values);
504        }
505
506        async fn commit<'a>(
507            &self,
508            _batch: &Self::Batch,
509            _conn: &mut <Self::Store as Store>::Connection<'a>,
510        ) -> anyhow::Result<usize> {
511            Ok(1)
512        }
513    }
514
515    // One more test handler for testing multiple sequential pipelines
516    struct SequentialHandler;
517
518    #[async_trait]
519    impl Processor for SequentialHandler {
520        const NAME: &'static str = "sequential_handler";
521        type Value = MockValue;
522        async fn process(
523            &self,
524            _checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
525        ) -> anyhow::Result<Vec<Self::Value>> {
526            Ok(vec![MockValue(1)])
527        }
528    }
529
530    #[async_trait]
531    impl crate::pipeline::sequential::Handler for SequentialHandler {
532        type Store = MockStore;
533        type Batch = Vec<MockValue>;
534
535        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
536            batch.extend(values);
537        }
538
539        async fn commit<'a>(
540            &self,
541            _batch: &Self::Batch,
542            _conn: &mut <Self::Store as Store>::Connection<'a>,
543        ) -> anyhow::Result<usize> {
544            Ok(1)
545        }
546    }
547
548    #[tokio::test]
549    async fn test_first_checkpoint_from_watermark() {
550        let cancel = CancellationToken::new();
551        let registry = Registry::new();
552
553        let store = MockStore::default();
554        let mut conn = store.connect().await.unwrap();
555        conn.set_committer_watermark(
556            "test_processor",
557            CommitterWatermark {
558                epoch_hi_inclusive: 1,
559                checkpoint_hi_inclusive: 100,
560                tx_hi: 1000,
561                timestamp_ms_hi_inclusive: 1000000,
562            },
563        )
564        .await
565        .unwrap();
566
567        let indexer_args = IndexerArgs {
568            first_checkpoint: Some(50),
569            last_checkpoint: None,
570            pipeline: vec![],
571            skip_watermark: false,
572        };
573        let temp_dir = tempfile::tempdir().unwrap();
574        let client_args = ClientArgs {
575            local_ingestion_path: Some(temp_dir.path().to_owned()),
576            ..Default::default()
577        };
578
579        let ingestion_config = IngestionConfig::default();
580
581        let mut indexer = Indexer::new(
582            store,
583            indexer_args,
584            client_args,
585            ingestion_config,
586            None,
587            &registry,
588            cancel,
589        )
590        .await
591        .unwrap();
592
593        indexer
594            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
595            .await
596            .unwrap();
597
598        assert_eq!(indexer.first_checkpoint_from_watermark, 101);
599    }
600
601    #[tokio::test]
602    async fn test_indexer_concurrent_pipeline_disallow_inconsistent_first_checkpoint() {
603        let cancel = CancellationToken::new();
604        let registry = Registry::new();
605
606        let store = MockStore::default();
607        let mut conn = store.connect().await.unwrap();
608        conn.set_committer_watermark(
609            "test_processor",
610            CommitterWatermark {
611                epoch_hi_inclusive: 1,
612                checkpoint_hi_inclusive: 100,
613                tx_hi: 1000,
614                timestamp_ms_hi_inclusive: 1000000,
615            },
616        )
617        .await
618        .unwrap();
619
620        let indexer_args = IndexerArgs {
621            first_checkpoint: Some(1001),
622            last_checkpoint: None,
623            pipeline: vec![],
624            skip_watermark: false,
625        };
626        let temp_dir = tempfile::tempdir().unwrap();
627        let client_args = ClientArgs {
628            local_ingestion_path: Some(temp_dir.path().to_owned()),
629            ..Default::default()
630        };
631
632        let ingestion_config = IngestionConfig::default();
633
634        let mut indexer = Indexer::new(
635            store,
636            indexer_args,
637            client_args,
638            ingestion_config,
639            None,
640            &registry,
641            cancel,
642        )
643        .await
644        .unwrap();
645
646        let result = indexer
647            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
648            .await;
649
650        assert!(result.is_err());
651    }
652
653    #[tokio::test]
654    async fn test_indexer_concurrent_pipeline_allow_inconsistent_first_checkpoint_with_skip_watermark()
655     {
656        let cancel = CancellationToken::new();
657        let registry = Registry::new();
658
659        let store = MockStore::default();
660        let mut conn = store.connect().await.unwrap();
661        conn.set_committer_watermark(
662            "test_processor",
663            CommitterWatermark {
664                epoch_hi_inclusive: 1,
665                checkpoint_hi_inclusive: 100,
666                tx_hi: 1000,
667                timestamp_ms_hi_inclusive: 1000000,
668            },
669        )
670        .await
671        .unwrap();
672
673        let indexer_args = IndexerArgs {
674            first_checkpoint: Some(1001),
675            last_checkpoint: None,
676            pipeline: vec![],
677            skip_watermark: true,
678        };
679        let temp_dir = tempfile::tempdir().unwrap();
680        let client_args = ClientArgs {
681            local_ingestion_path: Some(temp_dir.path().to_owned()),
682            ..Default::default()
683        };
684
685        let ingestion_config = IngestionConfig::default();
686
687        let mut indexer = Indexer::new(
688            store,
689            indexer_args,
690            client_args,
691            ingestion_config,
692            None,
693            &registry,
694            cancel,
695        )
696        .await
697        .unwrap();
698
699        let result = indexer
700            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
701            .await;
702
703        assert!(result.is_ok());
704    }
705
706    #[tokio::test]
707    async fn test_indexer_sequential_pipeline_disallow_inconsistent_first_checkpoint() {
708        let cancel = CancellationToken::new();
709        let registry = Registry::new();
710
711        let store = MockStore::default();
712        let mut conn = store.connect().await.unwrap();
713        conn.set_committer_watermark(
714            "test_processor",
715            CommitterWatermark {
716                epoch_hi_inclusive: 1,
717                checkpoint_hi_inclusive: 100,
718                tx_hi: 1000,
719                timestamp_ms_hi_inclusive: 1000000,
720            },
721        )
722        .await
723        .unwrap();
724
725        let indexer_args = IndexerArgs {
726            first_checkpoint: Some(1001),
727            last_checkpoint: None,
728            pipeline: vec![],
729            skip_watermark: false,
730        };
731        let temp_dir = tempfile::tempdir().unwrap();
732        let client_args = ClientArgs {
733            local_ingestion_path: Some(temp_dir.path().to_owned()),
734            ..Default::default()
735        };
736
737        let ingestion_config = IngestionConfig::default();
738
739        let mut indexer = Indexer::new(
740            store,
741            indexer_args,
742            client_args,
743            ingestion_config,
744            None,
745            &registry,
746            cancel,
747        )
748        .await
749        .unwrap();
750
751        let result = indexer
752            .sequential_pipeline::<MockHandler>(MockHandler, SequentialConfig::default())
753            .await;
754
755        assert!(result.is_err());
756    }
757
758    #[tokio::test]
759    async fn test_indexer_sequential_pipeline_disallow_inconsistent_first_checkpoint_with_skip_watermark()
760     {
761        let cancel = CancellationToken::new();
762        let registry = Registry::new();
763
764        let store = MockStore::default();
765        let mut conn = store.connect().await.unwrap();
766        conn.set_committer_watermark(
767            "test_processor",
768            CommitterWatermark {
769                epoch_hi_inclusive: 1,
770                checkpoint_hi_inclusive: 100,
771                tx_hi: 1000,
772                timestamp_ms_hi_inclusive: 1000000,
773            },
774        )
775        .await
776        .unwrap();
777
778        let indexer_args = IndexerArgs {
779            first_checkpoint: Some(1001),
780            last_checkpoint: None,
781            pipeline: vec![],
782            skip_watermark: true,
783        };
784        let temp_dir = tempfile::tempdir().unwrap();
785        let client_args = ClientArgs {
786            local_ingestion_path: Some(temp_dir.path().to_owned()),
787            ..Default::default()
788        };
789
790        let ingestion_config = IngestionConfig::default();
791
792        let mut indexer = Indexer::new(
793            store,
794            indexer_args,
795            client_args,
796            ingestion_config,
797            None,
798            &registry,
799            cancel,
800        )
801        .await
802        .unwrap();
803
804        let result = indexer
805            .sequential_pipeline::<MockHandler>(MockHandler, SequentialConfig::default())
806            .await;
807
808        assert!(result.is_err());
809    }
810
811    #[tokio::test]
812    async fn test_indexer_sequential_pipeline_always_resume_from_watermark() {
813        let cancel = CancellationToken::new();
814        let registry = Registry::new();
815        let store = MockStore::default();
816        let pipeline_checkpoint_hi = 10;
817        let indexer_first_checkpoint = 5;
818        let num_ingested_checkpoints = 10;
819
820        let mut conn = store.connect().await.unwrap();
821        conn.set_committer_watermark(
822            "test_processor",
823            CommitterWatermark {
824                epoch_hi_inclusive: 1,
825                checkpoint_hi_inclusive: pipeline_checkpoint_hi,
826                tx_hi: 1000,
827                timestamp_ms_hi_inclusive: 1000000,
828            },
829        )
830        .await
831        .unwrap();
832
833        let indexer_args = IndexerArgs {
834            first_checkpoint: Some(indexer_first_checkpoint),
835            last_checkpoint: Some(indexer_first_checkpoint + num_ingested_checkpoints - 1),
836            pipeline: vec![],
837            skip_watermark: true,
838        };
839        let temp_dir = tempfile::tempdir().unwrap();
840        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
841            ingestion_dir: temp_dir.path().to_owned(),
842            starting_checkpoint: indexer_first_checkpoint,
843            num_checkpoints: num_ingested_checkpoints,
844            checkpoint_size: 2,
845        })
846        .await;
847
848        let client_args = ClientArgs {
849            local_ingestion_path: Some(temp_dir.path().to_owned()),
850            ..Default::default()
851        };
852
853        let ingestion_config = IngestionConfig::default();
854
855        let mut indexer = Indexer::new(
856            store,
857            indexer_args,
858            client_args,
859            ingestion_config,
860            None,
861            &registry,
862            cancel,
863        )
864        .await
865        .unwrap();
866
867        let _ = indexer
868            .sequential_pipeline::<MockHandler>(MockHandler, SequentialConfig::default())
869            .await;
870
871        let metrics = indexer.metrics().clone();
872
873        indexer.run().await.unwrap().await.unwrap();
874
875        assert_eq!(
876            metrics.total_ingested_checkpoints.get(),
877            num_ingested_checkpoints
878        );
879        assert_eq!(
880            metrics
881                .total_watermarks_out_of_order
882                .get_metric_with_label_values(&["test_processor"])
883                .unwrap()
884                .get(),
885            pipeline_checkpoint_hi - indexer_first_checkpoint + 1
886        );
887    }
888
889    #[tokio::test]
890    async fn test_indexer_concurrent_pipeline_always_resume_from_first_checkpoint() {
891        let cancel = CancellationToken::new();
892        let registry = Registry::new();
893        let store = MockStore::default();
894        let pipeline_checkpoint_hi = 10;
895        let indexer_first_checkpoint = 5;
896        let num_ingested_checkpoints = 10;
897
898        let mut conn = store.connect().await.unwrap();
899        conn.set_committer_watermark(
900            "test_processor",
901            CommitterWatermark {
902                epoch_hi_inclusive: 1,
903                checkpoint_hi_inclusive: pipeline_checkpoint_hi,
904                tx_hi: 1000,
905                timestamp_ms_hi_inclusive: 1000000,
906            },
907        )
908        .await
909        .unwrap();
910
911        let indexer_args = IndexerArgs {
912            first_checkpoint: Some(indexer_first_checkpoint),
913            last_checkpoint: Some(indexer_first_checkpoint + num_ingested_checkpoints - 1),
914            pipeline: vec![],
915            skip_watermark: true,
916        };
917        let temp_dir = tempfile::tempdir().unwrap();
918        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
919            ingestion_dir: temp_dir.path().to_owned(),
920            starting_checkpoint: indexer_first_checkpoint,
921            num_checkpoints: num_ingested_checkpoints,
922            checkpoint_size: 2,
923        })
924        .await;
925
926        let client_args = ClientArgs {
927            local_ingestion_path: Some(temp_dir.path().to_owned()),
928            ..Default::default()
929        };
930
931        let ingestion_config = IngestionConfig::default();
932
933        let mut indexer = Indexer::new(
934            store,
935            indexer_args,
936            client_args,
937            ingestion_config,
938            None,
939            &registry,
940            cancel,
941        )
942        .await
943        .unwrap();
944
945        let _ = indexer
946            .concurrent_pipeline::<MockHandler>(MockHandler, ConcurrentConfig::default())
947            .await;
948
949        let metrics = indexer.metrics().clone();
950
951        indexer.run().await.unwrap().await.unwrap();
952
953        assert_eq!(
954            metrics.total_ingested_checkpoints.get(),
955            num_ingested_checkpoints
956        );
957        assert_eq!(
958            metrics
959                .total_watermarks_out_of_order
960                .get_metric_with_label_values(&["test_processor"])
961                .unwrap()
962                .get(),
963            0
964        );
965    }
966
967    #[tokio::test]
968    async fn test_multiple_sequential_pipelines_next_checkpoint() {
969        let cancel = CancellationToken::new();
970        let registry = Registry::new();
971        let store = MockStore::default();
972
973        // Set up different watermarks for three different sequential pipelines
974        let mut conn = store.connect().await.unwrap();
975
976        // First handler at checkpoint 10
977        conn.set_committer_watermark(
978            MockHandler::NAME,
979            CommitterWatermark {
980                epoch_hi_inclusive: 0,
981                checkpoint_hi_inclusive: 10,
982                tx_hi: 20,
983                timestamp_ms_hi_inclusive: 10000,
984            },
985        )
986        .await
987        .unwrap();
988
989        // SequentialHandler at checkpoint 5
990        conn.set_committer_watermark(
991            SequentialHandler::NAME,
992            CommitterWatermark {
993                epoch_hi_inclusive: 0,
994                checkpoint_hi_inclusive: 5,
995                tx_hi: 10,
996                timestamp_ms_hi_inclusive: 5000,
997            },
998        )
999        .await
1000        .unwrap();
1001
1002        // Create synthetic ingestion data
1003        let temp_dir = tempfile::tempdir().unwrap();
1004        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
1005            ingestion_dir: temp_dir.path().to_owned(),
1006            starting_checkpoint: 0,
1007            num_checkpoints: 20,
1008            checkpoint_size: 2,
1009        })
1010        .await;
1011
1012        let indexer_args = IndexerArgs {
1013            first_checkpoint: None,
1014            last_checkpoint: Some(19),
1015            pipeline: vec![],
1016            skip_watermark: false,
1017        };
1018
1019        let client_args = ClientArgs {
1020            local_ingestion_path: Some(temp_dir.path().to_owned()),
1021            ..Default::default()
1022        };
1023
1024        let ingestion_config = IngestionConfig::default();
1025
1026        let mut indexer = Indexer::new(
1027            store.clone(),
1028            indexer_args,
1029            client_args,
1030            ingestion_config,
1031            None,
1032            &registry,
1033            cancel.clone(),
1034        )
1035        .await
1036        .unwrap();
1037
1038        // Add first sequential pipeline
1039        indexer
1040            .sequential_pipeline(
1041                MockHandler,
1042                pipeline::sequential::SequentialConfig::default(),
1043            )
1044            .await
1045            .unwrap();
1046
1047        // Verify next_sequential_checkpoint is set correctly (10 + 1 = 11)
1048        assert_eq!(
1049            indexer.next_sequential_checkpoint(),
1050            Some(11),
1051            "next_sequential_checkpoint should be 11"
1052        );
1053
1054        // Add second sequential pipeline
1055        indexer
1056            .sequential_pipeline(
1057                SequentialHandler,
1058                pipeline::sequential::SequentialConfig::default(),
1059            )
1060            .await
1061            .unwrap();
1062
1063        // Should change to 6 (minimum of 6 and 11)
1064        assert_eq!(
1065            indexer.next_sequential_checkpoint(),
1066            Some(6),
1067            "next_sequential_checkpoint should still be 6"
1068        );
1069
1070        // Run indexer to verify it can make progress past the initial hi and finish ingesting.
1071        indexer.run().await.unwrap().await.unwrap();
1072
1073        // Verify each pipeline made some progress independently
1074        let watermark1 = conn.committer_watermark(MockHandler::NAME).await.unwrap();
1075        let watermark2 = conn
1076            .committer_watermark(SequentialHandler::NAME)
1077            .await
1078            .unwrap();
1079
1080        assert_eq!(watermark1.unwrap().checkpoint_hi_inclusive, 19);
1081        assert_eq!(watermark2.unwrap().checkpoint_hi_inclusive, 19);
1082    }
1083}