sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::Processor;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::collector::collector;
23use crate::pipeline::concurrent::commit_watermark::commit_watermark;
24use crate::pipeline::concurrent::committer::committer;
25use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
26use crate::pipeline::concurrent::pruner::pruner;
27use crate::pipeline::concurrent::reader_watermark::reader_watermark;
28use crate::pipeline::processor::processor;
29use crate::store::ConcurrentStore;
30use crate::store::Store;
31
32mod collector;
33mod commit_watermark;
34mod committer;
35mod main_reader_lo;
36mod pruner;
37mod reader_watermark;
38
/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
///
/// Note that the framework may decide to commit a `Pending` batch anyway, based on the
/// `Handler`'s tuning constants (e.g. its eager-row threshold).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}
47
/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler writes to. It must support out-of-order (concurrent) commits.
    type Store: ConcurrentStore;

    /// Accumulator for rows awaiting commit, filled by [`Handler::batch`] and written out by
    /// [`Handler::commit`].
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data for checkpoints in the half-open range `[_from, _to_exclusive)`, returning
    /// the number of rows affected. This function is optional, and defaults to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
115
/// Configuration for a concurrent pipeline.
///
/// All `Option` fields are overrides: `None` falls back to the corresponding `Handler` associated
/// constant or to a framework-chosen default (see `pipeline`).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, that makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, that deletes old data. `None` disables pruning entirely.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_PENDING_ROWS` (backpressure threshold).
    pub max_pending_rows: Option<usize>,

    /// Override for `Handler::MAX_WATERMARK_UPDATES` (watermarks per batch cap).
    pub max_watermark_updates: Option<usize>,

    /// Size of the channel between the processor and collector.
    pub processor_channel_size: Option<usize>,

    /// Size of the channel between the collector and committer.
    pub collector_channel_size: Option<usize>,

    /// Size of the channel between the committer and the watermark updater.
    pub committer_channel_size: Option<usize>,
}
146
/// Configuration for the pruner, which deletes data once it falls outside the retention window.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
    /// this new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, this is measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try and prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}
165
/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch, tracked separately because `H::Batch` is opaque to the
    /// framework.
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}
179
180impl<H, V> BatchedRows<H>
181where
182    H: Handler<Batch = Vec<V>, Value = V>,
183{
184    #[cfg(test)]
185    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
186        let batch_len = batch.len();
187        Self {
188            batch,
189            batch_len,
190            watermark,
191        }
192    }
193}
194
impl PrunerConfig {
    /// The polling interval between pruner runs, as a [`Duration`].
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    /// The grace period after the reader low watermark moves before pruning up to it, as a
    /// [`Duration`].
    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}
204
impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000, // Check for prunable data every 5 minutes.
            delay_ms: 120_000,    // Wait 2 minutes after the reader watermark moves.
            retention: 4_000_000, // Keep the most recent ~4 million checkpoints.
            max_chunk_size: 2_000, // Prune at most 2000 checkpoints per request.
            prune_concurrency: 1, // Prune serially by default.
        }
    }
}
216
217/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
218/// strictly after the `watermark` (or from the beginning if no watermark was provided).
219///
220/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
221/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
222/// the database, a committer which writes the rows out concurrently, and a watermark task to
223/// update the high watermark.
224///
225/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
226/// either because it received the checkpoints out-of-order or because of variance in processing
227/// time.
228///
229/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
230/// watermark below which all data has been committed (modulo pruning).
231///
232/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
233/// channels are created to communicate between its various components. The pipeline will shutdown
234/// if any of its input or output channels close, any of its independent tasks fail, or if it is
235/// signalled to shutdown through the returned service handle.
236pub(crate) fn pipeline<H: Handler>(
237    handler: H,
238    next_checkpoint: u64,
239    config: ConcurrentConfig,
240    store: H::Store,
241    task: Option<Task>,
242    checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
243    metrics: Arc<IndexerMetrics>,
244) -> Service {
245    info!(
246        pipeline = H::NAME,
247        "Starting pipeline with config: {config:#?}",
248    );
249
250    let ConcurrentConfig {
251        committer: committer_config,
252        pruner: pruner_config,
253        fanout,
254        min_eager_rows,
255        max_pending_rows,
256        max_watermark_updates,
257        processor_channel_size,
258        collector_channel_size,
259        committer_channel_size,
260    } = config;
261
262    let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
263        initial: 1,
264        min: 1,
265        max: num_cpus::get(),
266        dead_band: None,
267    });
268    let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
269    let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
270    let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
271
272    let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
273    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
274
275    let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
276    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
277    let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
278    //docs::/#buff
279    let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
280    let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
281    let main_reader_lo = Arc::new(SetOnce::new());
282
283    let handler = Arc::new(handler);
284
285    let s_processor = processor(
286        handler.clone(),
287        checkpoint_rx,
288        processor_tx,
289        metrics.clone(),
290        concurrency,
291        store.clone(),
292    );
293
294    let s_collector = collector::<H>(
295        handler.clone(),
296        committer_config.clone(),
297        collector_rx,
298        collector_tx,
299        main_reader_lo.clone(),
300        metrics.clone(),
301        min_eager_rows,
302        max_pending_rows,
303        max_watermark_updates,
304    );
305
306    let s_committer = committer::<H>(
307        handler.clone(),
308        committer_config.clone(),
309        committer_rx,
310        committer_tx,
311        store.clone(),
312        metrics.clone(),
313    );
314
315    let s_commit_watermark = commit_watermark::<H>(
316        next_checkpoint,
317        committer_config,
318        watermark_rx,
319        store.clone(),
320        task.as_ref().map(|t| t.task.clone()),
321        metrics.clone(),
322    );
323
324    let s_track_reader_lo = track_main_reader_lo::<H>(
325        main_reader_lo.clone(),
326        task.as_ref().map(|t| t.reader_interval),
327        store.clone(),
328    );
329
330    let s_reader_watermark =
331        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
332
333    let s_pruner = pruner(handler, pruner_config, store, metrics);
334
335    s_processor
336        .merge(s_collector)
337        .merge(s_committer)
338        .merge(s_commit_watermark)
339        .attach(s_track_reader_lo)
340        .attach(s_reader_watermark)
341        .attach(s_pruner)
342}
343
344#[cfg(test)]
345mod tests {
346    use std::sync::Arc;
347    use std::time::Duration;
348
349    use prometheus::Registry;
350    use sui_types::digests::CheckpointDigest;
351    use tokio::sync::mpsc;
352    use tokio::time::timeout;
353
354    use crate::FieldCount;
355    use crate::metrics::IndexerMetrics;
356    use crate::mocks::store::MockConnection;
357    use crate::mocks::store::MockStore;
358    use crate::pipeline::Processor;
359    use crate::types::full_checkpoint_content::Checkpoint;
360    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
361
362    use super::*;
363
364    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
365    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations
366
    /// Row type produced by the test pipeline.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        // Source checkpoint sequence number.
        checkpoint: u64,
        // Payload derived from the checkpoint number (see `DataPipeline::process`).
        data: u64,
    }
372
    /// Unit-struct processor/handler used to drive the concurrent pipeline in tests.
    struct DataPipeline;
374
375    #[async_trait]
376    impl Processor for DataPipeline {
377        const NAME: &'static str = "test_handler";
378        type Value = TestValue;
379
380        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
381            let cp_num = checkpoint.summary.sequence_number;
382
383            // Every checkpoint will come with 2 processed values
384            Ok(vec![
385                TestValue {
386                    checkpoint: cp_num,
387                    data: cp_num * 10 + 1,
388                },
389                TestValue {
390                    checkpoint: cp_num,
391                    data: cp_num * 10 + 2,
392                },
393            ])
394        }
395    }
396
397    #[async_trait]
398    impl Handler for DataPipeline {
399        type Store = MockStore;
400        type Batch = Vec<TestValue>;
401
402        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
403        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
404        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.
405
406        fn batch(
407            &self,
408            batch: &mut Self::Batch,
409            values: &mut std::vec::IntoIter<Self::Value>,
410        ) -> BatchStatus {
411            // Take all values
412            batch.extend(values);
413            BatchStatus::Pending
414        }
415
416        async fn commit<'a>(
417            &self,
418            batch: &Self::Batch,
419            conn: &mut MockConnection<'a>,
420        ) -> anyhow::Result<usize> {
421            // Group values by checkpoint
422            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
423                std::collections::HashMap::new();
424            for value in batch {
425                grouped
426                    .entry(value.checkpoint)
427                    .or_default()
428                    .push(value.data);
429            }
430
431            // Commit all data at once
432            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
433        }
434
435        async fn prune<'a>(
436            &self,
437            from: u64,
438            to_exclusive: u64,
439            conn: &mut MockConnection<'a>,
440        ) -> anyhow::Result<usize> {
441            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
442        }
443    }
444
    /// Handles each test needs: the mock store (for assertions), the sender for feeding
    /// checkpoints into the pipeline, and the running pipeline service itself.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held (not read) to keep the pipeline alive for the duration of the test.
        #[allow(unused)]
        pipeline: Service,
    }
451
452    impl TestSetup {
453        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
454            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
455            let metrics = IndexerMetrics::new(None, &Registry::default());
456
457            let pipeline = pipeline(
458                DataPipeline,
459                next_checkpoint,
460                config,
461                store.clone(),
462                None,
463                checkpoint_rx,
464                metrics,
465            );
466
467            Self {
468                store,
469                checkpoint_tx,
470                pipeline,
471            }
472        }
473
474        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
475            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
476                checkpoint: Arc::new(
477                    TestCheckpointBuilder::new(checkpoint)
478                        .with_epoch(1)
479                        .with_network_total_transactions(checkpoint * 2)
480                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
481                        .build_checkpoint(),
482                ),
483                chain_id: CheckpointDigest::new([1; 32]).into(),
484            });
485            self.checkpoint_tx.send(checkpoint_envelope).await?;
486            Ok(())
487        }
488
489        async fn send_checkpoint_with_timeout(
490            &self,
491            checkpoint: u64,
492            timeout_duration: Duration,
493        ) -> anyhow::Result<()> {
494            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
495        }
496
497        async fn send_checkpoint_expect_timeout(
498            &self,
499            checkpoint: u64,
500            timeout_duration: Duration,
501        ) {
502            timeout(timeout_duration, self.send_checkpoint(checkpoint))
503                .await
504                .unwrap_err(); // Panics if send succeeds
505        }
506    }
507
    /// End-to-end run with pruning enabled: commit six checkpoints with `retention: 3`, then
    /// verify the oldest three are pruned while the newest three survive.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur. The pruner and reader_watermark tasks both run on
        // the same interval, so poll until the pruner has caught up rather than using a
        // fixed sleep.
        let pruning_deadline = Duration::from_secs(15);
        let start = tokio::time::Instant::now();
        loop {
            // Checkpoints 0..=2 fall outside the retention window (3), so all three must go.
            let pruned = {
                let data = setup.store.data.get(DataPipeline::NAME).unwrap();
                !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
            };
            if pruned {
                break;
            }
            assert!(
                start.elapsed() < pruning_deadline,
                "Timed out waiting for pruning to occur"
            );
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Verify recent checkpoints are still available
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));
        };
    }
585
    /// End-to-end run with pruning disabled: all ten checkpoints must be committed, the
    /// watermark must advance to the last one, and nothing may be deleted.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression. The expected values mirror the envelope built in
        // `TestSetup::send_checkpoint`.
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
630
631    #[tokio::test]
632    async fn test_out_of_order_processing() {
633        let config = ConcurrentConfig::default();
634        let store = MockStore::default();
635        let setup = TestSetup::new(config, store, 0).await;
636
637        // Send checkpoints out of order
638        let checkpoints = vec![2, 0, 4, 1, 3];
639        for cp in checkpoints {
640            setup
641                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
642                .await
643                .unwrap();
644        }
645
646        // Wait for all data to be processed
647        setup
648            .store
649            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
650            .await;
651
652        // Verify all checkpoints were processed correctly despite out-of-order arrival
653        for i in 0..5 {
654            let data = setup
655                .store
656                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
657                .await;
658            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
659        }
660    }
661
    /// The commit watermark must not advance past a missing checkpoint, and must catch up as
    /// soon as the gap is filled.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing. A fixed sleep is used here because we are asserting the watermark
        // did NOT move past the gap -- there is no positive event to wait for.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
696
697    // ==================== BACK-PRESSURE TESTING ====================
698
    /// Back-pressure via the collector: once pending rows reach `MAX_PENDING_ROWS`, the
    /// collector stops accepting checkpoints and sends eventually block at the pipeline input.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, fanout=2
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
767
    /// Back-pressure via the committer: a 10s commit delay in the mock store stalls the whole
    /// pipeline once every channel and task slot is occupied.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]          [●●●●●●]
        //            buffer: 3    proc_chan: 7       coll_chan: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                // MIN_EAGER_ROWS is 1000 and MAX_PENDING_ROWS is 4, so
                // this test relies on the collect interval tip to force
                // batch flushing.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: fanout=2, write_concurrency=1
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector->Committer channel: 6 slots (collector_channel_size=6)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
851
852    // ==================== FAILURE TESTING ====================
853
854    #[tokio::test]
855    async fn test_commit_failure_retry() {
856        let config = ConcurrentConfig::default();
857        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
858        let setup = TestSetup::new(config, store, 0).await;
859
860        // Send a checkpoint
861        setup
862            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
863            .await
864            .unwrap();
865
866        // Should eventually succeed despite initial commit failures
867        setup
868            .store
869            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
870            .await;
871
872        // Verify data was eventually committed
873        let data = setup
874            .store
875            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
876            .await;
877        assert_eq!(data, vec![1, 2]);
878    }
879
    /// Pruning that fails transiently must be retried on a later pruner tick.
    ///
    /// The store rejects the first prune of range [0, 2); the pruner should
    /// retry and eventually delete checkpoints 0 and 1, while the most recent
    /// `retention` checkpoints stay available.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed.
        // (Presumably the last argument is the number of injected failures —
        // the flow below waits for one failed attempt then one success.)
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in.
        // With the 2s pruning interval, we can verify data without racing the
        // pruner's first run.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            // MockStore writes rows [i*10 + 1, i*10 + 2] per checkpoint i.
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // The failed attempt must leave all four checkpoints intact.
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
942
    /// Reader-watermark updates that fail transiently must be retried.
    ///
    /// The store rejects the first two reader-watermark writes; the task
    /// should keep retrying until `reader_lo` reflects the retention policy.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        // NOTE(review): a fixed sleep is a potential source of slowness/flakes;
        // a polling wait on reader_lo would be more robust — confirm whether
        // MockStore exposes a suitable wait helper.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures.
        // With checkpoints 0..=5 committed and retention = 3, the reader
        // watermark should land at 3 (keeping checkpoints 3, 4, 5 readable).
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
980
981    #[tokio::test]
982    async fn test_database_connection_failure_retry() {
983        let config = ConcurrentConfig::default();
984        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
985        let setup = TestSetup::new(config, store, 0).await;
986
987        // Send a checkpoint
988        setup
989            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
990            .await
991            .unwrap();
992
993        // Should eventually succeed despite initial failures
994        setup
995            .store
996            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
997            .await;
998
999        // Verify data was eventually committed
1000        let data = setup
1001            .store
1002            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
1003            .await;
1004        assert_eq!(data, vec![1, 2]);
1005    }
1006}