sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{metrics::IndexerMetrics, store::Store, types::full_checkpoint_content::Checkpoint};

use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};

use self::{
    collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
    reader_watermark::reader_watermark,
};

mod collector;
mod commit_watermark;
mod committer;
mod pruner;
mod reader_watermark;

/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}

/// Handlers implement the logic for a given indexing pipeline: how to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the
/// database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many
/// rows build up, the collector will stop accepting new checkpoints, which will eventually
/// propagate back to the ingestion service.
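///
/// # Example
///
/// A minimal sketch of overriding these tuning parameters; the handler, store, and row types
/// here (`MyHandler`, `MyStore`, `MyRow`) and the numbers are assumptions for illustration, not
/// part of this module:
///
/// ```ignore
/// #[async_trait]
/// impl Handler for MyHandler {
///     type Store = MyStore;
///     type Batch = Vec<MyRow>;
///
///     // Many small rows per checkpoint: allow more rows to accumulate
///     // before committing eagerly or applying back-pressure.
///     const MIN_EAGER_ROWS: usize = 500;
///     const MAX_PENDING_ROWS: usize = 20_000;
///
///     // `batch` and `commit` implementations elided.
/// }
/// ```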
#[async_trait]
pub trait Handler: Processor {
    type Store: Store;
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipelines' batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
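    ///
    /// # Example
    ///
    /// A sketch that fills the batch up to a cap, leaving any remaining values in the iterator
    /// for the next batch (`MAX_BATCH_ROWS` and `MyRow` are assumptions for illustration):
    ///
    /// ```ignore
    /// fn batch(
    ///     &self,
    ///     batch: &mut Vec<MyRow>,
    ///     values: &mut std::vec::IntoIter<MyRow>,
    /// ) -> BatchStatus {
    ///     const MAX_BATCH_ROWS: usize = 1_000;
    ///     while batch.len() < MAX_BATCH_ROWS {
    ///         let Some(value) = values.next() else {
    ///             // Iterator exhausted before the cap: keep accepting values.
    ///             return BatchStatus::Pending;
    ///         };
    ///         batch.push(value);
    ///     }
    ///     // Cap reached: ask the framework to commit this batch.
    ///     BatchStatus::Ready
    /// }
    /// ```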
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data between checkpoints `_from` (inclusive) and `_to_exclusive` (exclusive) in
    /// the database, returning the number of rows affected. This method is optional, and defaults
    /// to not pruning at all.
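    ///
    /// # Example
    ///
    /// A sketch of a pruning implementation, assuming a hypothetical `delete_rows_in_range`
    /// helper on the store's connection type (not part of this crate):
    ///
    /// ```ignore
    /// async fn prune<'a>(
    ///     &self,
    ///     from: u64,
    ///     to_exclusive: u64,
    ///     conn: &mut <Self::Store as Store>::Connection<'a>,
    /// ) -> anyhow::Result<usize> {
    ///     // Delete rows for checkpoints in [from, to_exclusive).
    ///     conn.delete_rows_in_range("my_table", from, to_exclusive).await
    /// }
    /// ```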
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

/// Configuration for a concurrent pipeline.
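///
/// # Example
///
/// A sketch of enabling pruning with a custom retention window while keeping the committer
/// defaults (the figures are illustrative assumptions, not recommendations):
///
/// ```ignore
/// let config = ConcurrentConfig {
///     pruner: Some(PrunerConfig {
///         // Retain the most recent 100,000 checkpoints.
///         retention: 100_000,
///         ..PrunerConfig::default()
///     }),
///     ..ConcurrentConfig::default()
/// };
/// ```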
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, which makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, which deletes old data.
    pub pruner: Option<PrunerConfig>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark is set before it is safe to prune up to
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try to prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The maximum number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}

/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write.
    batch: H::Batch,
    /// Number of rows in the batch.
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk.
    watermark: Vec<WatermarkPart>,
}

impl<H, V> BatchedRows<H>
where
    H: Handler<Batch = Vec<V>, Value = V>,
{
    #[cfg(test)]
    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
        let batch_len = batch.len();
        Self {
            batch,
            batch_len,
            watermark,
        }
    }
}

impl PrunerConfig {
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}

impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}

/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`, starting
/// from `next_checkpoint`.
///
/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
/// the database, a committer which writes the rows out concurrently, and a watermark task to
/// update the high watermark.
///
/// Committing is performed out-of-order: the pipeline may write out checkpoints in any order,
/// either because it received them out-of-order or because of variance in processing time.
///
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
/// watermark below which all data has been committed (modulo pruning), as long as `skip_watermark`
/// is not true.
///
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
/// channels are created to communicate between its various components. The pipeline can be shut
/// down using its `cancel` token, and will also shut down if any of its independent tasks reports
/// an issue.
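///
/// A sketch of the resulting task topology (channel capacities are set in the function body
/// below):
///
/// ```text
/// checkpoint_rx ─► processor ─► collector ─► committer ─► commit_watermark
///
/// reader_watermark ─► watermark table ─► pruner
/// (coordinated via the watermark table, not channels)
/// ```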
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    skip_watermark: bool,
    store: H::Store,
    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {:?}", config
    );
    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    //docs::/#buff
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);

    // The pruner is not connected to the rest of the tasks by channels, so it needs to be
    // explicitly signalled to shut down when the other tasks shut down, in addition to listening
    // to the global cancel signal. We achieve this by creating a child cancel token that we
    // cancel once the committer tasks have shut down.
    let pruner_cancel = cancel.child_token();
    let handler = Arc::new(handler);

    let processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let collector = collector::<H>(
        handler.clone(),
        committer_config.clone(),
        collector_rx,
        collector_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let committer = committer::<H>(
        handler.clone(),
        committer_config.clone(),
        skip_watermark,
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        skip_watermark,
        watermark_rx,
        store.clone(),
        metrics.clone(),
        cancel,
    );

    let reader_watermark = reader_watermark::<H>(
        pruner_config.clone(),
        store.clone(),
        metrics.clone(),
        pruner_cancel.clone(),
    );

    let pruner = pruner(
        handler,
        pruner_config,
        store,
        metrics,
        pruner_cancel.clone(),
    );

    tokio::spawn(async move {
        let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

        pruner_cancel.cancel();
        let _ = futures::join!(reader_watermark, pruner);
    })
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::Registry;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::{MockConnection, MockStore},
        pipeline::Processor,
        types::{
            full_checkpoint_content::Checkpoint,
            test_checkpoint_data_builder::TestCheckpointBuilder,
        },
    };

    use super::*;

    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure capacity calculations

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Every checkpoint produces 2 processed values
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back-pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch holds 1 checkpoint, for ease of testing

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Take all values
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        pipeline_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());
            let cancel = CancellationToken::new();

            let skip_watermark = false;
            let pipeline_handle = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                skip_watermark,
                store.clone(),
                checkpoint_rx,
                metrics,
                cancel.clone(),
            );

            Self {
                store,
                checkpoint_tx,
                pipeline_handle,
                cancel,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        async fn shutdown(self) {
            drop(self.checkpoint_tx);
            self.cancel.cancel();
            let _ = self.pipeline_handle.await;
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if the send succeeds
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in.
        // With the long pruning interval (5s), we can safely verify data without race conditions.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s interval + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now the watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.shutdown().await;
    }

    // ==================== BACK-PRESSURE TESTING ====================

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with a longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks the entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
        // - Committer task: 1 task (blocked by the slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point.
        // Due to non-determinism in the collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in the range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with a longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();

        setup.shutdown().await;
    }

    // ==================== FAILURE TESTING ====================

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in.
        // With the 2s pruning interval, we can safely verify data without race conditions.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for the first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for the second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times, then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for the reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that the reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
}