sui_indexer_alt_framework/pipeline/concurrent/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;
use sui_futures::service::Service;
use tokio::sync::SetOnce;
use tokio::sync::mpsc;
use tracing::info;

use crate::Task;
use crate::config::ConcurrencyConfig;
use crate::ingestion::ingestion_client::CheckpointEnvelope;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::IngestionConfig;
use crate::pipeline::Processor;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::collector::collector;
use crate::pipeline::concurrent::commit_watermark::commit_watermark;
use crate::pipeline::concurrent::committer::committer;
use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
use crate::pipeline::concurrent::pruner::pruner;
use crate::pipeline::concurrent::reader_watermark::reader_watermark;
use crate::pipeline::processor::processor;
use crate::store::ConcurrentStore;
use crate::store::Store;

mod collector;
mod commit_watermark;
mod committer;
mod main_reader_lo;
mod pruner;
mod reader_watermark;

/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}

/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
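///
/// The sketch below is illustrative only (the `MyRow` type and `MAX_BATCH_ROWS` constant are
/// hypothetical names, not part of the framework). It shows one way a handler might implement
/// `batch`: take values until a size limit is hit, reporting `Ready` once the batch is full and
/// `Pending` otherwise.
///
/// ```ignore
/// fn batch(
///     &self,
///     batch: &mut Vec<MyRow>,
///     values: &mut std::vec::IntoIter<MyRow>,
/// ) -> BatchStatus {
///     // Drain values into the batch until the (hypothetical) size limit is reached.
///     while batch.len() < MAX_BATCH_ROWS {
///         let Some(value) = values.next() else {
///             // Iterator exhausted -- the batch can still accept more values.
///             return BatchStatus::Pending;
///         };
///         batch.push(value);
///     }
///     // The batch is full; it should be committed.
///     BatchStatus::Ready
/// }
/// ```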
#[async_trait]
pub trait Handler: Processor {
    type Store: ConcurrentStore;
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the collector applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipelines' batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data from checkpoint `_from` (inclusive) up to checkpoint `_to_exclusive`
    /// (exclusive) in the database, returning the number of rows affected. This function is
    /// optional, and defaults to not pruning at all.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

/// Configuration for a concurrent pipeline
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the writer, which makes forward progress.
    pub committer: CommitterConfig,

    /// Per-pipeline ingestion overrides.
    pub ingestion: IngestionConfig,

    /// Configuration for the pruner, which deletes old data.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency. Defaults to adaptive scaling up to the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for `Handler::MIN_EAGER_ROWS` (eager batch threshold).
    pub min_eager_rows: Option<usize>,

    /// Override for `Handler::MAX_PENDING_ROWS` (backpressure threshold).
    pub max_pending_rows: Option<usize>,

    /// Override for `Handler::MAX_WATERMARK_UPDATES` (watermarks per batch cap).
    pub max_watermark_updates: Option<usize>,

    /// Size of the channel between the processor and collector.
    pub processor_channel_size: Option<usize>,

    /// Size of the channel between the collector and committer.
    pub collector_channel_size: Option<usize>,

    /// Size of the channel between the committer and the watermark updater.
    pub committer_channel_size: Option<usize>,
}
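
// Example (illustrative only): a pipeline typically starts from `ConcurrentConfig::default()`
// and overrides just the knobs it needs, as the tests at the bottom of this module do. The
// field values below are arbitrary, for demonstration.
//
//     let config = ConcurrentConfig {
//         committer: CommitterConfig {
//             write_concurrency: 4,
//             ..Default::default()
//         },
//         pruner: Some(PrunerConfig {
//             retention: 1_000_000,
//             ..Default::default()
//         }),
//         ..Default::default()
//     };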

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set before it is safe to prune up to
    /// the new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try to prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The max number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}

/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}

impl<H, V> BatchedRows<H>
where
    H: Handler<Batch = Vec<V>, Value = V>,
{
    #[cfg(test)]
    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
        let batch_len = batch.len();
        Self {
            batch,
            batch_len,
            watermark,
        }
    }
}

impl PrunerConfig {
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}

impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}
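
// For reference, the defaults above amount to checking for prunable data every 5 minutes,
// waiting 2 minutes after the reader low watermark moves before pruning up to it, retaining
// 4 million checkpoints, and pruning at most 2,000 checkpoints per request with a single
// pruning task.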

/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`, starting
/// from `next_checkpoint`.
///
/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
/// the database, a committer which writes the rows out concurrently, and a watermark task to
/// update the high watermark.
///
/// Committing is performed out-of-order: the pipeline may write checkpoints in any order, either
/// because it received them out-of-order or because of variance in processing time.
///
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
/// watermark below which all data has been committed (modulo pruning).
///
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
/// channels are created to communicate between its various components. The pipeline will shut
/// down if any of its input or output channels close, any of its independent tasks fail, or if it
/// is signalled to shut down through the returned service handle.
pub(crate) fn pipeline<H: Handler>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    store: H::Store,
    task: Option<Task>,
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {config:#?}",
    );

    let ConcurrentConfig {
        committer: committer_config,
        ingestion: _,
        pruner: pruner_config,
        fanout,
        min_eager_rows,
        max_pending_rows,
        max_watermark_updates,
        processor_channel_size,
        collector_channel_size,
        committer_channel_size,
    } = config;

    let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
        initial: 1,
        min: 1,
        max: num_cpus::get(),
        dead_band: None,
    });
    let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
    let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
    let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);

    let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
    let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);

    let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
    let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
    //docs::/#buff
    let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
    let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
    let main_reader_lo = Arc::new(SetOnce::new());

    let handler = Arc::new(handler);

    let s_processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        concurrency,
        store.clone(),
    );

    let s_collector = collector::<H>(
        handler.clone(),
        committer_config.clone(),
        collector_rx,
        collector_tx,
        main_reader_lo.clone(),
        metrics.clone(),
        min_eager_rows,
        max_pending_rows,
        max_watermark_updates,
    );

    let s_committer = committer::<H>(
        handler.clone(),
        committer_config.clone(),
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
    );

    let s_commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        watermark_rx,
        store.clone(),
        task.as_ref().map(|t| t.task.clone()),
        metrics.clone(),
    );

    let s_track_reader_lo = track_main_reader_lo::<H>(
        main_reader_lo.clone(),
        task.as_ref().map(|t| t.reader_interval),
        store.clone(),
    );

    let s_reader_watermark =
        reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());

    let s_pruner = pruner(handler, pruner_config, store, metrics);

    s_processor
        .merge(s_collector)
        .merge(s_committer)
        .merge(s_commit_watermark)
        .attach(s_track_reader_lo)
        .attach(s_reader_watermark)
        .attach(s_pruner)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use prometheus::Registry;
    use sui_types::digests::CheckpointDigest;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::Processor;
    use crate::types::full_checkpoint_content::Checkpoint;
    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_SUBSCRIBER_CHANNEL_SIZE: usize = 3; // Critical for back-pressure testing calculations

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Every checkpoint will come with 2 processed values
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<TestValue>;

        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint for ease of testing.

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Take all values
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    struct TestSetup {
        store: FallibleMockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        #[allow(unused)]
        pipeline: Service,
    }

    impl TestSetup {
        async fn new(
            config: ConcurrentConfig,
            store: FallibleMockStore,
            next_checkpoint: u64,
        ) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_SUBSCRIBER_CHANNEL_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
                checkpoint: Arc::new(
                    TestCheckpointBuilder::new(checkpoint)
                        .with_epoch(1)
                        .with_network_total_transactions(checkpoint * 2)
                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
                        .build_checkpoint(),
                ),
                chain_id: CheckpointDigest::new([1; 32]).into(),
            });
            self.checkpoint_tx.send(checkpoint_envelope).await?;
            Ok(())
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if the send completes before the timeout
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur. The pruner and reader_watermark tasks both run on the
        // same interval, so poll until the pruner has caught up rather than using a fixed sleep.
        let pruning_deadline = Duration::from_secs(15);
        let start = tokio::time::Instant::now();
        loop {
            let pruned = {
                let data = setup.store.data.get(DataPipeline::NAME).unwrap();
                !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
            };
            if pruned {
                break;
            }
            assert!(
                start.elapsed() < pruning_deadline,
                "Timed out waiting for pruning to occur"
            );
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Verify recent checkpoints are still available
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));
        };
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }

    // ==================== BACK-PRESSURE TESTING ====================

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]         [●●●●●●]
        //            buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, fanout=2
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_SUBSCRIBER_CHANNEL_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (fanout=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //                │                 │                 │
        //              [●●●]           [●●●●●●●]          [●●●●●●]
        //            buffer: 3    proc_chan: 7       coll_chan: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                // MIN_EAGER_ROWS is 1000 and MAX_PENDING_ROWS is 4, so
                // this test relies on the collect interval tick to force
                // batch flushing.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = FallibleMockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: fanout=2, write_concurrency=1
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_SUBSCRIBER_CHANNEL_SIZE)
        // - Processor tasks: 2 tasks (fanout=2)
        // - Processor->Collector channel: 7 slots (processor_channel_size=7)
        // - Collector->Committer channel: 6 slots (collector_channel_size=6)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in the collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in the range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }

    // ==================== FAILURE TESTING ====================

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = FallibleMockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed
        let store = FallibleMockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With a 2s pruning interval, we can safely verify data without race conditions
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = FallibleMockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = FallibleMockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
}