sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use sui_futures::service::Service;
9use tokio::sync::{SetOnce, mpsc};
10use tracing::info;
11
12use crate::{
13    Task, metrics::IndexerMetrics, store::Store, types::full_checkpoint_content::Checkpoint,
14};
15
16use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};
17
18use self::{
19    collector::collector, commit_watermark::commit_watermark, committer::committer,
20    main_reader_lo::track_main_reader_lo, pruner::pruner, reader_watermark::reader_watermark,
21};
22
23mod collector;
24mod commit_watermark;
25mod committer;
26mod main_reader_lo;
27mod pruner;
28mod reader_watermark;
29
/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
///
/// The framework may still decide to commit a `Pending` batch on its own (see the note on
/// `Handler::batch`), so `Pending` only means the handler itself is not requesting a commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}
38
/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait]
pub trait Handler: Processor {
    /// The store whose connections are handed to [`Handler::commit`] and [`Handler::prune`].
    type Store: Store;

    /// Accumulator that [`Handler::batch`] fills with values and [`Handler::commit`] writes out.
    /// A fresh batch is obtainable through `Default`.
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
    /// the number of rows affected. This function is optional, and defaults to not pruning at all.
    ///
    /// The default implementation touches nothing and reports zero affected rows.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
106
/// Configuration for a concurrent pipeline.
///
/// The derived `Default` uses the default committer configuration and leaves `pruner` unset.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, that deletes old data. Optional: the tests in this file pass
    /// `None` to run a pipeline without pruning.
    pub pruner: Option<PrunerConfig>,
}
116
/// Configuration for the pruner of a concurrent pipeline.
///
/// Time-based settings are stored as millisecond counts; `interval()` and `delay()` expose them
/// as [`Duration`]s.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, this is measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try and prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}
135
/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch. Tracked separately because `H::Batch` is an opaque
    /// handler-defined type with no length accessor required by the trait.
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this batch
    watermark: Vec<WatermarkPart>,
}
149
150impl<H, V> BatchedRows<H>
151where
152    H: Handler<Batch = Vec<V>, Value = V>,
153{
154    #[cfg(test)]
155    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
156        let batch_len = batch.len();
157        Self {
158            batch,
159            batch_len,
160            watermark,
161        }
162    }
163}
164
165impl PrunerConfig {
166    pub fn interval(&self) -> Duration {
167        Duration::from_millis(self.interval_ms)
168    }
169
170    pub fn delay(&self) -> Duration {
171        Duration::from_millis(self.delay_ms)
172    }
173}
174
175impl Default for PrunerConfig {
176    fn default() -> Self {
177        Self {
178            interval_ms: 300_000,
179            delay_ms: 120_000,
180            retention: 4_000_000,
181            max_chunk_size: 2_000,
182            prune_concurrency: 1,
183        }
184    }
185}
186
/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`.
///
/// `next_checkpoint` is forwarded to the commit watermark task.
/// NOTE(review): presumably this is the first checkpoint the pipeline expects to receive (one past
/// any previously committed watermark, or the beginning if there was none) -- confirm against
/// `commit_watermark`.
///
/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
/// the database, a committer which writes the rows out concurrently, and a watermark task to
/// update the high watermark.
///
/// Three further tasks are attached to the returned service: `track_main_reader_lo`,
/// `reader_watermark` and `pruner`. The latter two receive the (optional) pruner configuration.
///
/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
/// either because it received the checkpoints out-of-order or because of variance in processing
/// time.
///
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
/// watermark below which all data has been committed (modulo pruning).
///
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
/// channels are created to communicate between its various components. The pipeline will shutdown
/// if any of its input or output channels close, any of its independent tasks fail, or if it is
/// signalled to shutdown through the returned service handle.
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    store: H::Store,
    task: Option<Task>,
    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {config:#?}",
    );

    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    // Each internal channel is sized as the relevant concurrency setting plus `PIPELINE_BUFFER`:
    // the processor's fanout for processor -> collector, and the committer's write concurrency
    // for collector -> committer and committer -> watermark.
    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    //docs::/#buff
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    // Shared between the collector and the `track_main_reader_lo` task.
    let main_reader_lo = Arc::new(SetOnce::new());

    // The handler is shared by the processor, collector, committer and pruner.
    let handler = Arc::new(handler);

    let s_processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
    );

    let s_collector = collector::<H>(
        handler.clone(),
        committer_config.clone(),
        collector_rx,
        collector_tx,
        main_reader_lo.clone(),
        metrics.clone(),
    );

    let s_committer = committer::<H>(
        handler.clone(),
        committer_config.clone(),
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
    );

    let s_commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        watermark_rx,
        store.clone(),
        task.as_ref().map(|t| t.task.clone()),
        metrics.clone(),
    );

    let s_track_reader_lo = track_main_reader_lo::<H>(
        main_reader_lo.clone(),
        task.as_ref().map(|t| t.reader_interval),
        store.clone(),
    );

    let s_reader_watermark =
        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());

    // Last users of `handler`, `pruner_config`, `store` and `metrics`, so they are moved here
    // rather than cloned.
    let s_pruner = pruner(handler, pruner_config, store, metrics);

    // The four data-path services are merged, while the three auxiliary services are attached.
    // NOTE(review): the precise difference between `merge` and `attach` is defined by
    // `sui_futures::service::Service` -- confirm there before changing this composition.
    s_processor
        .merge(s_collector)
        .merge(s_committer)
        .merge(s_commit_watermark)
        .attach(s_track_reader_lo)
        .attach(s_reader_watermark)
        .attach(s_pruner)
}
289
290#[cfg(test)]
291mod tests {
292    use std::{sync::Arc, time::Duration};
293
294    use prometheus::Registry;
295    use tokio::{sync::mpsc, time::timeout};
296
297    use crate::{
298        FieldCount,
299        metrics::IndexerMetrics,
300        mocks::store::{MockConnection, MockStore},
301        pipeline::Processor,
302        types::{
303            full_checkpoint_content::Checkpoint,
304            test_checkpoint_data_builder::TestCheckpointBuilder,
305        },
306    };
307
308    use super::*;
309
    /// Generous upper bound used by tests when waiting on the store or on a delayed send.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    /// Capacity of the checkpoint channel that feeds the pipeline under test. The capacity
    /// arithmetic in the back-pressure tests assumes this exact value.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations
312
    /// Row type produced by the test pipeline.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        /// Sequence number of the checkpoint this value was derived from.
        checkpoint: u64,
        /// Payload; `DataPipeline::process` sets it to `checkpoint * 10 + 1` or `+ 2`.
        data: u64,
    }
318
    /// Stateless handler used by the tests in this module; implements `Processor` and `Handler`.
    struct DataPipeline;
320
321    #[async_trait]
322    impl Processor for DataPipeline {
323        const NAME: &'static str = "test_handler";
324        const FANOUT: usize = 2;
325        type Value = TestValue;
326
327        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
328            let cp_num = checkpoint.summary.sequence_number;
329
330            // Every checkpoint will come with 2 processed values
331            Ok(vec![
332                TestValue {
333                    checkpoint: cp_num,
334                    data: cp_num * 10 + 1,
335                },
336                TestValue {
337                    checkpoint: cp_num,
338                    data: cp_num * 10 + 2,
339                },
340            ])
341        }
342    }
343
344    #[async_trait]
345    impl Handler for DataPipeline {
346        type Store = MockStore;
347        type Batch = Vec<TestValue>;
348
349        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
350        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
351        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.
352
353        fn batch(
354            &self,
355            batch: &mut Self::Batch,
356            values: &mut std::vec::IntoIter<Self::Value>,
357        ) -> BatchStatus {
358            // Take all values
359            batch.extend(values);
360            BatchStatus::Pending
361        }
362
363        async fn commit<'a>(
364            &self,
365            batch: &Self::Batch,
366            conn: &mut MockConnection<'a>,
367        ) -> anyhow::Result<usize> {
368            // Group values by checkpoint
369            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
370                std::collections::HashMap::new();
371            for value in batch {
372                grouped
373                    .entry(value.checkpoint)
374                    .or_default()
375                    .push(value.data);
376            }
377
378            // Commit all data at once
379            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
380        }
381
382        async fn prune<'a>(
383            &self,
384            from: u64,
385            to_exclusive: u64,
386            conn: &mut MockConnection<'a>,
387        ) -> anyhow::Result<usize> {
388            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
389        }
390    }
391
    /// Bundles everything a test needs to drive a running `DataPipeline` pipeline.
    struct TestSetup {
        /// Handle to the mock store the pipeline writes to; tests inspect it for results.
        store: MockStore,
        /// Sending half of the channel that feeds checkpoints into the pipeline.
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        // Never read by the tests, hence the lint suppression.
        // NOTE(review): presumably held so the pipeline's service is not dropped while the test
        // runs -- confirm against `Service`'s drop behaviour.
        #[allow(unused)]
        pipeline: Service,
    }
398
399    impl TestSetup {
400        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
401            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
402            let metrics = IndexerMetrics::new(None, &Registry::default());
403
404            let pipeline = pipeline(
405                DataPipeline,
406                next_checkpoint,
407                config,
408                store.clone(),
409                None,
410                checkpoint_rx,
411                metrics,
412            );
413
414            Self {
415                store,
416                checkpoint_tx,
417                pipeline,
418            }
419        }
420
421        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
422            let checkpoint = Arc::new(
423                TestCheckpointBuilder::new(checkpoint)
424                    .with_epoch(1)
425                    .with_network_total_transactions(checkpoint * 2)
426                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
427                    .build_checkpoint(),
428            );
429            self.checkpoint_tx.send(checkpoint).await?;
430            Ok(())
431        }
432
433        async fn send_checkpoint_with_timeout(
434            &self,
435            checkpoint: u64,
436            timeout_duration: Duration,
437        ) -> anyhow::Result<()> {
438            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
439        }
440
441        async fn send_checkpoint_expect_timeout(
442            &self,
443            checkpoint: u64,
444            timeout_duration: Duration,
445        ) {
446            timeout(timeout_duration, self.send_checkpoint(checkpoint))
447                .await
448                .unwrap_err(); // Panics if send succeeds
449        }
450    }
451
    /// End-to-end run with pruning enabled: six checkpoints are committed and verified while the
    /// pruner's 5s interval has not yet fired, then the test sleeps past that interval and
    /// expects only the three most recent checkpoints (retention = 3) to remain.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        // (scoped in a block so the handle returned by `get` is released before the test ends)
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }
519
520    #[tokio::test]
521    async fn test_e2e_pipeline_without_pruning() {
522        let config = ConcurrentConfig {
523            pruner: None,
524            ..Default::default()
525        };
526        let store = MockStore::default();
527        let setup = TestSetup::new(config, store, 0).await;
528
529        // Send several checkpoints
530        for i in 0..10 {
531            setup
532                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
533                .await
534                .unwrap();
535        }
536
537        // Wait for all data to be processed and committed
538        let watermark = setup
539            .store
540            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
541            .await;
542
543        // Verify ALL data was processed correctly (no pruning)
544        for i in 0..10 {
545            let data = setup
546                .store
547                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
548                .await;
549            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
550        }
551
552        // Verify watermark progression
553        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
554        assert_eq!(watermark.tx_hi, 18); // 9 * 2
555        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000
556
557        // Verify no data was pruned - all 10 checkpoints should still exist
558        let total_checkpoints = {
559            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
560            data.len()
561        };
562        assert_eq!(total_checkpoints, 10);
563    }
564
565    #[tokio::test]
566    async fn test_out_of_order_processing() {
567        let config = ConcurrentConfig::default();
568        let store = MockStore::default();
569        let setup = TestSetup::new(config, store, 0).await;
570
571        // Send checkpoints out of order
572        let checkpoints = vec![2, 0, 4, 1, 3];
573        for cp in checkpoints {
574            setup
575                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
576                .await
577                .unwrap();
578        }
579
580        // Wait for all data to be processed
581        setup
582            .store
583            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
584            .await;
585
586        // Verify all checkpoints were processed correctly despite out-of-order arrival
587        for i in 0..5 {
588            let data = setup
589                .store
590                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
591                .await;
592            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
593        }
594    }
595
    /// The committed watermark must stall at the last contiguous checkpoint while a checkpoint is
    /// missing, and jump forward once the gap is filled.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        // (a fixed sleep rather than a wait on the store: the test asserts that the watermark
        // does NOT move past 1, which cannot be awaited as a positive condition)
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }
630
631    // ==================== BACK-PRESSURE TESTING ====================
632
    /// Fills the pipeline until the collector's pending-row limit stalls the checkpoint channel,
    /// checks that the next send times out, and then checks that the pipeline eventually drains.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        // (the timed-out attempt above never delivered checkpoint 14, so it is sent again here)
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
698
699    #[tokio::test]
700    async fn test_back_pressure_committer_slow_commits() {
701        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
702        //
703        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
704        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
705        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
706        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
707        //                │                 │                 │
708        //              [●●●]           [●●●●●●●]         [●●●●●●]
709        //            buffer: 3         buffer: 7         buffer: 6
710        //
711        // BOTTLENECK: Committer with 10s delay blocks entire pipeline
712
713        let config = ConcurrentConfig {
714            committer: CommitterConfig {
715                write_concurrency: 1, // Single committer for deterministic blocking
716                ..Default::default()
717            },
718            ..Default::default()
719        };
720        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
721        let setup = TestSetup::new(config, store, 0).await;
722
723        // Pipeline capacity analysis with slow commits:
724        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
725        //
726        // Channel and task breakdown:
727        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
728        // - Processor tasks: 2 tasks (FANOUT=2)
729        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
730        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
731        // - Committer task: 1 task (blocked by slow commit)
732        //
733        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints
734
735        // Fill pipeline to theoretical capacity - these should all succeed
736        for i in 0..19 {
737            setup
738                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
739                .await
740                .unwrap();
741        }
742
743        // Find the actual back pressure point
744        // Due to non-determinism in collector's tokio::select!, the collector may consume
745        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
746        // This means back pressure occurs somewhere in range 19-21.
747        let mut back_pressure_checkpoint = None;
748        for checkpoint in 19..22 {
749            if setup
750                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
751                .await
752                .is_err()
753            {
754                back_pressure_checkpoint = Some(checkpoint);
755                break;
756            }
757        }
758        assert!(
759            back_pressure_checkpoint.is_some(),
760            "Back pressure should occur between checkpoints 19-21"
761        );
762
763        // Verify that some data has been processed (pipeline is working)
764        setup
765            .store
766            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
767            .await;
768
769        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
770        setup
771            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
772            .await
773            .unwrap();
774    }
775
776    // ==================== FAILURE TESTING ====================
777
778    #[tokio::test]
779    async fn test_commit_failure_retry() {
780        let config = ConcurrentConfig::default();
781        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
782        let setup = TestSetup::new(config, store, 0).await;
783
784        // Send a checkpoint
785        setup
786            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
787            .await
788            .unwrap();
789
790        // Should eventually succeed despite initial commit failures
791        setup
792            .store
793            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
794            .await;
795
796        // Verify data was eventually committed
797        let data = setup
798            .store
799            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
800            .await;
801        assert_eq!(data, vec![1, 2]);
802    }
803
    /// A failed prune of range `[0, 2)` must leave all data intact, and the following attempt
    /// must remove exactly the checkpoints outside the retention window.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With the pruning interval set to 2s, we can verify data before the first prune attempt
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
866
    /// A store configured via `with_reader_watermark_failures(2)` must still end up with the
    /// expected reader low watermark: with checkpoints 0..=5 committed and a retention of 3, the
    /// test expects `reader_lo` to be 3.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        // (2s leaves room for many 100ms intervals, covering the two configured failures)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
904
905    #[tokio::test]
906    async fn test_database_connection_failure_retry() {
907        let config = ConcurrentConfig::default();
908        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
909        let setup = TestSetup::new(config, store, 0).await;
910
911        // Send a checkpoint
912        setup
913            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
914            .await
915            .unwrap();
916
917        // Should eventually succeed despite initial failures
918        setup
919            .store
920            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
921            .await;
922
923        // Verify data was eventually committed
924        let data = setup
925            .store
926            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
927            .await;
928        assert_eq!(data, vec![1, 2]);
929    }
930}