sui_indexer_alt_framework/pipeline/concurrent/mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    sync::{Arc, atomic::AtomicU64},
    time::Duration,
};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{SetOnce, mpsc},
    task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
    Task, metrics::IndexerMetrics, store::Store, types::full_checkpoint_content::Checkpoint,
};

use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};

use self::{
    collector::collector, commit_watermark::commit_watermark, committer::committer,
    main_reader_lo::track_main_reader_lo, pruner::pruner, reader_watermark::reader_watermark,
};

mod collector;
mod commit_watermark;
mod committer;
mod main_reader_lo;
mod pruner;
mod reader_watermark;

/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}

/// Handlers implement the logic for a given indexing pipeline: how to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.:
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
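///
/// As an illustration (not a working example from this crate), a minimal handler for a
/// hypothetical `my_rows` table might look like the following sketch, where `MyStore`, `MyRow`,
/// `extract_rows`, and `insert_rows` are placeholders:
///
/// ```ignore
/// struct MyPipeline;
///
/// #[async_trait]
/// impl Processor for MyPipeline {
///     const NAME: &'static str = "my_rows";
///     type Value = MyRow;
///
///     async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
///         Ok(extract_rows(checkpoint))
///     }
/// }
///
/// #[async_trait]
/// impl Handler for MyPipeline {
///     type Store = MyStore;
///     type Batch = Vec<MyRow>;
///
///     fn batch(
///         &self,
///         batch: &mut Self::Batch,
///         values: &mut std::vec::IntoIter<Self::Value>,
///     ) -> BatchStatus {
///         // Take everything that is offered and let the framework decide when to flush.
///         batch.extend(values);
///         BatchStatus::Pending
///     }
///
///     async fn commit<'a>(
///         &self,
///         batch: &Self::Batch,
///         conn: &mut <Self::Store as Store>::Connection<'a>,
///     ) -> anyhow::Result<usize> {
///         insert_rows(conn, batch).await
///     }
/// }
/// ```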
#[async_trait]
pub trait Handler: Processor {
    type Store: Store;
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the collector applies back-pressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipelines' batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
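    ///
    /// A common pattern, assuming `Self::Batch` is a `Vec` of values (`MAX_BATCH_ROWS` is an
    /// illustrative threshold, not part of this trait), is to drain the iterator and report
    /// readiness once the batch is large enough:
    ///
    /// ```ignore
    /// fn batch(
    ///     &self,
    ///     batch: &mut Self::Batch,
    ///     values: &mut std::vec::IntoIter<Self::Value>,
    /// ) -> BatchStatus {
    ///     batch.extend(values);
    ///     if batch.len() >= MAX_BATCH_ROWS {
    ///         BatchStatus::Ready
    ///     } else {
    ///         BatchStatus::Pending
    ///     }
    /// }
    /// ```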
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data from checkpoint `_from` (inclusive) up to checkpoint `_to_exclusive`
    /// (exclusive) in the database, returning the number of rows affected. This function is
    /// optional, and defaults to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

/// Configuration for a concurrent pipeline.
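///
/// For example (an illustrative sketch; the values are not prescriptive), a pipeline that keeps
/// the default committer behaviour but prunes data older than one million checkpoints could be
/// configured as:
///
/// ```ignore
/// let config = ConcurrentConfig {
///     committer: CommitterConfig::default(),
///     pruner: Some(PrunerConfig {
///         retention: 1_000_000,
///         ..Default::default()
///     }),
/// };
/// ```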
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the committer, which makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, which deletes old data.
    pub pruner: Option<PrunerConfig>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set before it is safe to prune up to
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try to prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The maximum number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}

/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}

impl<H, V> BatchedRows<H>
where
    H: Handler<Batch = Vec<V>, Value = V>,
{
    #[cfg(test)]
    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
        let batch_len = batch.len();
        Self {
            batch,
            batch_len,
            watermark,
        }
    }
}

impl PrunerConfig {
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}

impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}

/// Start a new concurrent (out-of-order) indexing pipeline, served by the handler, `H`, starting
/// from `next_checkpoint`.
///
/// Each pipeline consists of a processor task, which takes checkpoint data and breaks it down into
/// rows ready for insertion; a collector, which batches those rows into an appropriate size for
/// the database; a committer, which writes the rows out concurrently; and a watermark task, which
/// updates the high watermark.
///
/// Committing is performed out-of-order: the pipeline may write out checkpoints in any order,
/// either because it received the checkpoints out-of-order or because of variance in processing
/// time.
///
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
/// watermark below which all data has been committed (modulo pruning).
///
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
/// channels are created to communicate between its various components. The pipeline can be shut
/// down using its `cancel` token, and will also shut down if any of its independent tasks reports
/// an issue.
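///
/// A rough sketch of the resulting task topology (the reader watermark and pruner tasks talk to
/// the store directly rather than over channels):
///
/// ```text
/// checkpoint_rx ─► processor ─► collector ─► committer ─► commit_watermark
///
/// track_main_reader_lo ─► main_reader_lo (shared with the collector)
///
/// reader_watermark ─► store ◄─ pruner
/// ```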
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    store: H::Store,
    task: Option<Task>,
    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {:?}", config
    );
    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    //docs::/#buff
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);

    // The pruner is not connected to the rest of the tasks by channels, so it needs to be
    // explicitly signalled to shut down when the other tasks shut down, in addition to listening
    // to the global cancel signal. We achieve this by creating a child cancel token that we
    // cancel once the committer tasks have shut down.
    let pruner_cancel = cancel.child_token();
    let handler = Arc::new(handler);

    let main_reader_lo = Arc::new(SetOnce::<AtomicU64>::new());

    let main_reader_lo_task = track_main_reader_lo::<H>(
        main_reader_lo.clone(),
        task.as_ref().map(|t| t.reader_interval),
        pruner_cancel.clone(),
        store.clone(),
    );

    let processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let collector = collector::<H>(
        handler.clone(),
        committer_config.clone(),
        collector_rx,
        collector_tx,
        main_reader_lo.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let committer = committer::<H>(
        handler.clone(),
        committer_config.clone(),
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        watermark_rx,
        store.clone(),
        task.as_ref().map(|t| t.task.clone()),
        metrics.clone(),
        cancel,
    );

    let reader_watermark = reader_watermark::<H>(
        pruner_config.clone(),
        store.clone(),
        metrics.clone(),
        pruner_cancel.clone(),
    );

    let pruner = pruner(
        handler,
        pruner_config,
        store,
        metrics,
        pruner_cancel.clone(),
    );

    tokio::spawn(async move {
        let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

        pruner_cancel.cancel();
        let _ = futures::join!(main_reader_lo_task, reader_watermark, pruner);
    })
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::Registry;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::{MockConnection, MockStore},
        pipeline::Processor,
        types::{
            full_checkpoint_content::Checkpoint,
            test_checkpoint_data_builder::TestCheckpointBuilder,
        },
    };

    use super::*;

    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Every checkpoint will come with 2 processed values
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint, for ease of testing

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Take all values
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        pipeline_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());
            let cancel = CancellationToken::new();

            let pipeline_handle = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
                cancel.clone(),
            );

            Self {
                store,
                checkpoint_tx,
                pipeline_handle,
                cancel,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        async fn shutdown(self) {
            drop(self.checkpoint_tx);
            self.cancel.cancel();
            let _ = self.pipeline_handle.await;
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if send succeeds
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.shutdown().await;
    }

    // ==================== BACK-PRESSURE TESTING ====================

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();

        setup.shutdown().await;
    }

    // ==================== FAILURE TESTING ====================

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With the 2s pruning interval, we can safely verify data without race conditions
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
}