1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::IngestionConfig;
21use crate::pipeline::Processor;
22use crate::pipeline::WatermarkPart;
23use crate::pipeline::concurrent::collector::collector;
24use crate::pipeline::concurrent::commit_watermark::commit_watermark;
25use crate::pipeline::concurrent::committer::committer;
26use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
27use crate::pipeline::concurrent::pruner::pruner;
28use crate::pipeline::concurrent::reader_watermark::reader_watermark;
29use crate::pipeline::processor::processor;
30use crate::store::ConcurrentStore;
31use crate::store::Store;
32
33mod collector;
34mod commit_watermark;
35mod committer;
36mod main_reader_lo;
37mod pruner;
38mod reader_watermark;
39
/// Result of [`Handler::batch`]: tells the collector whether the batch being
/// assembled can keep absorbing values or should be flushed to the committer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can still accept more values.
    Pending,
    /// The batch is full and should be committed as-is.
    Ready,
}
48
/// Processing + persistence contract for a concurrent pipeline. Extends
/// [`Processor`] (which produces `Value`s from checkpoints) with batching,
/// committing, and optional pruning against a [`ConcurrentStore`].
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler commits to. Must support concurrent
    /// (out-of-order) writes.
    type Store: ConcurrentStore;

    /// Accumulator for values awaiting commit; `Default` produces a fresh,
    /// empty batch.
    type Batch: Default + Send + Sync + 'static;

    /// NOTE(review): presumably the minimum number of pending rows before
    /// the collector flushes a batch eagerly, ahead of its timer — confirm
    /// against `collector`.
    const MIN_EAGER_ROWS: usize = 50;

    /// NOTE(review): presumably the cap on rows buffered in the collector
    /// before it stops accepting input (back-pressure) — confirm against
    /// `collector`.
    const MAX_PENDING_ROWS: usize = 5000;

    /// NOTE(review): presumably the maximum number of watermark parts
    /// tracked per batch/update cycle — confirm against `collector`.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Folds `values` into `batch`. Returning [`BatchStatus::Pending`]
    /// signals the batch can take more values; [`BatchStatus::Ready`] signals
    /// it should be committed. The iterator need not be fully drained.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Writes `batch` to the store over `conn`, returning an affected-row
    /// count as reported by the store.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Deletes this handler's data for checkpoints in `[_from, _to_exclusive)`.
    /// Default implementation is a no-op (returns 0 rows pruned), for
    /// pipelines that never prune.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
116
/// Configuration for a concurrent pipeline. Every `Option` field falls back
/// to a handler- or CPU-derived default inside `pipeline` when unset.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Collector/committer tuning (collect interval, write concurrency, ...).
    pub committer: CommitterConfig,

    /// Ingestion settings. NOTE(review): `pipeline` destructures this field
    /// away without using it — presumably consumed by the caller; confirm.
    pub ingestion: IngestionConfig,

    /// Pruning configuration; `None` disables the pruner.
    pub pruner: Option<PrunerConfig>,

    /// Processor fanout. Defaults to adaptive concurrency between 1 and the
    /// number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Overrides [`Handler::MIN_EAGER_ROWS`] when set.
    pub min_eager_rows: Option<usize>,

    /// Overrides [`Handler::MAX_PENDING_ROWS`] when set.
    pub max_pending_rows: Option<usize>,

    /// Overrides [`Handler::MAX_WATERMARK_UPDATES`] when set.
    pub max_watermark_updates: Option<usize>,

    /// Capacity of the processor -> collector channel. Defaults to half the
    /// CPU count.
    pub processor_channel_size: Option<usize>,

    /// Capacity of the collector -> committer channel. Defaults to half the
    /// CPU count.
    pub collector_channel_size: Option<usize>,

    /// Capacity of the committer -> commit-watermark channel. Defaults to
    /// the CPU count.
    pub committer_channel_size: Option<usize>,
}
150
/// Configuration for the pruning service.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// Time between pruning passes, in milliseconds (see `interval()`).
    pub interval_ms: u64,

    /// Delay in milliseconds (see `delay()`). NOTE(review): presumably the
    /// grace period before data below the reader watermark becomes eligible
    /// for deletion — confirm against `pruner`.
    pub delay_ms: u64,

    /// Number of checkpoints to retain behind the latest watermark.
    pub retention: u64,

    /// Maximum number of checkpoints deleted in one pruning chunk.
    pub max_chunk_size: u64,

    /// Number of concurrent prune operations.
    pub prune_concurrency: u64,
}
169
/// A batch of rows headed for the committer, together with the watermark
/// parts for the checkpoints whose values it contains.
struct BatchedRows<H: Handler> {
    // The handler-defined accumulation of values to be committed.
    batch: H::Batch,
    // Number of values folded into `batch`. NOTE(review): tracked separately
    // because `H::Batch` is opaque and need not expose a length — confirm.
    batch_len: usize,
    // Watermark parts covered by this batch.
    watermark: Vec<WatermarkPart>,
}
183
184impl<H, V> BatchedRows<H>
185where
186 H: Handler<Batch = Vec<V>, Value = V>,
187{
188 #[cfg(test)]
189 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
190 let batch_len = batch.len();
191 Self {
192 batch,
193 batch_len,
194 watermark,
195 }
196 }
197}
198
199impl PrunerConfig {
200 pub fn interval(&self) -> Duration {
201 Duration::from_millis(self.interval_ms)
202 }
203
204 pub fn delay(&self) -> Duration {
205 Duration::from_millis(self.delay_ms)
206 }
207}
208
209impl Default for PrunerConfig {
210 fn default() -> Self {
211 Self {
212 interval_ms: 300_000,
213 delay_ms: 120_000,
214 retention: 4_000_000,
215 max_chunk_size: 2_000,
216 prune_concurrency: 1,
217 }
218 }
219}
220
221pub(crate) fn pipeline<H: Handler>(
241 handler: H,
242 next_checkpoint: u64,
243 config: ConcurrentConfig,
244 store: H::Store,
245 task: Option<Task>,
246 checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
247 metrics: Arc<IndexerMetrics>,
248) -> Service {
249 info!(
250 pipeline = H::NAME,
251 "Starting pipeline with config: {config:#?}",
252 );
253
254 let ConcurrentConfig {
255 committer: committer_config,
256 ingestion: _,
257 pruner: pruner_config,
258 fanout,
259 min_eager_rows,
260 max_pending_rows,
261 max_watermark_updates,
262 processor_channel_size,
263 collector_channel_size,
264 committer_channel_size,
265 } = config;
266
267 let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
268 initial: 1,
269 min: 1,
270 max: num_cpus::get(),
271 dead_band: None,
272 });
273 let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
274 let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
275 let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
276
277 let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
278 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
279
280 let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
281 let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
283 let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
285 let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
286 let main_reader_lo = Arc::new(SetOnce::new());
287
288 let handler = Arc::new(handler);
289
290 let s_processor = processor(
291 handler.clone(),
292 checkpoint_rx,
293 processor_tx,
294 metrics.clone(),
295 concurrency,
296 store.clone(),
297 );
298
299 let s_collector = collector::<H>(
300 handler.clone(),
301 committer_config.clone(),
302 collector_rx,
303 collector_tx,
304 main_reader_lo.clone(),
305 metrics.clone(),
306 min_eager_rows,
307 max_pending_rows,
308 max_watermark_updates,
309 );
310
311 let s_committer = committer::<H>(
312 handler.clone(),
313 committer_config.clone(),
314 committer_rx,
315 committer_tx,
316 store.clone(),
317 metrics.clone(),
318 );
319
320 let s_commit_watermark = commit_watermark::<H>(
321 next_checkpoint,
322 committer_config,
323 watermark_rx,
324 store.clone(),
325 task.as_ref().map(|t| t.task.clone()),
326 metrics.clone(),
327 );
328
329 let s_track_reader_lo = track_main_reader_lo::<H>(
330 main_reader_lo.clone(),
331 task.as_ref().map(|t| t.reader_interval),
332 store.clone(),
333 );
334
335 let s_reader_watermark =
336 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
337
338 let s_pruner = pruner(handler, pruner_config, store, metrics);
339
340 s_processor
341 .merge(s_collector)
342 .merge(s_committer)
343 .merge(s_commit_watermark)
344 .attach(s_track_reader_lo)
345 .attach(s_reader_watermark)
346 .attach(s_pruner)
347}
348
349#[cfg(test)]
350mod tests {
351 use std::sync::Arc;
352 use std::time::Duration;
353
354 use prometheus::Registry;
355 use sui_types::digests::CheckpointDigest;
356 use tokio::sync::mpsc;
357 use tokio::time::timeout;
358
359 use crate::FieldCount;
360 use crate::metrics::IndexerMetrics;
361 use crate::mocks::store::MockConnection;
362 use crate::mocks::store::MockStore;
363 use crate::pipeline::Processor;
364 use crate::types::full_checkpoint_content::Checkpoint;
365 use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
366
367 use super::*;
368
    // Generous upper bound for waits that must eventually succeed.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    // Small ingestion channel so tests can provoke back-pressure on send.
    const TEST_SUBSCRIBER_CHANNEL_SIZE: usize = 3;

    /// Row type produced by the test pipeline (two rows per checkpoint).
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        // Checkpoint sequence number this value came from.
        checkpoint: u64,
        // Deterministic payload: `checkpoint * 10 + {1, 2}`.
        data: u64,
    }
377
378 struct DataPipeline;
379
380 #[async_trait]
381 impl Processor for DataPipeline {
382 const NAME: &'static str = "test_handler";
383 type Value = TestValue;
384
385 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
386 let cp_num = checkpoint.summary.sequence_number;
387
388 Ok(vec![
390 TestValue {
391 checkpoint: cp_num,
392 data: cp_num * 10 + 1,
393 },
394 TestValue {
395 checkpoint: cp_num,
396 data: cp_num * 10 + 2,
397 },
398 ])
399 }
400 }
401
402 #[async_trait]
403 impl Handler for DataPipeline {
404 type Store = MockStore;
405 type Batch = Vec<TestValue>;
406
407 const MIN_EAGER_ROWS: usize = 1000; const MAX_PENDING_ROWS: usize = 4; const MAX_WATERMARK_UPDATES: usize = 1; fn batch(
412 &self,
413 batch: &mut Self::Batch,
414 values: &mut std::vec::IntoIter<Self::Value>,
415 ) -> BatchStatus {
416 batch.extend(values);
418 BatchStatus::Pending
419 }
420
421 async fn commit<'a>(
422 &self,
423 batch: &Self::Batch,
424 conn: &mut MockConnection<'a>,
425 ) -> anyhow::Result<usize> {
426 let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
428 std::collections::HashMap::new();
429 for value in batch {
430 grouped
431 .entry(value.checkpoint)
432 .or_default()
433 .push(value.data);
434 }
435
436 conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
438 }
439
440 async fn prune<'a>(
441 &self,
442 from: u64,
443 to_exclusive: u64,
444 conn: &mut MockConnection<'a>,
445 ) -> anyhow::Result<usize> {
446 conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
447 }
448 }
449
    /// Harness bundling a running pipeline with handles to drive and
    /// observe it.
    struct TestSetup {
        // Mock store the pipeline commits to; inspected by test assertions.
        store: MockStore,
        // Sender feeding checkpoints into the pipeline's ingestion channel.
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held only to keep the pipeline service alive for the test.
        #[allow(unused)]
        pipeline: Service,
    }
456
457 impl TestSetup {
458 async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
459 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_SUBSCRIBER_CHANNEL_SIZE);
460 let metrics = IndexerMetrics::new(None, &Registry::default());
461
462 let pipeline = pipeline(
463 DataPipeline,
464 next_checkpoint,
465 config,
466 store.clone(),
467 None,
468 checkpoint_rx,
469 metrics,
470 );
471
472 Self {
473 store,
474 checkpoint_tx,
475 pipeline,
476 }
477 }
478
479 async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
480 let checkpoint_envelope = Arc::new(CheckpointEnvelope {
481 checkpoint: Arc::new(
482 TestCheckpointBuilder::new(checkpoint)
483 .with_epoch(1)
484 .with_network_total_transactions(checkpoint * 2)
485 .with_timestamp_ms(1000000000 + checkpoint * 1000)
486 .build_checkpoint(),
487 ),
488 chain_id: CheckpointDigest::new([1; 32]).into(),
489 });
490 self.checkpoint_tx.send(checkpoint_envelope).await?;
491 Ok(())
492 }
493
494 async fn send_checkpoint_with_timeout(
495 &self,
496 checkpoint: u64,
497 timeout_duration: Duration,
498 ) -> anyhow::Result<()> {
499 timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
500 }
501
502 async fn send_checkpoint_expect_timeout(
503 &self,
504 checkpoint: u64,
505 timeout_duration: Duration,
506 ) {
507 timeout(timeout_duration, self.send_checkpoint(checkpoint))
508 .await
509 .unwrap_err(); }
511 }
512
    /// End-to-end: checkpoints flow through to the store, and an aggressive
    /// pruner (retention = 3) eventually removes the oldest checkpoints.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Feed the first three checkpoints.
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Each checkpoint should commit its two rows (i*10+1, i*10+2).
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Feed three more checkpoints.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All six checkpoints should be committed before pruning kicks in.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Poll until checkpoints 0-2 are pruned (retention = 3 keeps 3-5),
        // bounded by a deadline so a broken pruner fails rather than hangs.
        let pruning_deadline = Duration::from_secs(15);
        let start = tokio::time::Instant::now();
        loop {
            let pruned = {
                let data = setup.store.data.get(DataPipeline::NAME).unwrap();
                !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
            };
            if pruned {
                break;
            }
            assert!(
                start.elapsed() < pruning_deadline,
                "Timed out waiting for pruning to occur"
            );
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // The retained window (3-5) must survive pruning.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));
        };
    }
590
    /// With the pruner disabled, all data is retained and the commit
    /// watermark reflects the last checkpoint's totals.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for the commit watermark to reach the last checkpoint.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        // Every checkpoint's rows should be present.
        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        // Two transactions per checkpoint (see `send_checkpoint`): 9 * 2.
        assert_eq!(watermark.tx_hi, 18);
        // Timestamps are 1000000000 + cp * 1000.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        // No pruning: all 10 checkpoints remain in the store.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
635
    /// Checkpoints may be committed out of order; the watermark only
    /// advances once the full contiguous range has landed.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Deliberately shuffled order.
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Once 0-4 are all committed, the watermark should reach 4.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // And every checkpoint's data should be present and correct.
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }
666
    /// The commit watermark must stall at a gap in the checkpoint sequence
    /// and resume once the missing checkpoint arrives.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send everything except checkpoint 2.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit what it can.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The watermark is stuck at 1 because 2 is missing.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Fill the gap.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now the watermark can advance through the full range.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
701
    /// With a long collect interval, small channels, and MAX_PENDING_ROWS=4,
    /// the collector's pending buffer plus the bounded channels should fill
    /// up and push back on `send_checkpoint`.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // Long interval so rows pile up in the collector between
                // flushes.
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Give the pipeline a moment to start up.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // These checkpoints should all be accepted before the system
        // saturates (ingestion channel + processor/collector channels +
        // collector's pending buffer).
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The next send should time out: back-pressure is in effect.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // After the collector eventually flushes, the same checkpoint is
        // accepted.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Sanity check that data still flows end-to-end.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
772
    /// A slow committer (10s per commit, single writer) should eventually
    /// back the whole pipeline up until sends time out, and the pipeline
    /// should recover once a commit completes.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1,
                // Short collect interval so batches reach the committer fast.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        // Every commit stalls for 10 seconds.
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // These should all be accepted before the pipeline saturates.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere in 19..22 a send must hit back-pressure and time out.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Wait for at least one (delayed) commit to land, freeing capacity.
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // The previously rejected checkpoint should now be accepted.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
856
    /// The committer should retry failed commits: the store rejects the
    /// first two attempts, yet the data and watermark still land.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first 2 commit attempts, then succeed.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Despite the injected failures, the watermark eventually advances...
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // ...and the data is committed intact.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }
884
    /// A failed prune attempt should be retried: the first attempt on range
    /// [0, 2) fails (leaving data intact), the second succeeds.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // NOTE(review): presumably (from, to_exclusive, failure_count) —
        // fail pruning [0, 2) once; confirm against `MockStore`.
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All four checkpoints commit normally.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failed) prune attempt, nothing has been deleted.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // The retry (second attempt) succeeds and removes [0, 2).
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            // Retained window survives...
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            // ...and the pruned range is gone.
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
947
    /// Reader-watermark updates that fail should be retried until the
    /// reader low watermark advances.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail the first 2 reader-watermark updates, then succeed.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for the commit watermark to reach the last checkpoint.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow several 100ms reader-watermark intervals to elapse, covering
        // the two injected failures plus a successful retry.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // retention = 3 with hi = 5 => reader_lo should have advanced to 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
985
    /// Failures to obtain a store connection should be retried transparently.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first 2 connection attempts, then succeed.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Despite the connection failures, the watermark eventually advances...
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // ...and the checkpoint's data is committed.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
1011}