sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
    FieldCount, metrics::IndexerMetrics, store::Store,
    types::full_checkpoint_content::CheckpointData,
};

use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};

use self::{
    collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
    reader_watermark::reader_watermark,
};

mod collector;
mod commit_watermark;
mod committer;
mod pruner;
mod reader_watermark;

/// Handlers implement the logic for a given indexing pipeline: how to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
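///
/// # Example
///
/// A minimal sketch of a handler, assuming `MyPipeline` already implements [Processor], and that
/// the `PgStore` store type and `insert_rows` helper exist (both are hypothetical):
///
/// ```ignore
/// struct MyPipeline;
///
/// #[async_trait]
/// impl Handler for MyPipeline {
///     type Store = PgStore;
///
///     // Rows are small, so batch more of them together before committing eagerly.
///     const MIN_EAGER_ROWS: usize = 100;
///
///     async fn commit<'a>(
///         values: &[Self::Value],
///         conn: &mut <Self::Store as Store>::Connection<'a>,
///     ) -> anyhow::Result<usize> {
///         insert_rows(conn, values).await
///     }
/// }
/// ```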
#[async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    type Store: Store;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipelines' batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Take a chunk of values and commit them to the database, returning the number of rows
    /// affected.
    async fn commit<'a>(
        values: &[Self::Value],
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data for checkpoints in the range `[_from, _to_exclusive)` in the database,
    /// returning the number of rows affected. This method is optional, and defaults to not
    /// pruning at all.
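    ///
    /// # Example
    ///
    /// A sketch of an override, assuming a hypothetical `delete_range` helper for the store:
    ///
    /// ```ignore
    /// async fn prune<'a>(
    ///     &self,
    ///     from: u64,
    ///     to_exclusive: u64,
    ///     conn: &mut <Self::Store as Store>::Connection<'a>,
    /// ) -> anyhow::Result<usize> {
    ///     // Delete this table's rows for checkpoints in `[from, to_exclusive)`.
    ///     delete_range(conn, "my_table", from, to_exclusive).await
    /// }
    /// ```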
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

/// Configuration for a concurrent pipeline.
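///
/// # Example
///
/// A sketch of overriding the pruner's retention while keeping the remaining defaults (the
/// values are illustrative, not recommendations):
///
/// ```ignore
/// let config = ConcurrentConfig {
///     pruner: Some(PrunerConfig {
///         retention: 1_000_000, // keep roughly the last million checkpoints
///         ..PrunerConfig::default()
///     }),
///     ..ConcurrentConfig::default()
/// };
/// ```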
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, which makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, which deletes old data.
    pub pruner: Option<PrunerConfig>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark is set before it is safe to prune up to
    /// that new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try and prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}

/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The rows to write
    values: Vec<H::Value>,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}

impl PrunerConfig {
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}

impl<H: Handler> BatchedRows<H> {
    fn new() -> Self {
        Self {
            values: vec![],
            watermark: vec![],
        }
    }

    /// Number of rows in this batch.
    fn len(&self) -> usize {
        self.values.len()
    }

    /// The batch is full if it has more than enough values to write to the database, or more than
    /// enough watermarks to update.
    fn is_full(&self) -> bool {
        self.values.len() >= max_chunk_rows::<H>()
            || self.watermark.len() >= H::MAX_WATERMARK_UPDATES
    }
}

impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}

/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`, beginning
/// from `next_checkpoint`.
///
/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
/// the database, a committer which writes the rows out concurrently, and a watermark task to
/// update the high watermark.
///
/// Committing is performed out-of-order: the pipeline may write checkpoints in any order, either
/// because it received them out-of-order or because of variance in processing time.
///
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
/// watermark below which all data has been committed (modulo pruning), as long as `skip_watermark`
/// is not true.
///
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
/// channels are created to communicate between its various components. The pipeline can be shut
/// down using its `cancel` token, and will also shut down if any of its independent tasks reports
/// an issue.
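///
/// # Example
///
/// A sketch of how a caller inside the framework might start the pipeline (`MyPipeline`, `store`,
/// and `metrics` are assumed to exist already; the channel capacity is arbitrary):
///
/// ```ignore
/// let (checkpoint_tx, checkpoint_rx) = mpsc::channel(64);
/// let cancel = CancellationToken::new();
///
/// let handle = pipeline(
///     MyPipeline,
///     0,     // next_checkpoint: start from genesis
///     ConcurrentConfig::default(),
///     false, // skip_watermark
///     store,
///     checkpoint_rx,
///     metrics,
///     cancel.clone(),
/// );
///
/// // Feed `checkpoint_tx` with `Arc<CheckpointData>` to drive the pipeline.
/// ```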
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    skip_watermark: bool,
    store: H::Store,
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {:?}", config
    );
    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    //docs::/#buff
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);

    // The pruner is not connected to the rest of the tasks by channels, so it needs to be
    // explicitly signalled to shut down when the other tasks shut down, in addition to listening
    // to the global cancel signal. We achieve this by creating a child cancel token that we
    // cancel once the committer tasks have shut down.
    let pruner_cancel = cancel.child_token();
    let handler = Arc::new(handler);

    let processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let collector = collector::<H>(
        committer_config.clone(),
        collector_rx,
        collector_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let committer = committer::<H>(
        committer_config.clone(),
        skip_watermark,
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        skip_watermark,
        watermark_rx,
        store.clone(),
        metrics.clone(),
        cancel,
    );

    let reader_watermark = reader_watermark::<H>(
        pruner_config.clone(),
        store.clone(),
        metrics.clone(),
        pruner_cancel.clone(),
    );

    let pruner = pruner(
        handler,
        pruner_config,
        store,
        metrics,
        pruner_cancel.clone(),
    );

    tokio::spawn(async move {
        let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

        pruner_cancel.cancel();
        let _ = futures::join!(reader_watermark, pruner);
    })
}

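/// Maximum number of rows to include in a single write chunk, derived from the row type's field
/// count: the cap keeps the total number of bind parameters in one statement (rows x fields)
/// within `i16::MAX`, which lines up with the bind-parameter limit of Postgres' wire protocol.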
const fn max_chunk_rows<H: Handler>() -> usize {
    if H::Value::FIELD_COUNT == 0 {
        i16::MAX as usize
    } else {
        i16::MAX as usize / H::Value::FIELD_COUNT
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::Registry;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::{MockConnection, MockStore},
        pipeline::Processor,
        types::{
            full_checkpoint_content::CheckpointData,
            test_checkpoint_data_builder::TestCheckpointDataBuilder,
        },
    };

    use super::*;

    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(
            &self,
            checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.checkpoint_summary.sequence_number;

            // Every checkpoint will come with 2 processed values
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint, for ease of testing.

        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in values {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
        pipeline_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());
            let cancel = CancellationToken::new();

            let skip_watermark = false;
            let pipeline_handle = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                skip_watermark,
                store.clone(),
                checkpoint_rx,
                metrics,
                cancel.clone(),
            );

            Self {
                store,
                checkpoint_tx,
                pipeline_handle,
                cancel,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointDataBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        async fn shutdown(self) {
            drop(self.checkpoint_tx);
            self.cancel.cancel();
            let _ = self.pipeline_handle.await;
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if send succeeds
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in.
        // With the long pruning interval (5s), we can safely verify data without race conditions.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s interval + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.shutdown().await;
    }

    // ==================== BACK-PRESSURE TESTING ====================

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point.
        // Due to non-determinism in the collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in the range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();

        setup.shutdown().await;
    }

    // ==================== FAILURE TESTING ====================

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in.
        // With the 2s pruning interval, we can safely verify data without race conditions.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
}