sui_indexer_alt_framework/pipeline/concurrent/mod.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{metrics::IndexerMetrics, store::Store, types::full_checkpoint_content::Checkpoint};

use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};

use self::{
    collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
    reader_watermark::reader_watermark,
};

mod collector;
mod commit_watermark;
mod committer;
mod pruner;
mod reader_watermark;

/// Status returned by `Handler::batch` to indicate whether the batch is ready to be committed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}

/// Handlers implement the logic for a given indexing pipeline: how to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handler may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
///   sizes.
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
///   can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_ROWS` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
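///
/// The sketch below (not taken from a real pipeline) shows a handler for a hypothetical
/// `TxDigests` table overriding some of these tuning knobs; `Db`, `DbConnection` and
/// `insert_rows` are assumed stand-ins for a concrete [Store] implementation, and the batch type
/// is assumed to be a `Vec` of values.
///
/// ```ignore
/// #[async_trait]
/// impl Handler for TxDigests {
///     type Store = Db;
///     type Batch = Vec<Self::Value>;
///
///     // This table gets many small rows per checkpoint, so commit eagerly at
///     // 1,000 rows and tolerate a larger backlog before applying back-pressure.
///     const MIN_EAGER_ROWS: usize = 1_000;
///     const MAX_PENDING_ROWS: usize = 20_000;
///
///     fn batch(
///         &self,
///         batch: &mut Self::Batch,
///         values: &mut std::vec::IntoIter<Self::Value>,
///     ) -> BatchStatus {
///         // Accept everything that is offered; rely on the framework's limits
///         // to decide when the batch is written out.
///         batch.extend(values);
///         BatchStatus::Pending
///     }
///
///     async fn commit<'a>(
///         &self,
///         batch: &Self::Batch,
///         conn: &mut DbConnection<'a>,
///     ) -> anyhow::Result<usize> {
///         insert_rows(conn, batch).await
///     }
/// }
/// ```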
#[async_trait]
pub trait Handler: Processor {
    type Store: Store;
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the committer will commit them eagerly.
    const MIN_EAGER_ROWS: usize = 50;

    /// If there are more than this many rows pending, the committer applies backpressure.
    const MAX_PENDING_ROWS: usize = 5000;

    /// The maximum number of watermarks that can show up in a single batch.
    /// This limit exists to deal with pipelines that produce no data for a majority of
    /// checkpoints -- the size of these pipelines' batches will be dominated by watermark updates.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Add values from the iterator to the batch. The implementation may take all, some, or none
    /// of the values from the iterator by calling `.next()`.
    ///
    /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
    /// or `BatchStatus::Pending` if the batch can accept more values.
    ///
    /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
    /// may also decide to commit a batch based on the trait parameters above.
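    ///
    /// A minimal sketch of a size-limited `batch` implementation, assuming `Self::Batch` is a
    /// `Vec` of values (the 1,000-row cap is an arbitrary choice for illustration): take values
    /// until the batch is full, then report `Ready` so the collector hands it to a committer.
    ///
    /// ```ignore
    /// fn batch(
    ///     &self,
    ///     batch: &mut Self::Batch,
    ///     values: &mut std::vec::IntoIter<Self::Value>,
    /// ) -> BatchStatus {
    ///     while batch.len() < 1_000 {
    ///         let Some(value) = values.next() else {
    ///             // Iterator exhausted before the cap -- keep accepting values.
    ///             return BatchStatus::Pending;
    ///         };
    ///         batch.push(value);
    ///     }
    ///     BatchStatus::Ready
    /// }
    /// ```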
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Commit the batch to the database, returning the number of rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Clean up data between checkpoints `_from` (inclusive) and `_to_exclusive` (exclusive) in
    /// the database, returning the number of rows affected. This function is optional, and
    /// defaults to not pruning at all.
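    ///
    /// A sketch of an override, assuming a store whose connection exposes a hypothetical
    /// `delete_checkpoint_range` helper for the handler's table:
    ///
    /// ```ignore
    /// async fn prune<'a>(
    ///     &self,
    ///     from: u64,
    ///     to_exclusive: u64,
    ///     conn: &mut <Self::Store as Store>::Connection<'a>,
    /// ) -> anyhow::Result<usize> {
    ///     // Delete all rows whose checkpoint falls in `from..to_exclusive`.
    ///     conn.delete_checkpoint_range("tx_digests", from, to_exclusive).await
    /// }
    /// ```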
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}

/// Configuration for a concurrent pipeline
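///
/// A sketch of a configuration that enables pruning, keeping roughly one million checkpoints of
/// data and checking for prunable data every five minutes (the retention and interval values here
/// are illustrative, not recommendations):
///
/// ```ignore
/// let config = ConcurrentConfig {
///     committer: CommitterConfig::default(),
///     pruner: Some(PrunerConfig {
///         retention: 1_000_000,
///         interval_ms: 300_000,
///         ..PrunerConfig::default()
///     }),
/// };
/// ```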
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the committer, which makes forward progress.
    pub committer: CommitterConfig,

    /// Configuration for the pruner, which deletes old data.
    pub pruner: Option<PrunerConfig>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner should check whether there is any data to prune, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark was set before it is safe to prune up to
    /// that new watermark, in milliseconds.
    pub delay_ms: u64,

    /// How much data to keep, measured in checkpoints.
    pub retention: u64,

    /// The maximum range to try to prune in one request, measured in checkpoints.
    pub max_chunk_size: u64,

    /// The maximum number of tasks to run in parallel for pruning.
    pub prune_concurrency: u64,
}

/// Values ready to be written to the database. This is an internal type used to communicate
/// between the collector and the committer parts of the pipeline.
///
/// Values inside each batch may or may not be from the same checkpoint. Values in the same
/// checkpoint can also be split across multiple batches.
struct BatchedRows<H: Handler> {
    /// The batch to write
    batch: H::Batch,
    /// Number of rows in the batch
    batch_len: usize,
    /// Proportions of all the watermarks that are represented in this chunk
    watermark: Vec<WatermarkPart>,
}

impl<H, V> BatchedRows<H>
where
    H: Handler<Batch = Vec<V>, Value = V>,
{
    #[cfg(test)]
    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
        let batch_len = batch.len();
        Self {
            batch,
            batch_len,
            watermark,
        }
    }
}

impl PrunerConfig {
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}

impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}

/// Start a new concurrent (out-of-order) indexing pipeline, served by the handler, `H`, starting
/// from `next_checkpoint`.
///
/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
/// the database, a committer which writes the rows out concurrently, and a watermark task to
/// update the high watermark.
///
/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
/// either because it received the checkpoints out-of-order or because of variance in processing
/// time.
///
/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
/// watermark below which all data has been committed (modulo pruning), as long as `skip_watermark`
/// is not true.
///
/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
/// channels are created to communicate between its various components. The pipeline can be shut
/// down using its `cancel` token, and will also shut down if any of its independent tasks
/// reports an issue.
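///
/// A rough sketch of the task topology (the reader watermark and pruner tasks only do work when a
/// [PrunerConfig] is provided):
///
/// ```text
/// checkpoint_rx ─► processor ─► collector ─► committer ─► commit_watermark
///                  (FANOUT       (batches     (writes,      (tracks the watermark
///                   tasks)        rows)        out of        below which all data
///                                              order)        has been committed)
///
/// reader_watermark, pruner   (not connected by channels; signalled to stop once
///                             the tasks above have shut down)
/// ```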
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    skip_watermark: bool,
    store: H::Store,
    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {:?}", config
    );
    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    //docs::/#buff
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);

    // The pruner is not connected to the rest of the tasks by channels, so it needs to be
    // explicitly signalled to shut down when the other tasks shut down, in addition to listening
    // to the global cancel signal. We achieve this by creating a child cancel token that we cancel
    // once the committer tasks have shut down.
    let pruner_cancel = cancel.child_token();
    let handler = Arc::new(handler);

    let processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let collector = collector::<H>(
        handler.clone(),
        committer_config.clone(),
        collector_rx,
        collector_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let committer = committer::<H>(
        handler.clone(),
        committer_config.clone(),
        skip_watermark,
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        skip_watermark,
        watermark_rx,
        store.clone(),
        metrics.clone(),
        cancel,
    );

    let reader_watermark = reader_watermark::<H>(
        pruner_config.clone(),
        store.clone(),
        metrics.clone(),
        pruner_cancel.clone(),
    );

    let pruner = pruner(
        handler,
        pruner_config,
        store,
        metrics,
        pruner_cancel.clone(),
    );

    tokio::spawn(async move {
        let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

        pruner_cancel.cancel();
        let _ = futures::join!(reader_watermark, pruner);
    })
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::Registry;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::{MockConnection, MockStore},
        pipeline::Processor,
        types::{
            full_checkpoint_content::Checkpoint,
            test_checkpoint_data_builder::TestCheckpointBuilder,
        },
    };

    use super::*;

    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Every checkpoint will come with 2 processed values
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint, for ease of testing.

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Take all values
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        pipeline_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());
            let cancel = CancellationToken::new();

            let skip_watermark = false;
            let pipeline_handle = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                skip_watermark,
                store.clone(),
                checkpoint_rx,
                metrics,
                cancel.clone(),
            );

            Self {
                store,
                checkpoint_tx,
                pipeline_handle,
                cancel,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        async fn shutdown(self) {
            drop(self.checkpoint_tx);
            self.cancel.cancel();
            let _ = self.pipeline_handle.await;
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if send succeeds
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.shutdown().await;
    }

    // ==================== BACK-PRESSURE TESTING ====================

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //        │                 │                 │
        //      [●●●]          [●●●●●●●]          [●●●●●●]
        //     buffer: 3        buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //        │                 │                 │
        //      [●●●]          [●●●●●●●]          [●●●●●●]
        //     buffer: 3        buffer: 7         buffer: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();

        setup.shutdown().await;
    }

    // ==================== FAILURE TESTING ====================

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure prune failures for range [0, 2) - fail once, then succeed
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With a 2s pruning interval, we can safely verify data without race conditions
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
}