sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::Processor;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::collector::collector;
23use crate::pipeline::concurrent::commit_watermark::commit_watermark;
24use crate::pipeline::concurrent::committer::committer;
25use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
26use crate::pipeline::concurrent::pruner::pruner;
27use crate::pipeline::concurrent::reader_watermark::reader_watermark;
28use crate::pipeline::processor::processor;
29use crate::store::Store;
30
31mod collector;
32mod commit_watermark;
33mod committer;
34mod main_reader_lo;
35mod pruner;
36mod reader_watermark;
37
/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
///
/// Note that the framework may commit a `Pending` batch anyway (e.g. on a collection interval
/// tick or when pending-row limits are hit) -- `Ready` is a hint that it should do so now.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}
46
/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handle may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes).
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler commits rows to.
    type Store: Store;

    /// Accumulator for rows awaiting commit, filled by [`Self::batch`] and written out by
    /// [`Self::commit`]. `Default` is used to start each fresh batch.
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data in the checkpoint range `[_from, _to_exclusive)` in the database, returning
    /// the number of rows affected. This function is optional, and defaults to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
114
/// Configuration for a concurrent pipeline.
///
/// All `Option` fields fall back to a framework- or handler-provided default when left as `None`.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, that deletes old data. `None` disables pruning.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_PENDING_ROWS` (backpressure threshold).
    pub max_pending_rows: Option<usize>,

    /// Override for `Handler::MAX_WATERMARK_UPDATES` (watermarks per batch cap).
    pub max_watermark_updates: Option<usize>,

    /// Size of the channel between the processor and collector. Defaults to a CPU-count-derived
    /// value.
    pub processor_channel_size: Option<usize>,

    /// Size of the channel between the collector and committer. Defaults to a CPU-count-derived
    /// value.
    pub collector_channel_size: Option<usize>,

    /// Size of the channel between the committer and the watermark updater. Defaults to a
    /// CPU-count-derived value.
    pub committer_channel_size: Option<usize>,
}
145
/// Configuration for the pruner task of a concurrent pipeline.
///
/// See [`Default`] for the out-of-the-box values.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, this is measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try and prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}
164
/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch (tracked separately because `H::Batch` is opaque to the
    /// framework)
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}
178
179impl<H, V> BatchedRows<H>
180where
181    H: Handler<Batch = Vec<V>, Value = V>,
182{
183    #[cfg(test)]
184    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
185        let batch_len = batch.len();
186        Self {
187            batch,
188            batch_len,
189            watermark,
190        }
191    }
192}
193
194impl PrunerConfig {
195    pub fn interval(&self) -> Duration {
196        Duration::from_millis(self.interval_ms)
197    }
198
199    pub fn delay(&self) -> Duration {
200        Duration::from_millis(self.delay_ms)
201    }
202}
203
204impl Default for PrunerConfig {
205    fn default() -> Self {
206        Self {
207            interval_ms: 300_000,
208            delay_ms: 120_000,
209            retention: 4_000_000,
210            max_chunk_size: 2_000,
211            prune_concurrency: 1,
212        }
213    }
214}
215
216/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
217/// strictly after the `watermark` (or from the beginning if no watermark was provided).
218///
219/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
220/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
221/// the database, a committer which writes the rows out concurrently, and a watermark task to
222/// update the high watermark.
223///
224/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
225/// either because it received the checkpoints out-of-order or because of variance in processing
226/// time.
227///
228/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
229/// watermark below which all data has been committed (modulo pruning).
230///
231/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
232/// channels are created to communicate between its various components. The pipeline will shutdown
233/// if any of its input or output channels close, any of its independent tasks fail, or if it is
234/// signalled to shutdown through the returned service handle.
235pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
236    handler: H,
237    next_checkpoint: u64,
238    config: ConcurrentConfig,
239    store: H::Store,
240    task: Option<Task>,
241    checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
242    metrics: Arc<IndexerMetrics>,
243) -> Service {
244    info!(
245        pipeline = H::NAME,
246        "Starting pipeline with config: {config:#?}",
247    );
248
249    let ConcurrentConfig {
250        committer: committer_config,
251        pruner: pruner_config,
252        fanout,
253        min_eager_rows,
254        max_pending_rows,
255        max_watermark_updates,
256        processor_channel_size,
257        collector_channel_size,
258        committer_channel_size,
259    } = config;
260
261    let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
262        initial: 1,
263        min: 1,
264        max: num_cpus::get(),
265        dead_band: None,
266    });
267    let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
268    let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
269    let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
270
271    let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
272    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
273
274    let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
275    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
276    let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
277    //docs::/#buff
278    let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
279    let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
280    let main_reader_lo = Arc::new(SetOnce::new());
281
282    let handler = Arc::new(handler);
283
284    let s_processor = processor(
285        handler.clone(),
286        checkpoint_rx,
287        processor_tx,
288        metrics.clone(),
289        concurrency,
290    );
291
292    let s_collector = collector::<H>(
293        handler.clone(),
294        committer_config.clone(),
295        collector_rx,
296        collector_tx,
297        main_reader_lo.clone(),
298        metrics.clone(),
299        min_eager_rows,
300        max_pending_rows,
301        max_watermark_updates,
302    );
303
304    let s_committer = committer::<H>(
305        handler.clone(),
306        committer_config.clone(),
307        committer_rx,
308        committer_tx,
309        store.clone(),
310        metrics.clone(),
311    );
312
313    let s_commit_watermark = commit_watermark::<H>(
314        next_checkpoint,
315        committer_config,
316        watermark_rx,
317        store.clone(),
318        task.as_ref().map(|t| t.task.clone()),
319        metrics.clone(),
320    );
321
322    let s_track_reader_lo = track_main_reader_lo::<H>(
323        main_reader_lo.clone(),
324        task.as_ref().map(|t| t.reader_interval),
325        store.clone(),
326    );
327
328    let s_reader_watermark =
329        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
330
331    let s_pruner = pruner(handler, pruner_config, store, metrics);
332
333    s_processor
334        .merge(s_collector)
335        .merge(s_committer)
336        .merge(s_commit_watermark)
337        .attach(s_track_reader_lo)
338        .attach(s_reader_watermark)
339        .attach(s_pruner)
340}
341
342#[cfg(test)]
343mod tests {
344    use std::sync::Arc;
345    use std::time::Duration;
346
347    use prometheus::Registry;
348    use sui_types::digests::CheckpointDigest;
349    use tokio::sync::mpsc;
350    use tokio::time::timeout;
351
352    use crate::FieldCount;
353    use crate::metrics::IndexerMetrics;
354    use crate::mocks::store::MockConnection;
355    use crate::mocks::store::MockStore;
356    use crate::pipeline::Processor;
357    use crate::types::full_checkpoint_content::Checkpoint;
358    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
359
360    use super::*;
361
    // Generous upper bound for waits that must eventually succeed.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations

    /// Row type produced by the test pipeline; two are emitted per checkpoint.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        // Checkpoint sequence number this value was derived from.
        checkpoint: u64,
        // Synthetic payload: `checkpoint * 10 + {1, 2}`.
        data: u64,
    }

    /// Stateless processor/handler used to drive the pipeline under test.
    struct DataPipeline;
372
373    #[async_trait]
374    impl Processor for DataPipeline {
375        const NAME: &'static str = "test_handler";
376        type Value = TestValue;
377
378        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
379            let cp_num = checkpoint.summary.sequence_number;
380
381            // Every checkpoint will come with 2 processed values
382            Ok(vec![
383                TestValue {
384                    checkpoint: cp_num,
385                    data: cp_num * 10 + 1,
386                },
387                TestValue {
388                    checkpoint: cp_num,
389                    data: cp_num * 10 + 2,
390                },
391            ])
392        }
393    }
394
395    #[async_trait]
396    impl Handler for DataPipeline {
397        type Store = MockStore;
398        type Batch = Vec<TestValue>;
399
400        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
401        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
402        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.
403
404        fn batch(
405            &self,
406            batch: &mut Self::Batch,
407            values: &mut std::vec::IntoIter<Self::Value>,
408        ) -> BatchStatus {
409            // Take all values
410            batch.extend(values);
411            BatchStatus::Pending
412        }
413
414        async fn commit<'a>(
415            &self,
416            batch: &Self::Batch,
417            conn: &mut MockConnection<'a>,
418        ) -> anyhow::Result<usize> {
419            // Group values by checkpoint
420            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
421                std::collections::HashMap::new();
422            for value in batch {
423                grouped
424                    .entry(value.checkpoint)
425                    .or_default()
426                    .push(value.data);
427            }
428
429            // Commit all data at once
430            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
431        }
432
433        async fn prune<'a>(
434            &self,
435            from: u64,
436            to_exclusive: u64,
437            conn: &mut MockConnection<'a>,
438        ) -> anyhow::Result<usize> {
439            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
440        }
441    }
442
    /// Harness bundling the mock store, the checkpoint input channel, and the running pipeline
    /// service for a single test.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held (not read) so the pipeline keeps running for the test's duration.
        #[allow(unused)]
        pipeline: Service,
    }
449
450    impl TestSetup {
451        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
452            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
453            let metrics = IndexerMetrics::new(None, &Registry::default());
454
455            let pipeline = pipeline(
456                DataPipeline,
457                next_checkpoint,
458                config,
459                store.clone(),
460                None,
461                checkpoint_rx,
462                metrics,
463            );
464
465            Self {
466                store,
467                checkpoint_tx,
468                pipeline,
469            }
470        }
471
472        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
473            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
474                checkpoint: Arc::new(
475                    TestCheckpointBuilder::new(checkpoint)
476                        .with_epoch(1)
477                        .with_network_total_transactions(checkpoint * 2)
478                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
479                        .build_checkpoint(),
480                ),
481                chain_id: CheckpointDigest::new([1; 32]).into(),
482            });
483            self.checkpoint_tx.send(checkpoint_envelope).await?;
484            Ok(())
485        }
486
487        async fn send_checkpoint_with_timeout(
488            &self,
489            checkpoint: u64,
490            timeout_duration: Duration,
491        ) -> anyhow::Result<()> {
492            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
493        }
494
495        async fn send_checkpoint_expect_timeout(
496            &self,
497            checkpoint: u64,
498            timeout_duration: Duration,
499        ) {
500            timeout(timeout_duration, self.send_checkpoint(checkpoint))
501                .await
502                .unwrap_err(); // Panics if send succeeds
503        }
504    }
505
    /// End-to-end: data flows through the pipeline, is committed, and is eventually pruned down to
    /// the configured retention window.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available (retention = 3)
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }
573
    /// With the pruner disabled, every committed checkpoint remains in the store and the
    /// watermark advances to the highest contiguous checkpoint.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
618
    /// Checkpoints delivered out of order are still all processed and committed; the watermark
    /// catches up once the full contiguous range is present.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }
649
    /// The commit watermark must not advance past a missing checkpoint; once the gap is filled it
    /// jumps forward to cover the whole contiguous range.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing (sleep-based: we are asserting the watermark does NOT move, so
        // there is nothing to wait for positively)
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
684
685    // ==================== BACK-PRESSURE TESTING ====================
686
    /// The collector stops accepting checkpoints once its pending rows reach
    /// `MAX_PENDING_ROWS`, and the stall propagates back to the checkpoint input channel.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, fanout=2
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
755
    /// A slow committer (10s commit delay) backs the whole pipeline up; back pressure is observed
    /// at the checkpoint input once every internal buffer is full.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]          [●●●●●●]
        //            buffer: 3    proc_chan: 7       coll_chan: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                // MIN_EAGER_ROWS is 1000 and MAX_PENDING_ROWS is 4, so
                // this test relies on the collect interval tip to force
                // batch flushing.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: fanout=2, write_concurrency=1
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector->Committer channel: 6 slots (collector_channel_size=6)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
839
840    // ==================== FAILURE TESTING ====================
841
842    #[tokio::test]
843    async fn test_commit_failure_retry() {
844        let config = ConcurrentConfig::default();
845        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
846        let setup = TestSetup::new(config, store, 0).await;
847
848        // Send a checkpoint
849        setup
850            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
851            .await
852            .unwrap();
853
854        // Should eventually succeed despite initial commit failures
855        setup
856            .store
857            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
858            .await;
859
860        // Verify data was eventually committed
861        let data = setup
862            .store
863            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
864            .await;
865        assert_eq!(data, vec![1, 2]);
866    }
867
    /// The pruner should retry a failed prune of a range and eventually succeed,
    /// leaving older checkpoints deleted and checkpoints within retention intact.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With a 2 second pruning interval, we can safely verify data without race conditions
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
930
931    #[tokio::test]
932    async fn test_reader_watermark_failure_retry() {
933        let config = ConcurrentConfig {
934            pruner: Some(PrunerConfig {
935                interval_ms: 100, // Fast interval for testing
936                delay_ms: 100,    // Short delay
937                retention: 3,     // Keep 3 checkpoints
938                ..Default::default()
939            }),
940            ..Default::default()
941        };
942
943        // Configure reader watermark failures - fail 2 times then succeed
944        let store = MockStore::default().with_reader_watermark_failures(2);
945        let setup = TestSetup::new(config, store, 0).await;
946
947        // Send checkpoints to trigger reader watermark updates
948        for i in 0..6 {
949            setup
950                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
951                .await
952                .unwrap();
953        }
954
955        // Wait for processing to complete
956        setup
957            .store
958            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
959            .await;
960
961        // Wait for reader watermark task to attempt updates (with failures and retries)
962        tokio::time::sleep(Duration::from_secs(2)).await;
963
964        // Verify that reader watermark was eventually updated despite failures
965        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
966        assert_eq!(watermark.reader_lo, 3);
967    }
968
969    #[tokio::test]
970    async fn test_database_connection_failure_retry() {
971        let config = ConcurrentConfig::default();
972        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
973        let setup = TestSetup::new(config, store, 0).await;
974
975        // Send a checkpoint
976        setup
977            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
978            .await
979            .unwrap();
980
981        // Should eventually succeed despite initial failures
982        setup
983            .store
984            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
985            .await;
986
987        // Verify data was eventually committed
988        let data = setup
989            .store
990            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
991            .await;
992        assert_eq!(data, vec![1, 2]);
993    }
994}