sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::metrics::IndexerMetrics;
17use crate::pipeline::CommitterConfig;
18use crate::pipeline::PIPELINE_BUFFER;
19use crate::pipeline::Processor;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::collector::collector;
22use crate::pipeline::concurrent::commit_watermark::commit_watermark;
23use crate::pipeline::concurrent::committer::committer;
24use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
25use crate::pipeline::concurrent::pruner::pruner;
26use crate::pipeline::concurrent::reader_watermark::reader_watermark;
27use crate::pipeline::processor::processor;
28use crate::store::Store;
29use crate::types::full_checkpoint_content::Checkpoint;
30
31mod collector;
32mod commit_watermark;
33mod committer;
34mod main_reader_lo;
35mod pruner;
36mod reader_watermark;
37
/// Outcome of a call to `Handler::batch`, telling the framework whether the batch that was just
/// filled should be handed off for committing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BatchStatus {
    /// There is still room in the batch for more values.
    Pending,
    /// The batch has reached capacity and should be committed.
    Ready,
}
46
47/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
48/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
49///
50/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
51/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
52/// usage, but each handle may choose to override these defaults, e.g.
53///
54/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
55///   sizes).
56/// - Handlers that do more work during processing may wish to increase their fanout so more of it
57///   can be done concurrently, to preserve throughput.
58///
59/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
60/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
61/// checkpoint below which all data has been committed.
62///
63/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
64/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
65/// back to the ingestion service.
66#[async_trait]
67pub trait Handler: Processor {
68    type Store: Store;
69    type Batch: Default + Send + Sync + 'static;
70
71    /// If at least this many rows are pending, the committer will commit them eagerly.
72    const MIN_EAGER_ROWS: usize = 50;
73
74    /// If there are more than this many rows pending, the committer applies backpressure.
75    const MAX_PENDING_ROWS: usize = 5000;
76
77    /// The maximum number of watermarks that can show up in a single batch.
78    /// This limit exists to deal with pipelines that produce no data for a majority of
79    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
80    const MAX_WATERMARK_UPDATES: usize = 10_000;
81
82    /// Add values from the iterator to the batch. The implementation may take all, some, or none
83    /// of the values from the iterator by calling `.next()`.
84    ///
85    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
86    /// or `BatchStatus::Pending` if the batch can accept more values.
87    ///
88    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
89    /// may also decide to commit a batch based on the trait parameters above.
90    fn batch(
91        &self,
92        batch: &mut Self::Batch,
93        values: &mut std::vec::IntoIter<Self::Value>,
94    ) -> BatchStatus;
95
96    /// Commit the batch to the database, returning the number of rows affected.
97    async fn commit<'a>(
98        &self,
99        batch: &Self::Batch,
100        conn: &mut <Self::Store as Store>::Connection<'a>,
101    ) -> anyhow::Result<usize>;
102
103    /// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
104    /// the number of rows affected. This function is optional, and defaults to not pruning at all.
105    async fn prune<'a>(
106        &self,
107        _from: u64,
108        _to_exclusive: u64,
109        _conn: &mut <Self::Store as Store>::Connection<'a>,
110    ) -> anyhow::Result<usize> {
111        Ok(0)
112    }
113}
114
115/// Configuration for a concurrent pipeline
116#[derive(Serialize, Deserialize, Debug, Clone, Default)]
117pub struct ConcurrentConfig {
118    /// Configuration for the writer, that makes forward progress.
119    pub committer: CommitterConfig,
120
121    /// Configuration for the pruner, that deletes old data.
122    pub pruner: Option<PrunerConfig>,
123}
124
125#[derive(Serialize, Deserialize, Debug, Clone)]
126pub struct PrunerConfig {
127    /// How often the pruner should check whether there is any data to prune, in milliseconds.
128    pub interval_ms: u64,
129
130    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
131    /// this new watermark, in milliseconds.
132    pub delay_ms: u64,
133
134    /// How much data to keep, this is measured in checkpoints.
135    pub retention: u64,
136
137    /// The maximum range to try and prune in one request, measured in checkpoints.
138    pub max_chunk_size: u64,
139
140    /// The max number of tasks to run in parallel for pruning.
141    pub prune_concurrency: u64,
142}
143
144/// Values ready to be written to the database. This is an internal type used to communicate
145/// between the collector and the committer parts of the pipeline.
146///
147/// Values inside each batch may or may not be from the same checkpoint. Values in the same
148/// checkpoint can also be split across multiple batches.
149struct BatchedRows<H: Handler> {
150    /// The batch to write
151    batch: H::Batch,
152    /// Number of rows in the batch
153    batch_len: usize,
154    /// Proportions of all the watermarks that are represented in this chunk
155    watermark: Vec<WatermarkPart>,
156}
157
158impl<H, V> BatchedRows<H>
159where
160    H: Handler<Batch = Vec<V>, Value = V>,
161{
162    #[cfg(test)]
163    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
164        let batch_len = batch.len();
165        Self {
166            batch,
167            batch_len,
168            watermark,
169        }
170    }
171}
172
173impl PrunerConfig {
174    pub fn interval(&self) -> Duration {
175        Duration::from_millis(self.interval_ms)
176    }
177
178    pub fn delay(&self) -> Duration {
179        Duration::from_millis(self.delay_ms)
180    }
181}
182
183impl Default for PrunerConfig {
184    fn default() -> Self {
185        Self {
186            interval_ms: 300_000,
187            delay_ms: 120_000,
188            retention: 4_000_000,
189            max_chunk_size: 2_000,
190            prune_concurrency: 1,
191        }
192    }
193}
194
195/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
196/// strictly after the `watermark` (or from the beginning if no watermark was provided).
197///
198/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
199/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
200/// the database, a committer which writes the rows out concurrently, and a watermark task to
201/// update the high watermark.
202///
203/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
204/// either because it received the checkpoints out-of-order or because of variance in processing
205/// time.
206///
207/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
208/// watermark below which all data has been committed (modulo pruning).
209///
210/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
211/// channels are created to communicate between its various components. The pipeline will shutdown
212/// if any of its input or output channels close, any of its independent tasks fail, or if it is
213/// signalled to shutdown through the returned service handle.
214pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
215    handler: H,
216    next_checkpoint: u64,
217    config: ConcurrentConfig,
218    store: H::Store,
219    task: Option<Task>,
220    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
221    metrics: Arc<IndexerMetrics>,
222) -> Service {
223    info!(
224        pipeline = H::NAME,
225        "Starting pipeline with config: {config:#?}",
226    );
227
228    let ConcurrentConfig {
229        committer: committer_config,
230        pruner: pruner_config,
231    } = config;
232
233    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
234    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
235    let (collector_tx, committer_rx) =
236        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
237    //docs::/#buff
238    let (committer_tx, watermark_rx) =
239        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
240    let main_reader_lo = Arc::new(SetOnce::new());
241
242    let handler = Arc::new(handler);
243
244    let s_processor = processor(
245        handler.clone(),
246        checkpoint_rx,
247        processor_tx,
248        metrics.clone(),
249    );
250
251    let s_collector = collector::<H>(
252        handler.clone(),
253        committer_config.clone(),
254        collector_rx,
255        collector_tx,
256        main_reader_lo.clone(),
257        metrics.clone(),
258    );
259
260    let s_committer = committer::<H>(
261        handler.clone(),
262        committer_config.clone(),
263        committer_rx,
264        committer_tx,
265        store.clone(),
266        metrics.clone(),
267    );
268
269    let s_commit_watermark = commit_watermark::<H>(
270        next_checkpoint,
271        committer_config,
272        watermark_rx,
273        store.clone(),
274        task.as_ref().map(|t| t.task.clone()),
275        metrics.clone(),
276    );
277
278    let s_track_reader_lo = track_main_reader_lo::<H>(
279        main_reader_lo.clone(),
280        task.as_ref().map(|t| t.reader_interval),
281        store.clone(),
282    );
283
284    let s_reader_watermark =
285        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
286
287    let s_pruner = pruner(handler, pruner_config, store, metrics);
288
289    s_processor
290        .merge(s_collector)
291        .merge(s_committer)
292        .merge(s_commit_watermark)
293        .attach(s_track_reader_lo)
294        .attach(s_reader_watermark)
295        .attach(s_pruner)
296}
297
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use prometheus::Registry;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::Processor;
    use crate::types::full_checkpoint_content::Checkpoint;
    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Every checkpoint will come with 2 processed values
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Take all values
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        // Holds the pipeline's service handle for the lifetime of the test; never read directly.
        #[allow(unused)]
        pipeline: Service,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if send succeeds
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur. Poll instead of sleeping for a fixed duration: pruning depends
        // on the 5s pruner interval, the delay, and the reader watermark having been updated, so a
        // fixed sleep with a ~100ms margin is flaky under load.
        timeout(TEST_TIMEOUT, async {
            loop {
                let pruned = {
                    // Scope the guard so it is released before awaiting.
                    let data = setup.store.data.get(DataPipeline::NAME).unwrap();
                    !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
                };
                if pruned {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        })
        .await
        .expect("checkpoints 0, 1 and 2 should eventually be pruned");

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait until the watermark has been written up to the end of the contiguous prefix, so the
        // lookup below cannot race the first watermark write...
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 1, TEST_TIMEOUT)
            .await;

        // ...then give the pipeline time to (incorrectly) advance past the gap.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    // ==================== BACK-PRESSURE TESTING ====================

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }

    // ==================== FAILURE TESTING ====================

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // The 2s pruning interval leaves time to verify data before the first prune attempt
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
}