sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::Processor;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::collector::collector;
23use crate::pipeline::concurrent::commit_watermark::commit_watermark;
24use crate::pipeline::concurrent::committer::committer;
25use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
26use crate::pipeline::concurrent::pruner::pruner;
27use crate::pipeline::concurrent::reader_watermark::reader_watermark;
28use crate::pipeline::processor::processor;
29use crate::store::ConcurrentStore;
30use crate::store::Store;
31
32mod collector;
33mod commit_watermark;
34mod committer;
35mod main_reader_lo;
36mod pruner;
37mod reader_watermark;
38
/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
///
/// Note that the framework may still flush a `Pending` batch on its own (e.g. when the
/// collect interval elapses or pending-row thresholds are hit).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}
47
/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes).
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler writes to; must support concurrent (out-of-order) commits.
    type Store: ConcurrentStore;

    /// Accumulator for rows awaiting commit. `Default` must produce the empty batch.
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data in the checkpoint range `[_from, _to_exclusive)` in the database, returning
    /// the number of rows affected. This function is optional, and defaults to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
115
/// Configuration for a concurrent pipeline
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, that deletes old data. `None` disables pruning entirely.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_PENDING_ROWS` (backpressure threshold).
    pub max_pending_rows: Option<usize>,

    /// Override for `Handler::MAX_WATERMARK_UPDATES` (watermarks per batch cap).
    pub max_watermark_updates: Option<usize>,

    /// Size of the channel between the processor and collector.
    /// Defaults to half the CPU count when unset.
    pub processor_channel_size: Option<usize>,

    /// Size of the channel between the collector and committer.
    /// Defaults to half the CPU count when unset.
    pub collector_channel_size: Option<usize>,

    /// Size of the channel between the committer and the watermark updater.
    /// Defaults to the CPU count when unset.
    pub committer_channel_size: Option<usize>,
}
146
/// Configuration for the pruner task, which deletes data older than the retention window.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, this is measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try and prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}
165
/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this batch
    watermark: Vec<WatermarkPart>,
}
179
180impl<H, V> BatchedRows<H>
181where
182    H: Handler<Batch = Vec<V>, Value = V>,
183{
184    #[cfg(test)]
185    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
186        let batch_len = batch.len();
187        Self {
188            batch,
189            batch_len,
190            watermark,
191        }
192    }
193}
194
impl PrunerConfig {
    /// The pruner's polling interval, as a [`Duration`].
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    /// How long to wait after a reader low watermark update before pruning, as a [`Duration`].
    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}
204
impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000, // Look for prunable data every 5 minutes.
            delay_ms: 120_000,    // Wait 2 minutes after the reader watermark moves.
            retention: 4_000_000, // Keep ~4M checkpoints worth of data.
            max_chunk_size: 2_000, // Prune at most 2000 checkpoints per request.
            prune_concurrency: 1,
        }
    }
}
216
217/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
218/// strictly after the `watermark` (or from the beginning if no watermark was provided).
219///
220/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
221/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
222/// the database, a committer which writes the rows out concurrently, and a watermark task to
223/// update the high watermark.
224///
225/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
226/// either because it received the checkpoints out-of-order or because of variance in processing
227/// time.
228///
229/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
230/// watermark below which all data has been committed (modulo pruning).
231///
232/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
233/// channels are created to communicate between its various components. The pipeline will shutdown
234/// if any of its input or output channels close, any of its independent tasks fail, or if it is
235/// signalled to shutdown through the returned service handle.
236pub(crate) fn pipeline<H: Handler>(
237    handler: H,
238    next_checkpoint: u64,
239    config: ConcurrentConfig,
240    store: H::Store,
241    task: Option<Task>,
242    checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
243    metrics: Arc<IndexerMetrics>,
244) -> Service {
245    info!(
246        pipeline = H::NAME,
247        "Starting pipeline with config: {config:#?}",
248    );
249
250    let ConcurrentConfig {
251        committer: committer_config,
252        pruner: pruner_config,
253        fanout,
254        min_eager_rows,
255        max_pending_rows,
256        max_watermark_updates,
257        processor_channel_size,
258        collector_channel_size,
259        committer_channel_size,
260    } = config;
261
262    let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
263        initial: 1,
264        min: 1,
265        max: num_cpus::get(),
266        dead_band: None,
267    });
268    let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
269    let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
270    let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
271
272    let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
273    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
274
275    let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
276    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
277    let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
278    //docs::/#buff
279    let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
280    let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
281    let main_reader_lo = Arc::new(SetOnce::new());
282
283    let handler = Arc::new(handler);
284
285    let s_processor = processor(
286        handler.clone(),
287        checkpoint_rx,
288        processor_tx,
289        metrics.clone(),
290        concurrency,
291        store.clone(),
292    );
293
294    let s_collector = collector::<H>(
295        handler.clone(),
296        committer_config.clone(),
297        collector_rx,
298        collector_tx,
299        main_reader_lo.clone(),
300        metrics.clone(),
301        min_eager_rows,
302        max_pending_rows,
303        max_watermark_updates,
304    );
305
306    let s_committer = committer::<H>(
307        handler.clone(),
308        committer_config.clone(),
309        committer_rx,
310        committer_tx,
311        store.clone(),
312        metrics.clone(),
313    );
314
315    let s_commit_watermark = commit_watermark::<H>(
316        next_checkpoint,
317        committer_config,
318        watermark_rx,
319        store.clone(),
320        task.as_ref().map(|t| t.task.clone()),
321        metrics.clone(),
322    );
323
324    let s_track_reader_lo = track_main_reader_lo::<H>(
325        main_reader_lo.clone(),
326        task.as_ref().map(|t| t.reader_interval),
327        store.clone(),
328    );
329
330    let s_reader_watermark =
331        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
332
333    let s_pruner = pruner(handler, pruner_config, store, metrics);
334
335    s_processor
336        .merge(s_collector)
337        .merge(s_committer)
338        .merge(s_commit_watermark)
339        .attach(s_track_reader_lo)
340        .attach(s_reader_watermark)
341        .attach(s_pruner)
342}
343
344#[cfg(test)]
345mod tests {
346    use std::sync::Arc;
347    use std::time::Duration;
348
349    use prometheus::Registry;
350    use sui_types::digests::CheckpointDigest;
351    use tokio::sync::mpsc;
352    use tokio::time::timeout;
353
354    use crate::FieldCount;
355    use crate::metrics::IndexerMetrics;
356    use crate::mocks::store::MockConnection;
357    use crate::mocks::store::MockStore;
358    use crate::pipeline::Processor;
359    use crate::types::full_checkpoint_content::Checkpoint;
360    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
361
362    use super::*;
363
    // Upper bound for any single wait performed by these tests.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations
366
    /// Row type produced by the test pipeline: one `(checkpoint, data)` pair per value.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        // Checkpoint sequence number this value was derived from.
        checkpoint: u64,
        // Synthetic payload (`checkpoint * 10 + {1,2}` as produced by `process`).
        data: u64,
    }
372
373    struct DataPipeline;
374
375    #[async_trait]
376    impl Processor for DataPipeline {
377        const NAME: &'static str = "test_handler";
378        type Value = TestValue;
379
380        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
381            let cp_num = checkpoint.summary.sequence_number;
382
383            // Every checkpoint will come with 2 processed values
384            Ok(vec![
385                TestValue {
386                    checkpoint: cp_num,
387                    data: cp_num * 10 + 1,
388                },
389                TestValue {
390                    checkpoint: cp_num,
391                    data: cp_num * 10 + 2,
392                },
393            ])
394        }
395    }
396
397    #[async_trait]
398    impl Handler for DataPipeline {
399        type Store = MockStore;
400        type Batch = Vec<TestValue>;
401
402        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
403        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
404        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.
405
406        fn batch(
407            &self,
408            batch: &mut Self::Batch,
409            values: &mut std::vec::IntoIter<Self::Value>,
410        ) -> BatchStatus {
411            // Take all values
412            batch.extend(values);
413            BatchStatus::Pending
414        }
415
416        async fn commit<'a>(
417            &self,
418            batch: &Self::Batch,
419            conn: &mut MockConnection<'a>,
420        ) -> anyhow::Result<usize> {
421            // Group values by checkpoint
422            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
423                std::collections::HashMap::new();
424            for value in batch {
425                grouped
426                    .entry(value.checkpoint)
427                    .or_default()
428                    .push(value.data);
429            }
430
431            // Commit all data at once
432            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
433        }
434
435        async fn prune<'a>(
436            &self,
437            from: u64,
438            to_exclusive: u64,
439            conn: &mut MockConnection<'a>,
440        ) -> anyhow::Result<usize> {
441            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
442        }
443    }
444
    /// Test harness bundling the mock store, the pipeline's input channel, and the running
    /// pipeline service.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held (not read) so the pipeline service stays alive for the duration of the test.
        #[allow(unused)]
        pipeline: Service,
    }
451
    impl TestSetup {
        /// Start a pipeline over `store` with the given `config`, beginning at
        /// `next_checkpoint`, and return a harness for feeding it checkpoints.
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        /// Build a synthetic checkpoint with deterministic contents (epoch 1, `checkpoint * 2`
        /// total transactions, a timestamp derived from the sequence number) and send it into
        /// the pipeline. Blocks if the input channel is full.
        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
                checkpoint: Arc::new(
                    TestCheckpointBuilder::new(checkpoint)
                        .with_epoch(1)
                        .with_network_total_transactions(checkpoint * 2)
                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
                        .build_checkpoint(),
                ),
                chain_id: CheckpointDigest::new([1; 32]).into(),
            });
            self.checkpoint_tx.send(checkpoint_envelope).await?;
            Ok(())
        }

        /// Send a checkpoint, failing if the pipeline does not accept it within
        /// `timeout_duration`.
        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        /// Send a checkpoint and assert that the send times out, i.e. that back-pressure is
        /// currently in effect.
        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if send succeeds
        }
    }
507
    /// End-to-end flow: all checkpoints are committed, then the pruner removes everything
    /// outside the configured retention window (3 checkpoints).
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available (retention = 3)
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }
575
    /// With the pruner disabled, all committed data is retained and the watermark advances to
    /// the highest contiguous checkpoint.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
620
    /// Checkpoints arriving out of order are all committed, and the watermark still reaches the
    /// highest checkpoint once the sequence is complete.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }
651
    /// The commit watermark cannot advance past a missing checkpoint; once the gap is filled,
    /// it jumps forward to cover the previously-committed checkpoints.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
686
687    // ==================== BACK-PRESSURE TESTING ====================
688
    /// The collector stops accepting checkpoints once MAX_PENDING_ROWS is reached, and the
    /// resulting back-pressure propagates all the way back to the checkpoint input channel.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, fanout=2
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
757
    /// A slow committer (10s per commit) backs up the whole pipeline; the test fills every
    /// buffer and confirms sends eventually block, then drain once commits complete.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]          [●●●●●●]
        //            buffer: 3    proc_chan: 7       coll_chan: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                // MIN_EAGER_ROWS is 1000 and MAX_PENDING_ROWS is 4, so
                // this test relies on the collect interval tip to force
                // batch flushing.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: fanout=2, write_concurrency=1
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector->Committer channel: 6 slots (collector_channel_size=6)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
841
842    // ==================== FAILURE TESTING ====================
843
844    #[tokio::test]
845    async fn test_commit_failure_retry() {
846        let config = ConcurrentConfig::default();
847        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
848        let setup = TestSetup::new(config, store, 0).await;
849
850        // Send a checkpoint
851        setup
852            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
853            .await
854            .unwrap();
855
856        // Should eventually succeed despite initial commit failures
857        setup
858            .store
859            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
860            .await;
861
862        // Verify data was eventually committed
863        let data = setup
864            .store
865            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
866            .await;
867        assert_eq!(data, vec![1, 2]);
868    }
869
870    #[tokio::test]
871    async fn test_prune_failure_retry() {
872        let config = ConcurrentConfig {
873            pruner: Some(PrunerConfig {
874                interval_ms: 2000, // 2 seconds interval for testing
875                delay_ms: 100,     // Short delay
876                retention: 2,      // Keep only 2 checkpoints
877                ..Default::default()
878            }),
879            ..Default::default()
880        };
881
882        // Configure prune failures for range [0, 2) - fail twice then succeed
883        let store = MockStore::default().with_prune_failures(0, 2, 1);
884        let setup = TestSetup::new(config, store, 0).await;
885
886        // Send enough checkpoints to trigger pruning
887        for i in 0..4 {
888            setup
889                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
890                .await
891                .unwrap();
892        }
893
894        // Verify data is still available BEFORE pruning kicks in
895        // With long pruning interval (5s), we can safely verify data without race conditions
896        for i in 0..4 {
897            let data = setup
898                .store
899                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
900                .await;
901            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
902        }
903
904        // Wait for first pruning attempt (should fail) and verify no data has been pruned
905        setup
906            .store
907            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
908            .await;
909        {
910            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
911            for i in 0..4 {
912                assert!(data.contains_key(&i));
913            }
914        };
915
916        // Wait for second pruning attempt (should succeed)
917        setup
918            .store
919            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
920            .await;
921        {
922            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
923            // Verify recent checkpoints are still available
924            assert!(data.contains_key(&2));
925            assert!(data.contains_key(&3));
926
927            // Verify old checkpoints are pruned
928            assert!(!data.contains_key(&0));
929            assert!(!data.contains_key(&1));
930        };
931    }
932
933    #[tokio::test]
934    async fn test_reader_watermark_failure_retry() {
935        let config = ConcurrentConfig {
936            pruner: Some(PrunerConfig {
937                interval_ms: 100, // Fast interval for testing
938                delay_ms: 100,    // Short delay
939                retention: 3,     // Keep 3 checkpoints
940                ..Default::default()
941            }),
942            ..Default::default()
943        };
944
945        // Configure reader watermark failures - fail 2 times then succeed
946        let store = MockStore::default().with_reader_watermark_failures(2);
947        let setup = TestSetup::new(config, store, 0).await;
948
949        // Send checkpoints to trigger reader watermark updates
950        for i in 0..6 {
951            setup
952                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
953                .await
954                .unwrap();
955        }
956
957        // Wait for processing to complete
958        setup
959            .store
960            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
961            .await;
962
963        // Wait for reader watermark task to attempt updates (with failures and retries)
964        tokio::time::sleep(Duration::from_secs(2)).await;
965
966        // Verify that reader watermark was eventually updated despite failures
967        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
968        assert_eq!(watermark.reader_lo, 3);
969    }
970
971    #[tokio::test]
972    async fn test_database_connection_failure_retry() {
973        let config = ConcurrentConfig::default();
974        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
975        let setup = TestSetup::new(config, store, 0).await;
976
977        // Send a checkpoint
978        setup
979            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
980            .await
981            .unwrap();
982
983        // Should eventually succeed despite initial failures
984        setup
985            .store
986            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
987            .await;
988
989        // Verify data was eventually committed
990        let data = setup
991            .store
992            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
993            .await;
994        assert_eq!(data, vec![1, 2]);
995    }
996}