sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::metrics::IndexerMetrics;
18use crate::pipeline::CommitterConfig;
19use crate::pipeline::Processor;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::collector::collector;
22use crate::pipeline::concurrent::commit_watermark::commit_watermark;
23use crate::pipeline::concurrent::committer::committer;
24use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
25use crate::pipeline::concurrent::pruner::pruner;
26use crate::pipeline::concurrent::reader_watermark::reader_watermark;
27use crate::pipeline::processor::processor;
28use crate::store::Store;
29use crate::types::full_checkpoint_content::Checkpoint;
30
31mod collector;
32mod commit_watermark;
33mod committer;
34mod main_reader_lo;
35mod pruner;
36mod reader_watermark;
37
/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed. Note that this is only a hint from the
    /// handler -- the framework may also decide to commit a `Pending` batch based on the
    /// thresholds in the `Handler` trait (see `Handler::batch` docs).
    Ready,
}
46
47/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
48/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
49///
50/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
51/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
52/// usage, but each handle may choose to override these defaults, e.g.
53///
54/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
55///   sizes).
56/// - Handlers that do more work during processing may wish to increase their fanout so more of it
57///   can be done concurrently, to preserve throughput.
58///
59/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
60/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
61/// checkpoint below which all data has been committed.
62///
63/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
64/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
65/// back to the ingestion service.
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler commits its rows to.
    type Store: Store;
    /// Accumulator for rows between collection and commit. `Default` must produce an empty
    /// batch; the other bounds allow batches to move between the pipeline's tasks.
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data from checkpoint `_from` up to (but not including) `_to_exclusive` in the
    /// database, returning the number of rows affected. This function is optional, and defaults
    /// to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
114
/// Configuration for a concurrent pipeline. All `Option` fields fall back to defaults derived
/// from the `Handler` trait's associated constants or the number of CPUs (see `pipeline`).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, that deletes old data. `None` disables pruning entirely.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_PENDING_ROWS` (backpressure threshold).
    pub max_pending_rows: Option<usize>,

    /// Override for `Handler::MAX_WATERMARK_UPDATES` (watermarks per batch cap).
    pub max_watermark_updates: Option<usize>,

    /// Size of the channel between the processor and collector. Defaults to a value derived
    /// from the number of CPUs.
    pub processor_channel_size: Option<usize>,

    /// Size of the channel between the collector and committer. Defaults to a value derived
    /// from the number of CPUs.
    pub collector_channel_size: Option<usize>,

    /// Size of the channel between the committer and the watermark updater. Defaults to the
    /// number of CPUs.
    pub committer_channel_size: Option<usize>,
}
145
/// Configuration for the pruner task of a concurrent pipeline. See [`PrunerConfig::default`]
/// for the default values, and `interval()` / `delay()` for `Duration` accessors.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, this is measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try and prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}
164
/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch. Tracked separately because `H::Batch` is opaque to the
    /// framework and need not expose a length.
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}
178
179impl<H, V> BatchedRows<H>
180where
181    H: Handler<Batch = Vec<V>, Value = V>,
182{
183    #[cfg(test)]
184    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
185        let batch_len = batch.len();
186        Self {
187            batch,
188            batch_len,
189            watermark,
190        }
191    }
192}
193
impl PrunerConfig {
    /// The pruner's polling cadence (`interval_ms`) as a [`Duration`].
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    /// The grace period after a reader low watermark update (`delay_ms`) as a [`Duration`].
    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}
203
impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,  // Check for prunable data every 5 minutes.
            delay_ms: 120_000,     // Wait 2 minutes after the reader watermark moves.
            retention: 4_000_000,  // Keep the most recent ~4M checkpoints.
            max_chunk_size: 2_000, // Prune at most 2000 checkpoints per request.
            prune_concurrency: 1,  // Prune serially by default.
        }
    }
}
215
216/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
217/// strictly after the `watermark` (or from the beginning if no watermark was provided).
218///
219/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
220/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
221/// the database, a committer which writes the rows out concurrently, and a watermark task to
222/// update the high watermark.
223///
224/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
225/// either because it received the checkpoints out-of-order or because of variance in processing
226/// time.
227///
228/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
229/// watermark below which all data has been committed (modulo pruning).
230///
231/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
232/// channels are created to communicate between its various components. The pipeline will shutdown
233/// if any of its input or output channels close, any of its independent tasks fail, or if it is
234/// signalled to shutdown through the returned service handle.
235pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
236    handler: H,
237    next_checkpoint: u64,
238    config: ConcurrentConfig,
239    store: H::Store,
240    task: Option<Task>,
241    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
242    metrics: Arc<IndexerMetrics>,
243) -> Service {
244    info!(
245        pipeline = H::NAME,
246        "Starting pipeline with config: {config:#?}",
247    );
248
249    let ConcurrentConfig {
250        committer: committer_config,
251        pruner: pruner_config,
252        fanout,
253        min_eager_rows,
254        max_pending_rows,
255        max_watermark_updates,
256        processor_channel_size,
257        collector_channel_size,
258        committer_channel_size,
259    } = config;
260
261    let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
262        initial: 1,
263        min: 1,
264        max: num_cpus::get(),
265        dead_band: None,
266    });
267    let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
268    let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
269    let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
270
271    let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
272    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
273
274    let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
275    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
276    let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
277    //docs::/#buff
278    let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
279    let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
280    let main_reader_lo = Arc::new(SetOnce::new());
281
282    let handler = Arc::new(handler);
283
284    let s_processor = processor(
285        handler.clone(),
286        checkpoint_rx,
287        processor_tx,
288        metrics.clone(),
289        concurrency,
290    );
291
292    let s_collector = collector::<H>(
293        handler.clone(),
294        committer_config.clone(),
295        collector_rx,
296        collector_tx,
297        main_reader_lo.clone(),
298        metrics.clone(),
299        min_eager_rows,
300        max_pending_rows,
301        max_watermark_updates,
302    );
303
304    let s_committer = committer::<H>(
305        handler.clone(),
306        committer_config.clone(),
307        committer_rx,
308        committer_tx,
309        store.clone(),
310        metrics.clone(),
311    );
312
313    let s_commit_watermark = commit_watermark::<H>(
314        next_checkpoint,
315        committer_config,
316        watermark_rx,
317        store.clone(),
318        task.as_ref().map(|t| t.task.clone()),
319        metrics.clone(),
320    );
321
322    let s_track_reader_lo = track_main_reader_lo::<H>(
323        main_reader_lo.clone(),
324        task.as_ref().map(|t| t.reader_interval),
325        store.clone(),
326    );
327
328    let s_reader_watermark =
329        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
330
331    let s_pruner = pruner(handler, pruner_config, store, metrics);
332
333    s_processor
334        .merge(s_collector)
335        .merge(s_committer)
336        .merge(s_commit_watermark)
337        .attach(s_track_reader_lo)
338        .attach(s_reader_watermark)
339        .attach(s_pruner)
340}
341
342#[cfg(test)]
343mod tests {
344    use std::sync::Arc;
345    use std::time::Duration;
346
347    use prometheus::Registry;
348    use tokio::sync::mpsc;
349    use tokio::time::timeout;
350
351    use crate::FieldCount;
352    use crate::metrics::IndexerMetrics;
353    use crate::mocks::store::MockConnection;
354    use crate::mocks::store::MockStore;
355    use crate::pipeline::Processor;
356    use crate::types::full_checkpoint_content::Checkpoint;
357    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
358
359    use super::*;
360
361    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
362    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations
363
    /// Row type produced by the test pipeline: one `(checkpoint, data)` pair per value.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        // Sequence number of the checkpoint this value was derived from.
        checkpoint: u64,
        // Payload: `checkpoint * 10 + {1, 2}` (see `DataPipeline::process`).
        data: u64,
    }
369
    /// Unit-struct processor/handler used to exercise the concurrent pipeline in tests.
    struct DataPipeline;
371
372    #[async_trait]
373    impl Processor for DataPipeline {
374        const NAME: &'static str = "test_handler";
375        type Value = TestValue;
376
377        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
378            let cp_num = checkpoint.summary.sequence_number;
379
380            // Every checkpoint will come with 2 processed values
381            Ok(vec![
382                TestValue {
383                    checkpoint: cp_num,
384                    data: cp_num * 10 + 1,
385                },
386                TestValue {
387                    checkpoint: cp_num,
388                    data: cp_num * 10 + 2,
389                },
390            ])
391        }
392    }
393
394    #[async_trait]
395    impl Handler for DataPipeline {
396        type Store = MockStore;
397        type Batch = Vec<TestValue>;
398
399        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
400        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
401        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.
402
403        fn batch(
404            &self,
405            batch: &mut Self::Batch,
406            values: &mut std::vec::IntoIter<Self::Value>,
407        ) -> BatchStatus {
408            // Take all values
409            batch.extend(values);
410            BatchStatus::Pending
411        }
412
413        async fn commit<'a>(
414            &self,
415            batch: &Self::Batch,
416            conn: &mut MockConnection<'a>,
417        ) -> anyhow::Result<usize> {
418            // Group values by checkpoint
419            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
420                std::collections::HashMap::new();
421            for value in batch {
422                grouped
423                    .entry(value.checkpoint)
424                    .or_default()
425                    .push(value.data);
426            }
427
428            // Commit all data at once
429            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
430        }
431
432        async fn prune<'a>(
433            &self,
434            from: u64,
435            to_exclusive: u64,
436            conn: &mut MockConnection<'a>,
437        ) -> anyhow::Result<usize> {
438            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
439        }
440    }
441
    /// Shared test fixture: a running pipeline plus handles to feed it and inspect results.
    struct TestSetup {
        // Mock store the pipeline commits to; inspected by test assertions.
        store: MockStore,
        // Sender feeding synthetic checkpoints into the pipeline.
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        // Service handle; held so the pipeline is not shut down mid-test.
        #[allow(unused)]
        pipeline: Service,
    }
448
449    impl TestSetup {
450        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
451            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
452            let metrics = IndexerMetrics::new(None, &Registry::default());
453
454            let pipeline = pipeline(
455                DataPipeline,
456                next_checkpoint,
457                config,
458                store.clone(),
459                None,
460                checkpoint_rx,
461                metrics,
462            );
463
464            Self {
465                store,
466                checkpoint_tx,
467                pipeline,
468            }
469        }
470
471        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
472            let checkpoint = Arc::new(
473                TestCheckpointBuilder::new(checkpoint)
474                    .with_epoch(1)
475                    .with_network_total_transactions(checkpoint * 2)
476                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
477                    .build_checkpoint(),
478            );
479            self.checkpoint_tx.send(checkpoint).await?;
480            Ok(())
481        }
482
483        async fn send_checkpoint_with_timeout(
484            &self,
485            checkpoint: u64,
486            timeout_duration: Duration,
487        ) -> anyhow::Result<()> {
488            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
489        }
490
491        async fn send_checkpoint_expect_timeout(
492            &self,
493            checkpoint: u64,
494            timeout_duration: Duration,
495        ) {
496            timeout(timeout_duration, self.send_checkpoint(checkpoint))
497                .await
498                .unwrap_err(); // Panics if send succeeds
499        }
500    }
501
502    #[tokio::test]
503    async fn test_e2e_pipeline() {
504        let config = ConcurrentConfig {
505            pruner: Some(PrunerConfig {
506                interval_ms: 5_000, // Long interval to test states before pruning
507                delay_ms: 100,      // Short delay for faster tests
508                retention: 3,       // Keep only 3 checkpoints
509                ..Default::default()
510            }),
511            ..Default::default()
512        };
513        let store = MockStore::default();
514        let setup = TestSetup::new(config, store, 0).await;
515
516        // Send initial checkpoints
517        for i in 0..3 {
518            setup
519                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
520                .await
521                .unwrap();
522        }
523
524        // Verify all initial data is available (before any pruning)
525        for i in 0..3 {
526            let data = setup
527                .store
528                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
529                .await;
530            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
531        }
532
533        // Add more checkpoints to trigger pruning
534        for i in 3..6 {
535            setup
536                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
537                .await
538                .unwrap();
539        }
540
541        // Verify data is still available BEFORE pruning kicks in
542        // With long pruning interval (5s), we can safely verify data without race conditions
543        for i in 0..6 {
544            let data = setup
545                .store
546                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
547                .await;
548            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
549        }
550
551        // Wait for pruning to occur (5s + delay + processing time)
552        tokio::time::sleep(Duration::from_millis(5_200)).await;
553
554        // Verify pruning has occurred
555        {
556            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
557
558            // Verify recent checkpoints are still available
559            assert!(data.contains_key(&3));
560            assert!(data.contains_key(&4));
561            assert!(data.contains_key(&5));
562
563            // Verify old checkpoints are pruned
564            assert!(!data.contains_key(&0));
565            assert!(!data.contains_key(&1));
566            assert!(!data.contains_key(&2));
567        };
568    }
569
570    #[tokio::test]
571    async fn test_e2e_pipeline_without_pruning() {
572        let config = ConcurrentConfig {
573            pruner: None,
574            ..Default::default()
575        };
576        let store = MockStore::default();
577        let setup = TestSetup::new(config, store, 0).await;
578
579        // Send several checkpoints
580        for i in 0..10 {
581            setup
582                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
583                .await
584                .unwrap();
585        }
586
587        // Wait for all data to be processed and committed
588        let watermark = setup
589            .store
590            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
591            .await;
592
593        // Verify ALL data was processed correctly (no pruning)
594        for i in 0..10 {
595            let data = setup
596                .store
597                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
598                .await;
599            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
600        }
601
602        // Verify watermark progression
603        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
604        assert_eq!(watermark.tx_hi, 18); // 9 * 2
605        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000
606
607        // Verify no data was pruned - all 10 checkpoints should still exist
608        let total_checkpoints = {
609            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
610            data.len()
611        };
612        assert_eq!(total_checkpoints, 10);
613    }
614
615    #[tokio::test]
616    async fn test_out_of_order_processing() {
617        let config = ConcurrentConfig::default();
618        let store = MockStore::default();
619        let setup = TestSetup::new(config, store, 0).await;
620
621        // Send checkpoints out of order
622        let checkpoints = vec![2, 0, 4, 1, 3];
623        for cp in checkpoints {
624            setup
625                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
626                .await
627                .unwrap();
628        }
629
630        // Wait for all data to be processed
631        setup
632            .store
633            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
634            .await;
635
636        // Verify all checkpoints were processed correctly despite out-of-order arrival
637        for i in 0..5 {
638            let data = setup
639                .store
640                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
641                .await;
642            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
643        }
644    }
645
646    #[tokio::test]
647    async fn test_watermark_progression_with_gaps() {
648        let config = ConcurrentConfig::default();
649        let store = MockStore::default();
650        let setup = TestSetup::new(config, store, 0).await;
651
652        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
653        for cp in [0, 1, 3, 4] {
654            setup
655                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
656                .await
657                .unwrap();
658        }
659
660        // Wait for processing
661        tokio::time::sleep(Duration::from_secs(1)).await;
662
663        // Watermark should only progress to 1 (can't progress past the gap)
664        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
665        assert_eq!(watermark.checkpoint_hi_inclusive, 1);
666
667        // Now send the missing checkpoint 2
668        setup
669            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
670            .await
671            .unwrap();
672
673        // Now watermark should progress to 4
674        let watermark = setup
675            .store
676            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
677            .await;
678        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
679    }
680
681    // ==================== BACK-PRESSURE TESTING ====================
682
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup (lets the spawned pipeline tasks start polling).
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, fanout=2
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints
        //
        // NOTE(review): the "2 tasks" term assumes each processor task holds exactly one
        // checkpoint in flight -- confirm against the processor implementation.

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
751
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]          [●●●●●●]
        //            buffer: 3    proc_chan: 7       coll_chan: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                // MIN_EAGER_ROWS is 1000 and MAX_PENDING_ROWS is 4, so
                // this test relies on the collect interval tip to force
                // batch flushing.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: fanout=2, write_concurrency=1
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector->Committer channel: 6 slots (collector_channel_size=6)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints
        //
        // NOTE(review): as above, this assumes one checkpoint in flight per processor task and
        // one batch held by the blocked committer -- confirm against those implementations.

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
835
836    // ==================== FAILURE TESTING ====================
837
838    #[tokio::test]
839    async fn test_commit_failure_retry() {
840        let config = ConcurrentConfig::default();
841        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
842        let setup = TestSetup::new(config, store, 0).await;
843
844        // Send a checkpoint
845        setup
846            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
847            .await
848            .unwrap();
849
850        // Should eventually succeed despite initial commit failures
851        setup
852            .store
853            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
854            .await;
855
856        // Verify data was eventually committed
857        let data = setup
858            .store
859            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
860            .await;
861        assert_eq!(data, vec![1, 2]);
862    }
863
864    #[tokio::test]
865    async fn test_prune_failure_retry() {
866        let config = ConcurrentConfig {
867            pruner: Some(PrunerConfig {
868                interval_ms: 2000, // 2 seconds interval for testing
869                delay_ms: 100,     // Short delay
870                retention: 2,      // Keep only 2 checkpoints
871                ..Default::default()
872            }),
873            ..Default::default()
874        };
875
876        // Configure prune failures for range [0, 2) - fail twice then succeed
877        let store = MockStore::default().with_prune_failures(0, 2, 1);
878        let setup = TestSetup::new(config, store, 0).await;
879
880        // Send enough checkpoints to trigger pruning
881        for i in 0..4 {
882            setup
883                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
884                .await
885                .unwrap();
886        }
887
888        // Verify data is still available BEFORE pruning kicks in
889        // With long pruning interval (5s), we can safely verify data without race conditions
890        for i in 0..4 {
891            let data = setup
892                .store
893                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
894                .await;
895            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
896        }
897
898        // Wait for first pruning attempt (should fail) and verify no data has been pruned
899        setup
900            .store
901            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
902            .await;
903        {
904            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
905            for i in 0..4 {
906                assert!(data.contains_key(&i));
907            }
908        };
909
910        // Wait for second pruning attempt (should succeed)
911        setup
912            .store
913            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
914            .await;
915        {
916            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
917            // Verify recent checkpoints are still available
918            assert!(data.contains_key(&2));
919            assert!(data.contains_key(&3));
920
921            // Verify old checkpoints are pruned
922            assert!(!data.contains_key(&0));
923            assert!(!data.contains_key(&1));
924        };
925    }
926
927    #[tokio::test]
928    async fn test_reader_watermark_failure_retry() {
929        let config = ConcurrentConfig {
930            pruner: Some(PrunerConfig {
931                interval_ms: 100, // Fast interval for testing
932                delay_ms: 100,    // Short delay
933                retention: 3,     // Keep 3 checkpoints
934                ..Default::default()
935            }),
936            ..Default::default()
937        };
938
939        // Configure reader watermark failures - fail 2 times then succeed
940        let store = MockStore::default().with_reader_watermark_failures(2);
941        let setup = TestSetup::new(config, store, 0).await;
942
943        // Send checkpoints to trigger reader watermark updates
944        for i in 0..6 {
945            setup
946                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
947                .await
948                .unwrap();
949        }
950
951        // Wait for processing to complete
952        setup
953            .store
954            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
955            .await;
956
957        // Wait for reader watermark task to attempt updates (with failures and retries)
958        tokio::time::sleep(Duration::from_secs(2)).await;
959
960        // Verify that reader watermark was eventually updated despite failures
961        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
962        assert_eq!(watermark.reader_lo, 3);
963    }
964
965    #[tokio::test]
966    async fn test_database_connection_failure_retry() {
967        let config = ConcurrentConfig::default();
968        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
969        let setup = TestSetup::new(config, store, 0).await;
970
971        // Send a checkpoint
972        setup
973            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
974            .await
975            .unwrap();
976
977        // Should eventually succeed despite initial failures
978        setup
979            .store
980            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
981            .await;
982
983        // Verify data was eventually committed
984        let data = setup
985            .store
986            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
987            .await;
988        assert_eq!(data, vec![1, 2]);
989    }
990}