sui_indexer_alt_framework/pipeline/concurrent/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use sui_futures::service::Service;
9use tokio::sync::{SetOnce, mpsc};
10use tracing::info;
11
12use crate::{
13 Task, metrics::IndexerMetrics, store::Store, types::full_checkpoint_content::Checkpoint,
14};
15
16use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};
17
18use self::{
19 collector::collector, commit_watermark::commit_watermark, committer::committer,
20 main_reader_lo::track_main_reader_lo, pruner::pruner, reader_watermark::reader_watermark,
21};
22
23mod collector;
24mod commit_watermark;
25mod committer;
26mod main_reader_lo;
27mod pruner;
28mod reader_watermark;
29
/// Outcome of a call to `Handler::batch`, telling the framework whether the batch being filled
/// can still take more values or should be handed off to be committed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BatchStatus {
    /// There is still room: the batch may be given more values.
    Pending,
    /// The batch is full: it should be committed.
    Ready,
}
38
39/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
40/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
41///
42/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
43/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
44/// usage, but each handle may choose to override these defaults, e.g.
45///
46/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
47/// sizes).
48/// - Handlers that do more work during processing may wish to increase their fanout so more of it
49/// can be done concurrently, to preserve throughput.
50///
51/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
52/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
53/// checkpoint below which all data has been committed.
54///
55/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
56/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
57/// back to the ingestion service.
58#[async_trait]
59pub trait Handler: Processor {
60 type Store: Store;
61 type Batch: Default + Send + Sync + 'static;
62
63 /// If at least this many rows are pending, the committer will commit them eagerly.
64 const MIN_EAGER_ROWS: usize = 50;
65
66 /// If there are more than this many rows pending, the committer applies backpressure.
67 const MAX_PENDING_ROWS: usize = 5000;
68
69 /// The maximum number of watermarks that can show up in a single batch.
70 /// This limit exists to deal with pipelines that produce no data for a majority of
71 /// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
72 const MAX_WATERMARK_UPDATES: usize = 10_000;
73
74 /// Add values from the iterator to the batch. The implementation may take all, some, or none
75 /// of the values from the iterator by calling `.next()`.
76 ///
77 /// Returns `BatchStatus::Ready` if the batch is full and should be committed,
78 /// or `BatchStatus::Pending` if the batch can accept more values.
79 ///
80 /// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
81 /// may also decide to commit a batch based on the trait parameters above.
82 fn batch(
83 &self,
84 batch: &mut Self::Batch,
85 values: &mut std::vec::IntoIter<Self::Value>,
86 ) -> BatchStatus;
87
88 /// Commit the batch to the database, returning the number of rows affected.
89 async fn commit<'a>(
90 &self,
91 batch: &Self::Batch,
92 conn: &mut <Self::Store as Store>::Connection<'a>,
93 ) -> anyhow::Result<usize>;
94
95 /// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
96 /// the number of rows affected. This function is optional, and defaults to not pruning at all.
97 async fn prune<'a>(
98 &self,
99 _from: u64,
100 _to_exclusive: u64,
101 _conn: &mut <Self::Store as Store>::Connection<'a>,
102 ) -> anyhow::Result<usize> {
103 Ok(0)
104 }
105}
106
107/// Configuration for a concurrent pipeline
108#[derive(Serialize, Deserialize, Debug, Clone, Default)]
109pub struct ConcurrentConfig {
110 /// Configuration for the writer, that makes forward progress.
111 pub committer: CommitterConfig,
112
113 /// Configuration for the pruner, that deletes old data.
114 pub pruner: Option<PrunerConfig>,
115}
116
117#[derive(Serialize, Deserialize, Debug, Clone)]
118pub struct PrunerConfig {
119 /// How often the pruner should check whether there is any data to prune, in milliseconds.
120 pub interval_ms: u64,
121
122 /// How long to wait after the reader low watermark was set, until it is safe to prune up until
123 /// this new watermark, in milliseconds.
124 pub delay_ms: u64,
125
126 /// How much data to keep, this is measured in checkpoints.
127 pub retention: u64,
128
129 /// The maximum range to try and prune in one request, measured in checkpoints.
130 pub max_chunk_size: u64,
131
132 /// The max number of tasks to run in parallel for pruning.
133 pub prune_concurrency: u64,
134}
135
136/// Values ready to be written to the database. This is an internal type used to communicate
137/// between the collector and the committer parts of the pipeline.
138///
139/// Values inside each batch may or may not be from the same checkpoint. Values in the same
140/// checkpoint can also be split across multiple batches.
141struct BatchedRows<H: Handler> {
142 /// The batch to write
143 batch: H::Batch,
144 /// Number of rows in the batch
145 batch_len: usize,
146 /// Proportions of all the watermarks that are represented in this chunk
147 watermark: Vec<WatermarkPart>,
148}
149
150impl<H, V> BatchedRows<H>
151where
152 H: Handler<Batch = Vec<V>, Value = V>,
153{
154 #[cfg(test)]
155 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
156 let batch_len = batch.len();
157 Self {
158 batch,
159 batch_len,
160 watermark,
161 }
162 }
163}
164
165impl PrunerConfig {
166 pub fn interval(&self) -> Duration {
167 Duration::from_millis(self.interval_ms)
168 }
169
170 pub fn delay(&self) -> Duration {
171 Duration::from_millis(self.delay_ms)
172 }
173}
174
175impl Default for PrunerConfig {
176 fn default() -> Self {
177 Self {
178 interval_ms: 300_000,
179 delay_ms: 120_000,
180 retention: 4_000_000,
181 max_chunk_size: 2_000,
182 prune_concurrency: 1,
183 }
184 }
185}
186
187/// Start a new concurrent (out-of-order) indexing pipeline served by the handler, `H`. Starting
188/// strictly after the `watermark` (or from the beginning if no watermark was provided).
189///
190/// Each pipeline consists of a processor task which takes checkpoint data and breaks it down into
191/// rows, ready for insertion, a collector which batches those rows into an appropriate size for
192/// the database, a committer which writes the rows out concurrently, and a watermark task to
193/// update the high watermark.
194///
195/// Committing is performed out-of-order: the pipeline may write out checkpoints out-of-order,
196/// either because it received the checkpoints out-of-order or because of variance in processing
197/// time.
198///
199/// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
200/// watermark below which all data has been committed (modulo pruning).
201///
202/// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
203/// channels are created to communicate between its various components. The pipeline will shutdown
204/// if any of its input or output channels close, any of its independent tasks fail, or if it is
205/// signalled to shutdown through the returned service handle.
206pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
207 handler: H,
208 next_checkpoint: u64,
209 config: ConcurrentConfig,
210 store: H::Store,
211 task: Option<Task>,
212 checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
213 metrics: Arc<IndexerMetrics>,
214) -> Service {
215 info!(
216 pipeline = H::NAME,
217 "Starting pipeline with config: {config:#?}",
218 );
219
220 let ConcurrentConfig {
221 committer: committer_config,
222 pruner: pruner_config,
223 } = config;
224
225 let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
226 //docs::#buff (see docs/content/guides/developer/advanced/custom-indexer.mdx)
227 let (collector_tx, committer_rx) =
228 mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
229 //docs::/#buff
230 let (committer_tx, watermark_rx) =
231 mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
232 let main_reader_lo = Arc::new(SetOnce::new());
233
234 let handler = Arc::new(handler);
235
236 let s_processor = processor(
237 handler.clone(),
238 checkpoint_rx,
239 processor_tx,
240 metrics.clone(),
241 );
242
243 let s_collector = collector::<H>(
244 handler.clone(),
245 committer_config.clone(),
246 collector_rx,
247 collector_tx,
248 main_reader_lo.clone(),
249 metrics.clone(),
250 );
251
252 let s_committer = committer::<H>(
253 handler.clone(),
254 committer_config.clone(),
255 committer_rx,
256 committer_tx,
257 store.clone(),
258 metrics.clone(),
259 );
260
261 let s_commit_watermark = commit_watermark::<H>(
262 next_checkpoint,
263 committer_config,
264 watermark_rx,
265 store.clone(),
266 task.as_ref().map(|t| t.task.clone()),
267 metrics.clone(),
268 );
269
270 let s_track_reader_lo = track_main_reader_lo::<H>(
271 main_reader_lo.clone(),
272 task.as_ref().map(|t| t.reader_interval),
273 store.clone(),
274 );
275
276 let s_reader_watermark =
277 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
278
279 let s_pruner = pruner(handler, pruner_config, store, metrics);
280
281 s_processor
282 .merge(s_collector)
283 .merge(s_committer)
284 .merge(s_commit_watermark)
285 .attach(s_track_reader_lo)
286 .attach(s_reader_watermark)
287 .attach(s_pruner)
288}
289
// End-to-end tests for the concurrent pipeline, driven through `MockStore`. Several of them
// depend on channel capacities and timing (see the back-pressure section), so changes to buffer
// sizes or handler constants will require updating the capacity arithmetic in the comments.
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::Registry;
    use tokio::{sync::mpsc, time::timeout};

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::{MockConnection, MockStore},
        pipeline::Processor,
        types::{
            full_checkpoint_content::Checkpoint,
            test_checkpoint_data_builder::TestCheckpointBuilder,
        },
    };

    use super::*;

    /// Generous upper bound for waits on asynchronous pipeline progress.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    /// Capacity of the checkpoint channel that feeds the pipeline (see `TestSetup::new`).
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3; // Critical for back-pressure testing calculations

    /// A row produced by `DataPipeline`: the checkpoint it came from and a payload derived from
    /// that checkpoint's sequence number.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    /// Test handler: emits two rows per checkpoint and commits/prunes them through `MockStore`.
    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Every checkpoint will come with 2 processed values: `cp * 10 + 1` and `cp * 10 + 2`.
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        const MIN_EAGER_ROWS: usize = 1000; // High value to disable eager batching
        const MAX_PENDING_ROWS: usize = 4; // Small value to trigger back pressure quickly
        const MAX_WATERMARK_UPDATES: usize = 1; // Each batch will have 1 checkpoint, for ease of testing.

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Take all values. Never report `Ready`, so the framework alone decides when to commit.
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            // Group values by checkpoint
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            // Commit all data at once
            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    /// A running `DataPipeline` wired to a `MockStore`, plus the sender used to feed it checkpoints.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        // Never read: held so the pipeline's service handle lives as long as the test.
        // NOTE(review): presumably dropping it would stop the pipeline -- confirm against `Service`.
        #[allow(unused)]
        pipeline: Service,
    }

    impl TestSetup {
        /// Start `DataPipeline` against `store`, beginning at `next_checkpoint`, fed by a
        /// checkpoint channel with `TEST_CHECKPOINT_BUFFER_SIZE` slots.
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        /// Build a synthetic checkpoint numbered `checkpoint` and send it into the pipeline,
        /// waiting for channel capacity if necessary. The checkpoint has epoch 1,
        /// `checkpoint * 2` network total transactions, and timestamp
        /// `1000000000 + checkpoint * 1000` ms.
        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        /// Like `send_checkpoint`, but returns an error if the send does not complete within
        /// `timeout_duration` (e.g. because back-pressure has filled the channel).
        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        /// Assert that sending `checkpoint` is still blocked after `timeout_duration`, i.e. that
        /// the pipeline is applying back-pressure. The timed-out send future is dropped, so the
        /// checkpoint is not delivered and may be sent again.
        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err(); // Panics if send succeeds
        }
    }

    /// Data flows end to end, and checkpoints outside the retention window (3) are pruned.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000, // Long interval to test states before pruning
                delay_ms: 100,      // Short delay for faster tests
                retention: 3,       // Keep only 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send initial checkpoints
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify all initial data is available (before any pruning)
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Add more checkpoints to trigger pruning
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With long pruning interval (5s), we can safely verify data without race conditions
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for pruning to occur (5s + delay + processing time)
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // Verify pruning has occurred
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // Verify recent checkpoints are still available
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }

    /// With no pruner configured, every committed checkpoint is retained and the watermark
    /// reflects the last checkpoint's transaction count and timestamp.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send several checkpoints
        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed and committed
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Verify ALL data was processed correctly (no pruning)
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Verify watermark progression
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18); // 9 * 2
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000); // 1000000000 + 9 * 1000

        // Verify no data was pruned - all 10 checkpoints should still exist
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }

    /// Checkpoints arriving out of order are all committed and the watermark still reaches the end.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints out of order
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for all data to be processed
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Verify all checkpoints were processed correctly despite out-of-order arrival
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }

    /// The watermark stops at a missing checkpoint and catches up once the gap is filled.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap (0, 1, 3, 4) - missing checkpoint 2
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should only progress to 1 (can't progress past the gap)
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Now send the missing checkpoint 2
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now watermark should progress to 4
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    // ==================== BACK-PRESSURE TESTING ====================

    /// The collector stops accepting input once MAX_PENDING_ROWS is reached, and back-pressure
    /// propagates all the way to the checkpoint sender.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // Pipeline Diagram - Collector Back Pressure via MAX_PENDING_ROWS:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │            │
        // └────────────┘    └────────────┘    └[BOTTLENECK]┘    └────────────┘
        //       │                 │                 │
        //     [●●●]           [●●●●●●●]          [●●●●●●]
        //   buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Collector stops accepting when pending rows ≥ MAX_PENDING_ROWS (4)

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000, // Long interval to prevent timer-driven collection
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Wait for initial setup
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Pipeline capacity analysis with collector back pressure:
        // Configuration: MAX_PENDING_ROWS=4, FANOUT=2, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector pending: 2 checkpoints × 2 values = 4 values (hits MAX_PENDING_ROWS=4)
        //
        // Total capacity: 3 + 2 + 7 + 2 = 14 checkpoints

        // Fill pipeline to capacity - these should all succeed
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Checkpoint 14 should block due to MAX_PENDING_ROWS back pressure
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Verify data was processed correctly
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    /// A slow committer (10s per commit, single writer) eventually blocks the checkpoint sender.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // Pipeline Diagram - Committer Back Pressure via Slow Database Commits:
        //
        // ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────────┐
        // │ Checkpoint │ ─► │ Processor  │ ─► │ Collector  │ ─► │ Committer  │
        // │   Input    │    │ (FANOUT=2) │    │            │    │🐌 10s Delay│
        // └────────────┘    └────────────┘    └────────────┘    └[BOTTLENECK]┘
        //       │                 │                 │
        //     [●●●]           [●●●●●●●]          [●●●●●●]
        //   buffer: 3         buffer: 7         buffer: 6
        //
        // BOTTLENECK: Committer with 10s delay blocks entire pipeline

        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1, // Single committer for deterministic blocking
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000); // 10 seconds delay
        let setup = TestSetup::new(config, store, 0).await;

        // Pipeline capacity analysis with slow commits:
        // Configuration: FANOUT=2, write_concurrency=1, PIPELINE_BUFFER=5
        //
        // Channel and task breakdown:
        // - Checkpoint->Processor channel: 3 slots (TEST_CHECKPOINT_BUFFER_SIZE)
        // - Processor tasks: 2 tasks (FANOUT=2)
        // - Processor->Collector channel: 7 slots (FANOUT=2 + PIPELINE_BUFFER=5)
        // - Collector->Committer channel: 6 slots (write_concurrency=1 + PIPELINE_BUFFER=5)
        // - Committer task: 1 task (blocked by slow commit)
        //
        // Total theoretical capacity: 3 + 2 + 7 + 6 + 1 = 19 checkpoints

        // Fill pipeline to theoretical capacity - these should all succeed
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Find the actual back pressure point
        // Due to non-determinism in collector's tokio::select!, the collector may consume
        // up to 2 checkpoints (filling MAX_PENDING_ROWS=4) before applying back pressure.
        // This means back pressure occurs somewhere in range 19-21.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Verify that some data has been processed (pipeline is working)
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // Allow pipeline to drain by sending the blocked checkpoint with longer timeout
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }

    // ==================== FAILURE TESTING ====================

    /// Failed commits are retried until the data lands and the watermark advances.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial commit failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    /// A failed prune leaves data intact, and a later attempt on the same range succeeds.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000, // 2 seconds interval for testing
                delay_ms: 100,     // Short delay
                retention: 2,      // Keep only 2 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Make pruning of range [0, 2) fail once, then succeed: the test below expects attempt 1
        // to leave the data intact and attempt 2 to remove it.
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        // Send enough checkpoints to trigger pruning
        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Verify data is still available BEFORE pruning kicks in
        // With a 2s pruning interval, these checks are expected to run before the first prune attempt
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for first pruning attempt (should fail) and verify no data has been pruned
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // Wait for second pruning attempt (should succeed)
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Verify recent checkpoints are still available
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // Verify old checkpoints are pruned
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }

    /// Failed reader-watermark updates are retried until `reader_lo` reflects the retention window.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100, // Fast interval for testing
                delay_ms: 100,    // Short delay
                retention: 3,     // Keep 3 checkpoints
                ..Default::default()
            }),
            ..Default::default()
        };

        // Configure reader watermark failures - fail 2 times then succeed
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints to trigger reader watermark updates
        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for processing to complete
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Wait for reader watermark task to attempt updates (with failures and retries)
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Verify that reader watermark was eventually updated despite failures: with checkpoints
        // up to 5 committed and a retention of 3, checkpoints 3..=5 are retained.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }

    /// Failing to obtain a store connection is retried until the commit succeeds.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2); // Fail 2 times, then succeed
        let setup = TestSetup::new(config, store, 0).await;

        // Send a checkpoint
        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Should eventually succeed despite initial failures
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // Verify data was eventually committed
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
}