sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::IngestionConfig;
21use crate::pipeline::Processor;
22use crate::pipeline::WatermarkPart;
23use crate::pipeline::concurrent::collector::collector;
24use crate::pipeline::concurrent::commit_watermark::commit_watermark;
25use crate::pipeline::concurrent::committer::committer;
26use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
27use crate::pipeline::concurrent::pruner::pruner;
28use crate::pipeline::concurrent::reader_watermark::reader_watermark;
29use crate::pipeline::processor::processor;
30use crate::store::ConcurrentStore;
31use crate::store::Store;
32
33mod collector;
34mod commit_watermark;
35mod committer;
36mod main_reader_lo;
37mod pruner;
38mod reader_watermark;
39
/// Outcome of [`Handler::batch`], telling the framework whether a batch still has room or is
/// full and should be flushed to the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// More values can still be added to this batch.
    Pending,
    /// The batch is at capacity and should be committed.
    Ready,
}
48
49/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
50/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
51///
52/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
53/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
54/// usage, but each handle may choose to override these defaults, e.g.
55///
56/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
57///   sizes).
58/// - Handlers that do more work during processing may wish to increase their fanout so more of it
59///   can be done concurrently, to preserve throughput.
60///
61/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
62/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
63/// checkpoint below which all data has been committed.
64///
65/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
66/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
67/// back to the ingestion service.
68#[async_trait]
69pub trait Handler: Processor {
70    type Store: ConcurrentStore;
71    type Batch: Default + Send + Sync + 'static;
72
73    /// If at least this many rows are pending, the committer will commit them eagerly.
74    const MIN_EAGER_ROWS: usize = 50;
75
76    /// If there are more than this many rows pending, the committer applies backpressure.
77    const MAX_PENDING_ROWS: usize = 5000;
78
79    /// The maximum number of watermarks that can show up in a single batch.
80    /// This limit exists to deal with pipelines that produce no data for a majority of
81    /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
82    const MAX_WATERMARK_UPDATES: usize = 10_000;
83
84    /// Add values from the iterator to the batch. The implementation may take all, some, or none
85    /// of the values from the iterator by calling `.next()`.
86    ///
87    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
88    /// or `BatchStatus::Pending` if the batch can accept more values.
89    ///
90    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
91    /// may also decide to commit a batch based on the trait parameters above.
92    fn batch(
93        &self,
94        batch: &mut Self::Batch,
95        values: &mut std::vec::IntoIter<Self::Value>,
96    ) -> BatchStatus;
97
98    /// Commit the batch to the database, returning the number of rows affected.
99    async fn commit<'a>(
100        &self,
101        batch: &Self::Batch,
102        conn: &mut <Self::Store as Store>::Connection<'a>,
103    ) -> anyhow::Result<usize>;
104
105    /// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
106    /// the number of rows affected. This function is optional, and defaults to not pruning at all.
107    async fn prune<'a>(
108        &self,
109        _from: u64,
110        _to_exclusive: u64,
111        _conn: &mut <Self::Store as Store>::Connection<'a>,
112    ) -> anyhow::Result<usize> {
113        Ok(0)
114    }
115}
116
117/// Configuration for a concurrent pipeline
118#[derive(Serialize, Deserialize, Debug, Clone, Default)]
119pub struct ConcurrentConfig {
120    /// Configuration for the writer, that makes forward progress.
121    pub committer: CommitterConfig,
122
123    /// Per-pipeline ingestion overrides.
124    pub ingestion: IngestionConfig,
125
126    /// Configuration for the pruner, that deletes old data.
127    pub pruner: Option<PrunerConfig>,
128
129    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
130    pub fanout: Option<ConcurrencyConfig>,
131
132    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
133    pub min_eager_rows: Option<usize>,
134
135    /// Override for `Handler::MAX_PENDING_ROWS` (backpressure threshold).
136    pub max_pending_rows: Option<usize>,
137
138    /// Override for `Handler::MAX_WATERMARK_UPDATES` (watermarks per batch cap).
139    pub max_watermark_updates: Option<usize>,
140
141    /// Size of the channel between the processor and collector.
142    pub processor_channel_size: Option<usize>,
143
144    /// Size of the channel between the collector and committer.
145    pub collector_channel_size: Option<usize>,
146
147    /// Size of the channel between the committer and the watermark updater.
148    pub committer_channel_size: Option<usize>,
149}
150
151#[derive(Serialize, Deserialize, Debug, Clone)]
152pub struct PrunerConfig {
153    /// How often the pruner should check whether there is any data to prune, in milliseconds.
154    pub interval_ms: u64,
155
156    /// How long to wait after the reader low watermark was set, until it is safe to prune up until
157    /// this new watermark, in milliseconds.
158    pub delay_ms: u64,
159
160    /// How much data to keep, this is measured in checkpoints.
161    pub retention: u64,
162
163    /// The maximum range to try and prune in one request, measured in checkpoints.
164    pub max_chunk_size: u64,
165
166    /// The max number of tasks to run in parallel for pruning.
167    pub prune_concurrency: u64,
168}
169
170/// Values ready to be written to the database. This is an internal type used to communicate
171/// between the collector and the committer parts of the pipeline.
172///
173/// Values inside each batch may or may not be from the same checkpoint. Values in the same
174/// checkpoint can also be split across multiple batches.
175struct BatchedRows<H: Handler> {
176    /// The batch to write
177    batch: H::Batch,
178    /// Number of rows in the batch
179    batch_len: usize,
180    /// Proportions of all the watermarks that are represented in this chunk
181    watermark: Vec<WatermarkPart>,
182}
183
184impl<H, V> BatchedRows<H>
185where
186    H: Handler<Batch = Vec<V>, Value = V>,
187{
188    #[cfg(test)]
189    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
190        let batch_len = batch.len();
191        Self {
192            batch,
193            batch_len,
194            watermark,
195        }
196    }
197}
198
199impl PrunerConfig {
200    pub fn interval(&self) -> Duration {
201        Duration::from_millis(self.interval_ms)
202    }
203
204    pub fn delay(&self) -> Duration {
205        Duration::from_millis(self.delay_ms)
206    }
207}
208
209impl Default for PrunerConfig {
210    fn default() -> Self {
211        Self {
212            interval_ms: 300_000,
213            delay_ms: 120_000,
214            retention: 4_000_000,
215            max_chunk_size: 2_000,
216            prune_concurrency: 1,
217        }
218    }
219}
220
221/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
222/// strictly after the `watermark` (or from the beginning if no watermark was provided).
223///
224/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
225/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
226/// the database, a committer which writes the rows out concurrently, and a watermark task to
227/// update the high watermark.
228///
229/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
230/// either because it received the checkpoints out-of-order or because of variance in processing
231/// time.
232///
233/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
234/// watermark below which all data has been committed (modulo pruning).
235///
236/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
237/// channels are created to communicate between its various components. The pipeline will shutdown
238/// if any of its input or output channels close, any of its independent tasks fail, or if it is
239/// signalled to shutdown through the returned service handle.
240pub(crate) fn pipeline<H: Handler>(
241    handler: H,
242    next_checkpoint: u64,
243    config: ConcurrentConfig,
244    store: H::Store,
245    task: Option<Task>,
246    checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
247    metrics: Arc<IndexerMetrics>,
248) -> Service {
249    info!(
250        pipeline = H::NAME,
251        "Starting pipeline with config: {config:#?}",
252    );
253
254    let ConcurrentConfig {
255        committer: committer_config,
256        ingestion: _,
257        pruner: pruner_config,
258        fanout,
259        min_eager_rows,
260        max_pending_rows,
261        max_watermark_updates,
262        processor_channel_size,
263        collector_channel_size,
264        committer_channel_size,
265    } = config;
266
267    let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
268        initial: 1,
269        min: 1,
270        max: num_cpus::get(),
271        dead_band: None,
272    });
273    let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
274    let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
275    let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
276
277    let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
278    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
279
280    let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
281    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
282    let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
283    //docs::/#buff
284    let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
285    let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
286    let main_reader_lo = Arc::new(SetOnce::new());
287
288    let handler = Arc::new(handler);
289
290    let s_processor = processor(
291        handler.clone(),
292        checkpoint_rx,
293        processor_tx,
294        metrics.clone(),
295        concurrency,
296        store.clone(),
297    );
298
299    let s_collector = collector::<H>(
300        handler.clone(),
301        committer_config.clone(),
302        collector_rx,
303        collector_tx,
304        main_reader_lo.clone(),
305        metrics.clone(),
306        min_eager_rows,
307        max_pending_rows,
308        max_watermark_updates,
309    );
310
311    let s_committer = committer::<H>(
312        handler.clone(),
313        committer_config.clone(),
314        committer_rx,
315        committer_tx,
316        store.clone(),
317        metrics.clone(),
318    );
319
320    let s_commit_watermark = commit_watermark::<H>(
321        next_checkpoint,
322        committer_config,
323        watermark_rx,
324        store.clone(),
325        task.as_ref().map(|t| t.task.clone()),
326        metrics.clone(),
327    );
328
329    let s_track_reader_lo = track_main_reader_lo::<H>(
330        main_reader_lo.clone(),
331        task.as_ref().map(|t| t.reader_interval),
332        store.clone(),
333    );
334
335    let s_reader_watermark =
336        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
337
338    let s_pruner = pruner(handler, pruner_config, store, metrics);
339
340    s_processor
341        .merge(s_collector)
342        .merge(s_committer)
343        .merge(s_commit_watermark)
344        .attach(s_track_reader_lo)
345        .attach(s_reader_watermark)
346        .attach(s_pruner)
347}
348
349#[cfg(test)]
350mod tests {
351    use std::sync::Arc;
352    use std::time::Duration;
353
354    use prometheus::Registry;
355    use sui_types::digests::CheckpointDigest;
356    use tokio::sync::mpsc;
357    use tokio::time::timeout;
358
359    use crate::FieldCount;
360    use crate::metrics::IndexerMetrics;
361    use crate::mocks::store::MockConnection;
362    use crate::mocks::store::MockStore;
363    use crate::pipeline::Processor;
364    use crate::types::full_checkpoint_content::Checkpoint;
365    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
366
367    use super::*;
368
    /// Generous upper bound for waits that are expected to complete.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    /// Capacity of the channel feeding the pipeline. Critical for back-pressure testing
    /// calculations: capacity budgets in the tests depend on this exact value.
    const TEST_SUBSCRIBER_CHANNEL_SIZE: usize = 3;
371
    /// Row type produced by the test pipeline; two of these are emitted per checkpoint.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        // The checkpoint this value was derived from.
        checkpoint: u64,
        // Payload, unique per (checkpoint, position) pair: checkpoint * 10 + {1, 2}.
        data: u64,
    }
377
378    struct DataPipeline;
379
380    #[async_trait]
381    impl Processor for DataPipeline {
382        const NAME: &'static str = "test_handler";
383        type Value = TestValue;
384
385        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
386            let cp_num = checkpoint.summary.sequence_number;
387
388            // Every checkpoint will come with 2 processed values
389            Ok(vec![
390                TestValue {
391                    checkpoint: cp_num,
392                    data: cp_num * 10 + 1,
393                },
394                TestValue {
395                    checkpoint: cp_num,
396                    data: cp_num * 10 + 2,
397                },
398            ])
399        }
400    }
401
402    #[async_trait]
403    impl Handler for DataPipeline {
404        type Store = MockStore;
405        type Batch = Vec<TestValue>;
406
407        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
408        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
409        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for an ease of testing.
410
411        fn batch(
412            &self,
413            batch: &mut Self::Batch,
414            values: &mut std::vec::IntoIter<Self::Value>,
415        ) -> BatchStatus {
416            // Take all values
417            batch.extend(values);
418            BatchStatus::Pending
419        }
420
421        async fn commit<'a>(
422            &self,
423            batch: &Self::Batch,
424            conn: &mut MockConnection<'a>,
425        ) -> anyhow::Result<usize> {
426            // Group values by checkpoint
427            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
428                std::collections::HashMap::new();
429            for value in batch {
430                grouped
431                    .entry(value.checkpoint)
432                    .or_default()
433                    .push(value.data);
434            }
435
436            // Commit all data at once
437            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
438        }
439
440        async fn prune<'a>(
441            &self,
442            from: u64,
443            to_exclusive: u64,
444            conn: &mut MockConnection<'a>,
445        ) -> anyhow::Result<usize> {
446            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
447        }
448    }
449
    /// Shared scaffolding for pipeline tests: the mock store, the channel feeding the pipeline,
    /// and the running pipeline service itself.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held (not read) so the pipeline keeps running for the duration of the test.
        #[allow(unused)]
        pipeline: Service,
    }
456
457    impl TestSetup {
458        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
459            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_SUBSCRIBER_CHANNEL_SIZE);
460            let metrics = IndexerMetrics::new(None, &Registry::default());
461
462            let pipeline = pipeline(
463                DataPipeline,
464                next_checkpoint,
465                config,
466                store.clone(),
467                None,
468                checkpoint_rx,
469                metrics,
470            );
471
472            Self {
473                store,
474                checkpoint_tx,
475                pipeline,
476            }
477        }
478
479        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
480            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
481                checkpoint: Arc::new(
482                    TestCheckpointBuilder::new(checkpoint)
483                        .with_epoch(1)
484                        .with_network_total_transactions(checkpoint * 2)
485                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
486                        .build_checkpoint(),
487                ),
488                chain_id: CheckpointDigest::new([1; 32]).into(),
489            });
490            self.checkpoint_tx.send(checkpoint_envelope).await?;
491            Ok(())
492        }
493
494        async fn send_checkpoint_with_timeout(
495            &self,
496            checkpoint: u64,
497            timeout_duration: Duration,
498        ) -> anyhow::Result<()> {
499            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
500        }
501
502        async fn send_checkpoint_expect_timeout(
503            &self,
504            checkpoint: u64,
505            timeout_duration: Duration,
506        ) {
507            timeout(timeout_duration, self.send_checkpoint(checkpoint))
508                .await
509                .unwrap_err(); // Panics if send succeeds
510        }
511    }
512
513    #[tokio::test]
514    async fn test_e2e_pipeline() {
515        let config = ConcurrentConfig {
516            pruner: Some(PrunerConfig {
517                interval_ms: 5_000, // Long interval to test states before pruning
518                delay_ms: 100,      // Short delay for faster tests
519                retention: 3,       // Keep only 3 checkpoints
520                ..Default::default()
521            }),
522            ..Default::default()
523        };
524        let store = MockStore::default();
525        let setup = TestSetup::new(config, store, 0).await;
526
527        // Send initial checkpoints
528        for i in 0..3 {
529            setup
530                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
531                .await
532                .unwrap();
533        }
534
535        // Verify all initial data is available (before any pruning)
536        for i in 0..3 {
537            let data = setup
538                .store
539                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
540                .await;
541            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
542        }
543
544        // Add more checkpoints to trigger pruning
545        for i in 3..6 {
546            setup
547                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
548                .await
549                .unwrap();
550        }
551
552        // Verify data is still available BEFORE pruning kicks in
553        // With long pruning interval (5s), we can safely verify data without race conditions
554        for i in 0..6 {
555            let data = setup
556                .store
557                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
558                .await;
559            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
560        }
561
562        // Wait for pruning to occur. The pruner and reader_watermark tasks both run on
563        // the same interval, so poll until the pruner has caught up rather instead of using a
564        // fixed sleep.
565        let pruning_deadline = Duration::from_secs(15);
566        let start = tokio::time::Instant::now();
567        loop {
568            let pruned = {
569                let data = setup.store.data.get(DataPipeline::NAME).unwrap();
570                !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
571            };
572            if pruned {
573                break;
574            }
575            assert!(
576                start.elapsed() < pruning_deadline,
577                "Timed out waiting for pruning to occur"
578            );
579            tokio::time::sleep(Duration::from_millis(100)).await;
580        }
581
582        // Verify recent checkpoints are still available
583        {
584            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
585            assert!(data.contains_key(&3));
586            assert!(data.contains_key(&4));
587            assert!(data.contains_key(&5));
588        };
589    }
590
591    #[tokio::test]
592    async fn test_e2e_pipeline_without_pruning() {
593        let config = ConcurrentConfig {
594            pruner: None,
595            ..Default::default()
596        };
597        let store = MockStore::default();
598        let setup = TestSetup::new(config, store, 0).await;
599
600        // Send several checkpoints
601        for i in 0..10 {
602            setup
603                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
604                .await
605                .unwrap();
606        }
607
608        // Wait for all data to be processed and committed
609        let watermark = setup
610            .store
611            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
612            .await;
613
614        // Verify ALL data was processed correctly (no pruning)
615        for i in 0..10 {
616            let data = setup
617                .store
618                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
619                .await;
620            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
621        }
622
623        // Verify watermark progression
624        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
625        assert_eq!(watermark.tx_hi, 18); // 9 * 2
626        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000
627
628        // Verify no data was pruned - all 10 checkpoints should still exist
629        let total_checkpoints = {
630            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
631            data.len()
632        };
633        assert_eq!(total_checkpoints, 10);
634    }
635
636    #[tokio::test]
637    async fn test_out_of_order_processing() {
638        let config = ConcurrentConfig::default();
639        let store = MockStore::default();
640        let setup = TestSetup::new(config, store, 0).await;
641
642        // Send checkpoints out of order
643        let checkpoints = vec![2, 0, 4, 1, 3];
644        for cp in checkpoints {
645            setup
646                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
647                .await
648                .unwrap();
649        }
650
651        // Wait for all data to be processed
652        setup
653            .store
654            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
655            .await;
656
657        // Verify all checkpoints were processed correctly despite out-of-order arrival
658        for i in 0..5 {
659            let data = setup
660                .store
661                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
662                .await;
663            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
664        }
665    }
666
667    #[tokio::test]
668    async fn test_watermark_progression_with_gaps() {
669        let config = ConcurrentConfig::default();
670        let store = MockStore::default();
671        let setup = TestSetup::new(config, store, 0).await;
672
673        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
674        for cp in [0, 1, 3, 4] {
675            setup
676                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
677                .await
678                .unwrap();
679        }
680
681        // Wait for processing
682        tokio::time::sleep(Duration::from_secs(1)).await;
683
684        // Watermark should only progress to 1 (can't progress past the gap)
685        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
686        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));
687
688        // Now send the missing checkpoint 2
689        setup
690            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
691            .await
692            .unwrap();
693
694        // Now watermark should progress to 4
695        let watermark = setup
696            .store
697            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
698            .await;
699        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
700    }
701
702    // ==================== BACK-PRESSURE TESTING ====================
703
704    #[tokio::test]
705    async fn test_back_pressure_collector_max_pending_rows() {
706        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
707        //
708        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
709        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
710        // │   Input    │    │ (fanout=2) │    │            │    │            │
711        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
712        //                │                 │                 │
713        //              [●●●]           [●●●●●●●]         [●●●●●●]
714        //            buffer: 3         buffer: 7         buffer: 6
715        //
716        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)
717
718        let config = ConcurrentConfig {
719            committer: CommitterConfig {
720                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
721                write_concurrency: 1,
722                ..Default::default()
723            },
724            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
725            processor_channel_size: Some(7),
726            collector_channel_size: Some(6),
727            ..Default::default()
728        };
729        let store = MockStore::default();
730        let setup = TestSetup::new(config, store, 0).await;
731
732        // Wait for initial setup
733        tokio::time::sleep(Duration::from_millis(200)).await;
734
735        // Pipeline capacity analysis with collector back pressure:
736        // Configuration: MAX_PENDING_ROWS=4, fanout=2
737        //
738        // Channel and task breakdown:
739        // - Checkpoint->Processor channel: 3 slots (TEST_SUBSCRIBER_CHANNEL_SIZE)
740        // - Processor tasks: 2 tasks (fanout=2)
741        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
742        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
743        //
744        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints
745
746        // Fill pipeline to capacity - these should all succeed
747        for i in 0..14 {
748            setup
749                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
750                .await
751                .unwrap();
752        }
753
754        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
755        setup
756            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
757            .await;
758
759        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
760        setup
761            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
762            .await
763            .unwrap();
764
765        // Verify data was processed correctly
766        let data = setup
767            .store
768            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
769            .await;
770        assert_eq!(data, vec![1, 2]);
771    }
772
773    #[tokio::test]
774    async fn test_back_pressure_committer_slow_commits() {
775        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
776        //
777        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
778        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
779        // │   Input    │    │ (fanout=2) │    │            │    │🐌 10s Delay│
780        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
781        //                │                 │                 │
782        //              [●●●]           [●●●●●●●]          [●●●●●●]
783        //            buffer: 3    proc_chan: 7       coll_chan: 6
784        //
785        // BOTTLENECK: Committer with 10s delay blocks entire pipeline
786
787        let config = ConcurrentConfig {
788            committer: CommitterConfig {
789                write_concurrency: 1, // Single committer for deterministic blocking
790                // MIN_EAGER_ROWS is 1000 and MAX_PENDING_ROWS is 4, so
791                // this test relies on the collect interval tip to force
792                // batch flushing.
793                collect_interval_ms: 10,
794                ..Default::default()
795            },
796            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
797            processor_channel_size: Some(7),
798            collector_channel_size: Some(6),
799            ..Default::default()
800        };
801        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
802        let setup = TestSetup::new(config, store, 0).await;
803
804        // Pipeline capacity analysis with slow commits:
805        // Configuration: fanout=2, write_concurrency=1
806        //
807        // Channel and task breakdown:
808        // - Checkpoint->Processor channel: 3 slots (TEST_SUBSCRIBER_CHANNEL_SIZE)
809        // - Processor tasks: 2 tasks (fanout=2)
810        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
811        // - Collector->Committer channel: 6 slots (collector_channel_size=6)
812        // - Committer task: 1 task (blocked by slow commit)
813        //
814        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints
815
816        // Fill pipeline to theoretical capacity - these should all succeed
817        for i in 0..19 {
818            setup
819                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
820                .await
821                .unwrap();
822        }
823
824        // Find the actual back pressure point
825        // Due to non-determinism in collector's tokio::select!, the collector may consume
826        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
827        // This means back pressure occurs somewhere in range 19-21.
828        let mut back_pressure_checkpoint = None;
829        for checkpoint in 19..22 {
830            if setup
831                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
832                .await
833                .is_err()
834            {
835                back_pressure_checkpoint = Some(checkpoint);
836                break;
837            }
838        }
839        assert!(
840            back_pressure_checkpoint.is_some(),
841            "Back pressure should occur between checkpoints 19-21"
842        );
843
844        // Verify that some data has been processed (pipeline is working)
845        setup
846            .store
847            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
848            .await;
849
850        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
851        setup
852            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
853            .await
854            .unwrap();
855    }
856
857    // ==================== FAILURE TESTING ====================
858
859    #[tokio::test]
860    async fn test_commit_failure_retry() {
861        let config = ConcurrentConfig::default();
862        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
863        let setup = TestSetup::new(config, store, 0).await;
864
865        // Send a checkpoint
866        setup
867            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
868            .await
869            .unwrap();
870
871        // Should eventually succeed despite initial commit failures
872        setup
873            .store
874            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
875            .await;
876
877        // Verify data was eventually committed
878        let data = setup
879            .store
880            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
881            .await;
882        assert_eq!(data, vec![1, 2]);
883    }
884
885    #[tokio::test]
886    async fn test_prune_failure_retry() {
887        let config = ConcurrentConfig {
888            pruner: Some(PrunerConfig {
889                interval_ms: 2000, // 2 seconds interval for testing
890                delay_ms: 100,     // Short delay
891                retention: 2,      // Keep only 2 checkpoints
892                ..Default::default()
893            }),
894            ..Default::default()
895        };
896
897        // Configure prune failures for range [0, 2) - fail twice then succeed
898        let store = MockStore::default().with_prune_failures(0, 2, 1);
899        let setup = TestSetup::new(config, store, 0).await;
900
901        // Send enough checkpoints to trigger pruning
902        for i in 0..4 {
903            setup
904                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
905                .await
906                .unwrap();
907        }
908
909        // Verify data is still available BEFORE pruning kicks in
910        // With long pruning interval (5s), we can safely verify data without race conditions
911        for i in 0..4 {
912            let data = setup
913                .store
914                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
915                .await;
916            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
917        }
918
919        // Wait for first pruning attempt (should fail) and verify no data has been pruned
920        setup
921            .store
922            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
923            .await;
924        {
925            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
926            for i in 0..4 {
927                assert!(data.contains_key(&i));
928            }
929        };
930
931        // Wait for second pruning attempt (should succeed)
932        setup
933            .store
934            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
935            .await;
936        {
937            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
938            // Verify recent checkpoints are still available
939            assert!(data.contains_key(&2));
940            assert!(data.contains_key(&3));
941
942            // Verify old checkpoints are pruned
943            assert!(!data.contains_key(&0));
944            assert!(!data.contains_key(&1));
945        };
946    }
947
948    #[tokio::test]
949    async fn test_reader_watermark_failure_retry() {
950        let config = ConcurrentConfig {
951            pruner: Some(PrunerConfig {
952                interval_ms: 100, // Fast interval for testing
953                delay_ms: 100,    // Short delay
954                retention: 3,     // Keep 3 checkpoints
955                ..Default::default()
956            }),
957            ..Default::default()
958        };
959
960        // Configure reader watermark failures - fail 2 times then succeed
961        let store = MockStore::default().with_reader_watermark_failures(2);
962        let setup = TestSetup::new(config, store, 0).await;
963
964        // Send checkpoints to trigger reader watermark updates
965        for i in 0..6 {
966            setup
967                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
968                .await
969                .unwrap();
970        }
971
972        // Wait for processing to complete
973        setup
974            .store
975            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
976            .await;
977
978        // Wait for reader watermark task to attempt updates (with failures and retries)
979        tokio::time::sleep(Duration::from_secs(2)).await;
980
981        // Verify that reader watermark was eventually updated despite failures
982        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
983        assert_eq!(watermark.reader_lo, 3);
984    }
985
986    #[tokio::test]
987    async fn test_database_connection_failure_retry() {
988        let config = ConcurrentConfig::default();
989        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
990        let setup = TestSetup::new(config, store, 0).await;
991
992        // Send a checkpoint
993        setup
994            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
995            .await
996            .unwrap();
997
998        // Should eventually succeed despite initial failures
999        setup
1000            .store
1001            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
1002            .await;
1003
1004        // Verify data was eventually committed
1005        let data = setup
1006            .store
1007            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
1008            .await;
1009        assert_eq!(data, vec![1, 2]);
1010    }
1011}