sui_indexer_alt_framework/pipeline/sequential/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use serde::Deserialize;
8use serde::Serialize;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::info;
12
13use crate::config::ConcurrencyConfig;
14use crate::ingestion::ingestion_client::CheckpointEnvelope;
15use crate::metrics::IndexerMetrics;
16use crate::pipeline::CommitterConfig;
17use crate::pipeline::IngestionConfig;
18use crate::pipeline::Processor;
19use crate::pipeline::processor::processor;
20use crate::pipeline::sequential::collector::BatchedRows;
21use crate::pipeline::sequential::collector::collector;
22use crate::pipeline::sequential::committer::committer;
23use crate::store::SequentialStore;
24use crate::store::Store;
25
26mod collector;
27mod committer;
28
/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, how to combine multiple rows into a single
/// DB operation, and then how to write those rows atomically to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values).
///
/// Sequential handlers can only be used in sequential pipelines, where checkpoint data is
/// processed out-of-order, but then gathered and written in order. If multiple checkpoints are
/// available, the pipeline will attempt to combine their writes taking advantage of batching to
/// avoid emitting redundant writes.
///
/// Back-pressure is handled by the bounded subscriber channel from the ingestion service, the
/// same as concurrent pipelines: the channel blocks broadcaster sends when full, and the adaptive
/// ingestion controller cuts fetch concurrency as the channel fills up.
#[async_trait]
pub trait Handler: Processor {
    /// The store this pipeline writes to. [Handler::commit] is handed one of its connections.
    type Store: SequentialStore;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// Soft cap: once this many rows are pending, the collector stops eagerly draining
    /// its input channel and yields to the flush phase. Receive is never hard-gated — unlike
    /// concurrent pipelines, a missing predecessor may be buried in the input channel, and
    /// blocking receive would risk deadlock. The cap only bounds receive-to-flush latency in
    /// the happy path.
    const MAX_PENDING_ROWS: usize = 5000;

    /// Maximum number of checkpoints to try and write in a single batch. The larger this number
    /// is, the more chances the pipeline has to merge redundant writes, but the longer each write
    /// transaction is likely to be.
    const MAX_BATCH_CHECKPOINTS: usize = 5 * 60;

    /// A type to combine multiple `Self::Value`-s into. This can be used to avoid redundant writes
    /// by combining multiple rows into one (e.g. if one row supersedes another, the latter can be
    /// omitted).
    type Batch: Default + Send + Sync + 'static;

    /// Add `values` from processing a checkpoint to the current `batch`. Checkpoints are
    /// guaranteed to be presented to the batch in checkpoint order. The handler takes ownership
    /// of the iterator and consumes all values.
    ///
    /// This method returns nothing, so the handler has no way to signal that a batch is full:
    /// when a batch is committed is decided by the framework, based on the trait parameters
    /// above (or their overrides in [SequentialConfig]).
    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);

    /// Take a batch of values and commit them to the database, returning the number of rows
    /// affected.
    ///
    /// The batch is passed as a single unit and is not chunked up by the framework, so an
    /// implementation that needs to bound the size of individual writes must split it itself.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;
}
87
/// Configuration for a sequential pipeline.
///
/// Every `Option` field is an override: when it is `None`, the pipeline falls back to a default
/// (the corresponding `Handler` constant, or a value derived from the number of CPUs).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct SequentialConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// Per-pipeline ingestion overrides.
    pub ingestion: IngestionConfig,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_PENDING_ROWS` (soft cap).
    pub max_pending_rows: Option<usize>,

    /// Override for `Handler::MAX_BATCH_CHECKPOINTS` (checkpoints per write batch).
    pub max_batch_checkpoints: Option<usize>,

    /// Size of the channel between the processor and the collector. When unset, a default is
    /// derived from the number of CPUs.
    pub processor_channel_size: Option<usize>,

    /// Depth of the channel between the collector and committer tasks. Allows the collector
    /// to build the next batch while the previous batch is being flushed to the DB. When unset,
    /// a default is derived from the number of CPUs.
    pub pipeline_depth: Option<usize>,
}
116
117/// Start a new sequential (in-order) indexing pipeline, served by the handler, `H`. Starting
118/// strictly after the `watermark` (or from the beginning if no watermark was provided).
119///
120/// Each pipeline consists of a processor which takes checkpoint data and breaks it down into rows,
121/// ready for insertion, and a committer which orders the rows and combines them into batches to
122/// write to the database.
123///
124/// Commits are performed in checkpoint order, potentially involving multiple checkpoints at a
125/// time. The call to [Handler::commit] and the associated watermark update are performed in a
126/// transaction to ensure atomicity. Unlike in the case of concurrent pipelines, the data passed to
127/// [Handler::commit] is not chunked up, so the handler must perform this step itself, if
128/// necessary.
129///
130/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
131/// channels are created to communicate between its various components. The pipeline will shutdown
132/// if any of its input or output channels close, any of its independent tasks fail, or if it is
133/// signalled to shutdown through the returned service handle.
134pub(crate) fn pipeline<H: Handler>(
135    handler: H,
136    next_checkpoint: u64,
137    config: SequentialConfig,
138    store: H::Store,
139    checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
140    metrics: Arc<IndexerMetrics>,
141) -> Service {
142    info!(
143        pipeline = H::NAME,
144        "Starting pipeline with config: {config:#?}",
145    );
146
147    let concurrency = config
148        .fanout
149        .clone()
150        .unwrap_or(ConcurrencyConfig::Adaptive {
151            initial: 1,
152            min: 1,
153            max: num_cpus::get(),
154            dead_band: None,
155        });
156    let min_eager_rows = config.min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
157    let max_pending_rows = config.max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
158    let max_batch_checkpoints = config
159        .max_batch_checkpoints
160        .unwrap_or(H::MAX_BATCH_CHECKPOINTS);
161
162    let processor_channel_size = config.processor_channel_size.unwrap_or(num_cpus::get() / 2);
163    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
164
165    let pipeline_depth = config
166        .pipeline_depth
167        .unwrap_or_else(|| (num_cpus::get() / 2).max(4));
168    let (collector_tx, committer_rx) = mpsc::channel::<BatchedRows<H>>(pipeline_depth);
169
170    let handler = Arc::new(handler);
171
172    let s_processor = processor(
173        handler.clone(),
174        checkpoint_rx,
175        processor_tx,
176        metrics.clone(),
177        concurrency,
178        store.clone(),
179    );
180
181    let s_collector = collector::<H>(
182        handler.clone(),
183        config,
184        next_checkpoint,
185        collector_rx,
186        metrics.clone(),
187        min_eager_rows,
188        max_pending_rows,
189        max_batch_checkpoints,
190        collector_tx,
191    );
192
193    let s_committer = committer::<H>(handler, store, metrics.clone(), committer_rx);
194
195    s_processor.merge(s_collector).merge(s_committer)
196}
197
198#[cfg(test)]
199mod tests {
200    use std::time::Duration;
201
202    use prometheus::Registry;
203    use sui_types::full_checkpoint_content::Checkpoint;
204
205    use crate::mocks::store::FallibleMockConnection;
206    use crate::mocks::store::FallibleMockStore;
207    use crate::pipeline::IndexedCheckpoint;
208
209    use super::*;
210
    /// Test implementation of [Handler]. Its rows are plain `u64`s and its batch is a `Vec<u64>`,
    /// so the tests can compare what was written against simple vectors.
    #[derive(Default)]
    struct TestHandler;
214
215    #[async_trait]
216    impl Processor for TestHandler {
217        const NAME: &'static str = "test";
218        type Value = u64;
219
220        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
221            Ok(vec![])
222        }
223    }
224
225    #[async_trait]
226    impl Handler for TestHandler {
227        type Store = FallibleMockStore;
228        type Batch = Vec<u64>;
229        const MAX_BATCH_CHECKPOINTS: usize = 3; // Using small max value for testing.
230        const MIN_EAGER_ROWS: usize = 4; // Using small eager value for testing.
231
232        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
233            batch.extend(values);
234        }
235
236        async fn commit<'a>(
237            &self,
238            batch: &Self::Batch,
239            conn: &mut FallibleMockConnection<'a>,
240        ) -> anyhow::Result<usize> {
241            if !batch.is_empty() {
242                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
243                sequential_data.extend(batch.iter().cloned());
244            }
245            Ok(batch.len())
246        }
247    }
248
    /// Everything a test needs to drive the collector/committer pair and inspect its output.
    struct TestSetup {
        /// Handle to the store whose clone was given to the committer; tests read the written
        /// rows and the watermark back through it.
        store: FallibleMockStore,
        /// Sender for feeding [IndexedCheckpoint]s straight into the collector.
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        // Never read by the tests, hence the `allow`.
        // NOTE(review): presumably held so the collector and committer tasks stay alive for the
        // duration of the test -- confirm against `Service`'s drop behaviour.
        #[allow(unused)]
        service: Service,
    }
255
256    /// Emulates adding a sequential pipeline to the indexer. Bypasses the processor stage and
257    /// feeds [IndexedCheckpoint]s directly to the collector. `next_checkpoint` is the starting
258    /// checkpoint for the indexer.
259    fn setup_test(
260        next_checkpoint: u64,
261        config: SequentialConfig,
262        store: FallibleMockStore,
263    ) -> TestSetup {
264        let metrics = IndexerMetrics::new(None, &Registry::default());
265
266        let min_eager_rows = config.min_eager_rows.unwrap_or(TestHandler::MIN_EAGER_ROWS);
267        let max_pending_rows = config
268            .max_pending_rows
269            .unwrap_or(TestHandler::MAX_PENDING_ROWS);
270        let max_batch_checkpoints = config
271            .max_batch_checkpoints
272            .unwrap_or(TestHandler::MAX_BATCH_CHECKPOINTS);
273        let pipeline_depth = config
274            .pipeline_depth
275            .unwrap_or_else(|| (num_cpus::get() / 2).max(4));
276
277        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
278        let (collector_tx, committer_rx) =
279            mpsc::channel::<BatchedRows<TestHandler>>(pipeline_depth);
280
281        let store_clone = store.clone();
282        let handler = Arc::new(TestHandler);
283
284        let s_collector = collector(
285            handler.clone(),
286            config,
287            next_checkpoint,
288            checkpoint_rx,
289            metrics.clone(),
290            min_eager_rows,
291            max_pending_rows,
292            max_batch_checkpoints,
293            collector_tx,
294        );
295        let s_committer = committer(handler, store_clone, metrics, committer_rx);
296
297        TestSetup {
298            store,
299            checkpoint_tx,
300            service: s_collector.merge(s_committer),
301        }
302    }
303
304    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
305        setup
306            .checkpoint_tx
307            .send(create_checkpoint(checkpoint))
308            .await
309            .unwrap();
310    }
311
312    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
313        IndexedCheckpoint::new(
314            checkpoint,        // epoch
315            checkpoint,        // checkpoint number
316            checkpoint,        // tx_hi
317            checkpoint * 1000, // timestamp
318            vec![checkpoint],  // values
319        )
320    }
321
322    #[tokio::test]
323    async fn test_committer_processes_sequential_checkpoints() {
324        let config = SequentialConfig::default();
325        let mut setup = setup_test(0, config, FallibleMockStore::default());
326
327        // Send checkpoints in order
328        for i in 0..3 {
329            send_checkpoint(&mut setup, i).await;
330        }
331
332        // Wait for processing
333        tokio::time::sleep(Duration::from_millis(200)).await;
334
335        // Verify data was written in order
336        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
337
338        // Verify watermark was updated
339        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
340        assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
341        assert_eq!(watermark.tx_hi, 2);
342    }
343
344    /// Configure `FallibleMockStore` with no watermark, and emulate `first_checkpoint` by passing
345    /// the `initial_watermark` into the setup.
346    #[tokio::test]
347    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
348        let config = SequentialConfig::default();
349        let mut setup = setup_test(5, config, FallibleMockStore::default());
350
351        // Verify watermark hasn't progressed
352        let watermark = setup.store.watermark(TestHandler::NAME);
353        assert!(watermark.is_none());
354
355        // Send checkpoints in order
356        for i in 0..5 {
357            send_checkpoint(&mut setup, i).await;
358        }
359
360        // Wait for processing
361        tokio::time::sleep(Duration::from_millis(1000)).await;
362
363        // Verify watermark hasn't progressed
364        let watermark = setup.store.watermark(TestHandler::NAME);
365        assert!(watermark.is_none());
366
367        for i in 5..8 {
368            send_checkpoint(&mut setup, i).await;
369        }
370
371        // Wait for processing
372        tokio::time::sleep(Duration::from_millis(1000)).await;
373
374        // Verify data was written in order
375        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);
376
377        // Verify watermark was updated
378        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
379        assert_eq!(watermark.checkpoint_hi_inclusive, Some(7));
380        assert_eq!(watermark.tx_hi, 7);
381    }
382
383    #[tokio::test]
384    async fn test_committer_processes_out_of_order_checkpoints() {
385        let config = SequentialConfig::default();
386        let mut setup = setup_test(0, config, FallibleMockStore::default());
387
388        // Send checkpoints out of order
389        for i in [1, 0, 2] {
390            send_checkpoint(&mut setup, i).await;
391        }
392
393        // Wait for processing
394        tokio::time::sleep(Duration::from_millis(200)).await;
395
396        // Verify data was written in order despite receiving out of order
397        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
398
399        // Verify watermark was updated
400        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
401        assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
402        assert_eq!(watermark.tx_hi, 2);
403    }
404
405    #[tokio::test]
406    async fn test_committer_commit_up_to_max_batch_checkpoints() {
407        let config = SequentialConfig::default();
408        let mut setup = setup_test(0, config, FallibleMockStore::default());
409
410        // Send checkpoints up to MAX_BATCH_CHECKPOINTS
411        for i in 0..4 {
412            send_checkpoint(&mut setup, i).await;
413        }
414
415        // Wait for processing
416        tokio::time::sleep(Duration::from_millis(200)).await;
417
418        // Verify data is written in order across batches
419        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
420    }
421
    /// With a collect interval (4s) far longer than the test's total sleep time, rows written
    /// during the test have to come from the eager path: nothing is written while fewer than
    /// `MIN_EAGER_ROWS` (4 for `TestHandler`) rows are pending, and everything is written once
    /// the fourth row arrives. Each test checkpoint carries exactly one row.
    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            ..Default::default()
        };
        let mut setup = setup_test(0, config, FallibleMockStore::default());

        // Wait for initial poll to be over
        // NOTE(review): presumably the collector polls once right at startup -- confirm in the
        // collector implementation.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Verify no checkpoints are written yet (not enough rows for eager commit)
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 3 to trigger the eager commit (3 + 1 >= MIN_EAGER_ROWS)
        send_checkpoint(&mut setup, 3).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all checkpoints are written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }
453
454    #[tokio::test]
455    async fn test_committer_retries_on_transaction_failure() {
456        let config = SequentialConfig {
457            committer: CommitterConfig {
458                collect_interval_ms: 1_000, // Long polling to test retry logic
459                ..Default::default()
460            },
461            ..Default::default()
462        };
463
464        // Create store with transaction failure configuration
465        let store = FallibleMockStore::default().with_transaction_failures(1); // Will fail once before succeeding
466
467        let mut setup = setup_test(10, config, store);
468
469        // Send a checkpoint
470        send_checkpoint(&mut setup, 10).await;
471
472        // Wait long enough for the collector to poll-tick (collect_interval = 1s),
473        // hand the batch to the committer, and for the committer to complete one failed
474        // attempt + one successful retry under exponential backoff (100ms initial).
475        tokio::time::sleep(Duration::from_millis(1_500)).await;
476
477        assert_eq!(setup.store.get_sequential_data(), vec![10]);
478    }
479
480    /// Smoke test for pipelined operation under a slow-commit store: with
481    /// `pipeline_depth = 1` and two full batches to process, both batches must land and
482    /// the watermark must reach the last checkpoint.
483    #[tokio::test]
484    async fn pipelined_commit_runs_under_slow_commit() {
485        let config = SequentialConfig {
486            committer: CommitterConfig::default(),
487            max_batch_checkpoints: Some(3),
488            min_eager_rows: Some(1),
489            pipeline_depth: Some(1),
490            ..Default::default()
491        };
492
493        let store = FallibleMockStore::default().with_commit_delay(700);
494        let mut setup = setup_test(0, config, store);
495
496        for i in 0..6 {
497            send_checkpoint(&mut setup, i).await;
498        }
499
500        // Two batches × 700ms commit each + slack.
501        tokio::time::sleep(Duration::from_millis(2_000)).await;
502
503        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3, 4, 5]);
504        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
505        assert_eq!(watermark.checkpoint_hi_inclusive, Some(5));
506    }
507
508    /// Watermarks must advance strictly in batch order, even under pipelining.
509    #[tokio::test]
510    async fn pipelined_commit_preserves_watermark_ordering() {
511        let config = SequentialConfig {
512            committer: CommitterConfig::default(),
513            max_batch_checkpoints: Some(2),
514            min_eager_rows: Some(1),
515            pipeline_depth: Some(2),
516            ..Default::default()
517        };
518
519        let store = FallibleMockStore::default().with_commit_delay(100);
520        let mut setup = setup_test(0, config, store);
521
522        for i in 0..6 {
523            send_checkpoint(&mut setup, i).await;
524        }
525
526        tokio::time::sleep(Duration::from_millis(1_500)).await;
527
528        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3, 4, 5]);
529        let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
530        assert_eq!(watermark.checkpoint_hi_inclusive, Some(5));
531        assert_eq!(watermark.tx_hi, 5);
532    }
533}