1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::metrics::IndexerMetrics;
17use crate::pipeline::CommitterConfig;
18use crate::pipeline::PIPELINE_BUFFER;
19use crate::pipeline::Processor;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::collector::collector;
22use crate::pipeline::concurrent::commit_watermark::commit_watermark;
23use crate::pipeline::concurrent::committer::committer;
24use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
25use crate::pipeline::concurrent::pruner::pruner;
26use crate::pipeline::concurrent::reader_watermark::reader_watermark;
27use crate::pipeline::processor::processor;
28use crate::store::Store;
29use crate::types::full_checkpoint_content::Checkpoint;
30
31mod collector;
32mod commit_watermark;
33mod committer;
34mod main_reader_lo;
35mod pruner;
36mod reader_watermark;
37
/// Outcome reported by [`Handler::batch`] after it has moved values into a batch.
///
/// NOTE(review): the per-variant meanings below are read from the variant names; confirm them
/// against the collector, which is the consumer of this value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BatchStatus {
    /// The batch can still accept more values.
    Pending,
    /// The batch should be handed off for commit now.
    Ready,
}
46
/// Write-side logic for a concurrent pipeline.
///
/// Covers how values produced by the [`Processor`] half of the handler are grouped into batches,
/// committed to [`Handler::Store`], and optionally pruned. `pipeline` wraps the handler in an
/// `Arc` and shares it between the processor, collector, committer and pruner services.
#[async_trait]
pub trait Handler: Processor {
    /// Store that batches are committed to and pruned from.
    type Store: Store;

    /// Accumulator that [`Handler::batch`] fills with values. The `Default` bound lets callers
    /// start from an empty batch.
    type Batch: Default + Send + Sync + 'static;

    /// Row-count threshold consumed by the collector (not visible in this file).
    ///
    /// NOTE(review): presumably, once at least this many rows are pending, they are sent on
    /// eagerly instead of waiting for the committer's collect interval. Confirm in `collector.rs`.
    const MIN_EAGER_ROWS: usize = 50;

    /// Bound on buffered rows consumed by the collector (not visible in this file).
    ///
    /// NOTE(review): presumably, once this many rows are pending, the collector stops accepting
    /// processed values and so applies back-pressure upstream. The test
    /// `test_back_pressure_collector_max_pending_rows` exercises a small value. Confirm in
    /// `collector.rs`.
    const MAX_PENDING_ROWS: usize = 5000;

    /// Limit on watermark updates, consumed outside this file.
    ///
    /// NOTE(review): presumably caps how many `WatermarkPart`s a single batch may carry. Confirm
    /// in `collector.rs`.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Moves values out of `values` into `batch` and reports through [`BatchStatus`] whether the
    /// batch is ready to be committed.
    ///
    /// Because `values` is borrowed mutably, an implementation may stop early and leave values
    /// in the iterator. NOTE(review): presumably the caller carries any leftovers into the next
    /// batch. Confirm in `collector.rs`.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Writes `batch` to the store through `conn`.
    ///
    /// Returns a count, presumably the number of rows written (TODO confirm). A failed commit is
    /// retried by the pipeline (see `test_commit_failure_retry`).
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Deletes this pipeline's data for checkpoints in `[from, to_exclusive)`.
    ///
    /// Returns a count, presumably the number of rows removed. The default implementation
    /// deletes nothing and returns `Ok(0)`; it suits pipelines whose data is never pruned.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
114
/// Configuration for a concurrent pipeline.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Committer-side settings. `pipeline` hands them to the collector, the committer and the
    /// watermark committer.
    pub committer: CommitterConfig,

    /// Pruner settings. `None` (the `Default`) runs the pipeline without pruning any data.
    pub pruner: Option<PrunerConfig>,
}
124
/// Settings for the services that prune old data from a concurrent pipeline.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// Pruning period, in milliseconds. `interval()` exposes it as a `Duration`.
    pub interval_ms: u64,

    /// Delay, in milliseconds. `delay()` exposes it as a `Duration`.
    /// NOTE(review): presumably the wait between advancing the reader low watermark and pruning
    /// up to it. Confirm in `pruner.rs` / `reader_watermark.rs`.
    pub delay_ms: u64,

    /// Number of most recent checkpoints to keep. For example, a retention of 3 with checkpoints
    /// 0..=5 committed keeps 3..=5 and prunes 0..=2.
    pub retention: u64,

    /// NOTE(review): presumably the widest checkpoint range handed to a single `Handler::prune`
    /// call. Confirm in `pruner.rs`.
    pub max_chunk_size: u64,

    /// NOTE(review): presumably the number of prune chunks allowed to run in parallel. Confirm in
    /// `pruner.rs`.
    pub prune_concurrency: u64,
}
143
144struct BatchedRows<H: Handler> {
150 batch: H::Batch,
152 batch_len: usize,
154 watermark: Vec<WatermarkPart>,
156}
157
158impl<H, V> BatchedRows<H>
159where
160 H: Handler<Batch = Vec<V>, Value = V>,
161{
162 #[cfg(test)]
163 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
164 let batch_len = batch.len();
165 Self {
166 batch,
167 batch_len,
168 watermark,
169 }
170 }
171}
172
173impl PrunerConfig {
174 pub fn interval(&self) -> Duration {
175 Duration::from_millis(self.interval_ms)
176 }
177
178 pub fn delay(&self) -> Duration {
179 Duration::from_millis(self.delay_ms)
180 }
181}
182
183impl Default for PrunerConfig {
184 fn default() -> Self {
185 Self {
186 interval_ms: 300_000,
187 delay_ms: 120_000,
188 retention: 4_000_000,
189 max_chunk_size: 2_000,
190 prune_concurrency: 1,
191 }
192 }
193}
194
195pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
215 handler: H,
216 next_checkpoint: u64,
217 config: ConcurrentConfig,
218 store: H::Store,
219 task: Option<Task>,
220 checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
221 metrics: Arc<IndexerMetrics>,
222) -> Service {
223 info!(
224 pipeline = H::NAME,
225 "Starting pipeline with config: {config:#?}",
226 );
227
228 let ConcurrentConfig {
229 committer: committer_config,
230 pruner: pruner_config,
231 } = config;
232
233 let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
234 let (collector_tx, committer_rx) =
236 mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
237 let (committer_tx, watermark_rx) =
239 mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
240 let main_reader_lo = Arc::new(SetOnce::new());
241
242 let handler = Arc::new(handler);
243
244 let s_processor = processor(
245 handler.clone(),
246 checkpoint_rx,
247 processor_tx,
248 metrics.clone(),
249 );
250
251 let s_collector = collector::<H>(
252 handler.clone(),
253 committer_config.clone(),
254 collector_rx,
255 collector_tx,
256 main_reader_lo.clone(),
257 metrics.clone(),
258 );
259
260 let s_committer = committer::<H>(
261 handler.clone(),
262 committer_config.clone(),
263 committer_rx,
264 committer_tx,
265 store.clone(),
266 metrics.clone(),
267 );
268
269 let s_commit_watermark = commit_watermark::<H>(
270 next_checkpoint,
271 committer_config,
272 watermark_rx,
273 store.clone(),
274 task.as_ref().map(|t| t.task.clone()),
275 metrics.clone(),
276 );
277
278 let s_track_reader_lo = track_main_reader_lo::<H>(
279 main_reader_lo.clone(),
280 task.as_ref().map(|t| t.reader_interval),
281 store.clone(),
282 );
283
284 let s_reader_watermark =
285 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
286
287 let s_pruner = pruner(handler, pruner_config, store, metrics);
288
289 s_processor
290 .merge(s_collector)
291 .merge(s_committer)
292 .merge(s_commit_watermark)
293 .attach(s_track_reader_lo)
294 .attach(s_reader_watermark)
295 .attach(s_pruner)
296}
297
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use prometheus::Registry;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::Processor;
    use crate::types::full_checkpoint_content::Checkpoint;
    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Upper bound for waits that are expected to eventually succeed.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    /// Capacity of the channel that feeds checkpoints into the pipeline under test.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    /// Row type produced by [`DataPipeline`]. Each checkpoint yields two of these.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    /// Minimal pipeline shared by every test. It emits two rows per checkpoint and writes them to
    /// a [`MockStore`].
    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        /// For checkpoint `n`, emits two rows with data `n * 10 + 1` and `n * 10 + 2`. The tests
        /// below assert on exactly these values.
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        // Limits overridden so the collector-side thresholds are reachable with a handful of
        // checkpoints. NOTE(review): the intended effect of each value should be confirmed against
        // `collector.rs`.
        const MIN_EAGER_ROWS: usize = 1000;
        const MAX_PENDING_ROWS: usize = 4;
        const MAX_WATERMARK_UPDATES: usize = 1;

        /// Takes every available value and never reports the batch as ready.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Groups the batch by checkpoint and writes it to the mock store in one bulk call.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        /// Removes data for checkpoints in `[from, to_exclusive)` from the mock store.
        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    /// Harness that bundles a running pipeline, the store it writes to, and the channel that
    /// feeds it checkpoints.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        // Never read. It is held so the pipeline's `Service` lives as long as the test does.
        #[allow(unused)]
        pipeline: Service,
    }

    impl TestSetup {
        /// Starts a `DataPipeline` at `next_checkpoint` with `config`, writing into a clone of
        /// `store`. The test then observes those writes through `self.store`.
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        /// Builds synthetic checkpoint `checkpoint` and sends it, waiting for channel capacity.
        ///
        /// The checkpoint is in epoch 1, has `2 * checkpoint` total transactions, and has
        /// timestamp `1_000_000_000 + 1000 * checkpoint` ms.
        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        /// Like `send_checkpoint`, but fails if the send has not completed within
        /// `timeout_duration`, for example because the pipeline is applying back-pressure.
        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        /// Asserts that sending `checkpoint` does NOT complete within `timeout_duration`.
        ///
        /// The timed-out send is dropped, and tokio's `Sender::send` is cancel-safe, so the
        /// checkpoint is not delivered and has to be sent again.
        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err();
        }
    }

    /// Checkpoints flow end to end, and the pruner removes everything outside the retention
    /// window.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Three more checkpoints take the pipeline past the retention window.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait slightly longer than one pruner interval (5s).
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        // With retention 3 and checkpoints 0..=5 committed, 3..=5 survive and 0..=2 are gone.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }

    /// Without a pruner config, every checkpoint stays and the watermark reflects the last one.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Checkpoint 9 was built with 9 * 2 transactions and timestamp 1_000_000_000 + 9 * 1000.
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18);
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }

    /// Checkpoints sent out of order are all committed, and the watermark reaches the highest one.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }

    /// The watermark stops just below a missing checkpoint and advances once the gap is filled.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Checkpoint 2 is deliberately skipped.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        tokio::time::sleep(Duration::from_secs(1)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Filling the gap lets the watermark catch up to 4.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }

    /// The collector applies back-pressure once its pending rows reach the handler's limit.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Brief pause after start-up before sending anything.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // NOTE(review): 14 is the number of checkpoints the channels and collector absorb before
        // back-pressure reaches the sender. It depends on MAX_PENDING_ROWS, FANOUT,
        // PIPELINE_BUFFER and TEST_CHECKPOINT_BUFFER_SIZE.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The next send blocks...
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // ...until the collector drains, so the same checkpoint is retried with a long timeout.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    /// Slow commits with a single writer eventually block the sender, which recovers once data
    /// lands.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // The exact point at which back-pressure appears depends on buffering, so probe a small
        // window and remember the first checkpoint whose send timed out.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        let Some(back_pressure_checkpoint) = back_pressure_checkpoint else {
            panic!("Back pressure should occur between checkpoints 19-21");
        };

        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // The timed-out checkpoint was never delivered, so it is sent again here.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint, TEST_TIMEOUT)
            .await
            .unwrap();
    }

    /// Failed commits (injected by `with_commit_failures(2)`) are retried until the data lands.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    /// A failed prune of `[0, 2)` leaves the data in place, and the retry removes it.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // NOTE(review): presumably injects one failure for the prune range [0, 2).
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failing) attempt, nothing has been removed yet.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // The second attempt succeeds. With retention 2, checkpoints 2 and 3 survive.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }

    /// Reader-watermark update failures are retried. In the end `reader_lo` equals the six
    /// committed checkpoints minus a retention of 3.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Leave time for the reader watermark service to get past the injected failures.
        tokio::time::sleep(Duration::from_secs(2)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }

    /// Failures to obtain a connection (injected by `with_connection_failures(2)`) are retried,
    /// and the data still lands.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
}