1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::Processor;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::collector::collector;
23use crate::pipeline::concurrent::commit_watermark::commit_watermark;
24use crate::pipeline::concurrent::committer::committer;
25use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
26use crate::pipeline::concurrent::pruner::pruner;
27use crate::pipeline::concurrent::reader_watermark::reader_watermark;
28use crate::pipeline::processor::processor;
29use crate::store::ConcurrentStore;
30use crate::store::Store;
31
32mod collector;
33mod commit_watermark;
34mod committer;
35mod main_reader_lo;
36mod pruner;
37mod reader_watermark;
38
/// Result of [`Handler::batch`], telling the collector whether the in-progress
/// batch can absorb more values or should be handed to the committer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can still accept more values.
    Pending,
    /// The batch is full and should be committed.
    Ready,
}
47
/// A [`Processor`] that can also batch and commit its values to a store, and
/// optionally prune previously committed data.
#[async_trait]
pub trait Handler: Processor {
    /// The store this pipeline writes to.
    type Store: ConcurrentStore;

    /// Accumulator for processed values; one `Batch` is written per call to
    /// [`Handler::commit`]. Starts from `Default` and is filled via
    /// [`Handler::batch`].
    type Batch: Default + Send + Sync + 'static;

    /// Default for [`ConcurrentConfig::min_eager_rows`]; passed through to the
    /// collector (presumably the row count that triggers an eager flush —
    /// confirm against the collector implementation).
    const MIN_EAGER_ROWS: usize = 50;

    /// Default for [`ConcurrentConfig::max_pending_rows`]; passed through to
    /// the collector (presumably the buffered-row cap before back-pressure —
    /// confirm against the collector implementation).
    const MAX_PENDING_ROWS: usize = 5000;

    /// Default for [`ConcurrentConfig::max_watermark_updates`]; passed through
    /// to the collector.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Fold `values` into `batch`, returning [`BatchStatus::Pending`] if the
    /// batch can take more values, or [`BatchStatus::Ready`] if it should be
    /// committed as-is.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Write `batch` to the store over `conn`, returning the number of rows
    /// affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Delete data in the range `[_from, _to_exclusive)`, returning the number
    /// of rows pruned. The default is a no-op for handlers whose data is never
    /// pruned.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
115
/// Configuration for a concurrent pipeline. Every `Option` field falls back to
/// a handler- or machine-derived default when `None` (see [`pipeline`]).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Settings shared by the collector, committer, and commit-watermark tasks.
    pub committer: CommitterConfig,

    /// Pruner configuration; pruning is disabled when `None`.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency; defaults to an adaptive configuration bounded by
    /// the number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Overrides [`Handler::MIN_EAGER_ROWS`].
    pub min_eager_rows: Option<usize>,

    /// Overrides [`Handler::MAX_PENDING_ROWS`].
    pub max_pending_rows: Option<usize>,

    /// Overrides [`Handler::MAX_WATERMARK_UPDATES`].
    pub max_watermark_updates: Option<usize>,

    /// Capacity of the processor -> collector channel; defaults to half the
    /// CPU count.
    pub processor_channel_size: Option<usize>,

    /// Capacity of the collector -> committer channel; defaults to half the
    /// CPU count.
    pub collector_channel_size: Option<usize>,

    /// Capacity of the committer -> commit-watermark channel; defaults to the
    /// CPU count.
    pub committer_channel_size: Option<usize>,
}
146
/// Configuration for the pruner task.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner runs, in milliseconds (see [`PrunerConfig::interval`]).
    pub interval_ms: u64,

    /// Pruning delay in milliseconds (see [`PrunerConfig::delay`]) — presumably
    /// the grace period before committed data becomes prunable; confirm against
    /// the pruner implementation.
    pub delay_ms: u64,

    /// Number of checkpoints to retain; older data is eligible for pruning.
    pub retention: u64,

    /// Maximum number of units pruned in one chunk — NOTE(review): presumably
    /// checkpoints per prune call; confirm against the pruner implementation.
    pub max_chunk_size: u64,

    /// Number of prune operations run concurrently.
    pub prune_concurrency: u64,
}
165
/// A batch of processed values together with the watermark parts they cover,
/// flowing from the collector to the committer.
struct BatchedRows<H: Handler> {
    /// The accumulated batch, built by repeated calls to [`Handler::batch`].
    batch: H::Batch,
    /// Number of values folded into `batch`, tracked separately because
    /// `H::Batch` is opaque and need not expose a length.
    batch_len: usize,
    /// Watermark parts for the checkpoints represented in this batch.
    watermark: Vec<WatermarkPart>,
}
179
180impl<H, V> BatchedRows<H>
181where
182 H: Handler<Batch = Vec<V>, Value = V>,
183{
184 #[cfg(test)]
185 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
186 let batch_len = batch.len();
187 Self {
188 batch,
189 batch_len,
190 watermark,
191 }
192 }
193}
194
195impl PrunerConfig {
196 pub fn interval(&self) -> Duration {
197 Duration::from_millis(self.interval_ms)
198 }
199
200 pub fn delay(&self) -> Duration {
201 Duration::from_millis(self.delay_ms)
202 }
203}
204
205impl Default for PrunerConfig {
206 fn default() -> Self {
207 Self {
208 interval_ms: 300_000,
209 delay_ms: 120_000,
210 retention: 4_000_000,
211 max_chunk_size: 2_000,
212 prune_concurrency: 1,
213 }
214 }
215}
216
217pub(crate) fn pipeline<H: Handler>(
237 handler: H,
238 next_checkpoint: u64,
239 config: ConcurrentConfig,
240 store: H::Store,
241 task: Option<Task>,
242 checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
243 metrics: Arc<IndexerMetrics>,
244) -> Service {
245 info!(
246 pipeline = H::NAME,
247 "Starting pipeline with config: {config:#?}",
248 );
249
250 let ConcurrentConfig {
251 committer: committer_config,
252 pruner: pruner_config,
253 fanout,
254 min_eager_rows,
255 max_pending_rows,
256 max_watermark_updates,
257 processor_channel_size,
258 collector_channel_size,
259 committer_channel_size,
260 } = config;
261
262 let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
263 initial: 1,
264 min: 1,
265 max: num_cpus::get(),
266 dead_band: None,
267 });
268 let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
269 let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
270 let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
271
272 let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
273 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
274
275 let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
276 let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
278 let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
280 let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
281 let main_reader_lo = Arc::new(SetOnce::new());
282
283 let handler = Arc::new(handler);
284
285 let s_processor = processor(
286 handler.clone(),
287 checkpoint_rx,
288 processor_tx,
289 metrics.clone(),
290 concurrency,
291 store.clone(),
292 );
293
294 let s_collector = collector::<H>(
295 handler.clone(),
296 committer_config.clone(),
297 collector_rx,
298 collector_tx,
299 main_reader_lo.clone(),
300 metrics.clone(),
301 min_eager_rows,
302 max_pending_rows,
303 max_watermark_updates,
304 );
305
306 let s_committer = committer::<H>(
307 handler.clone(),
308 committer_config.clone(),
309 committer_rx,
310 committer_tx,
311 store.clone(),
312 metrics.clone(),
313 );
314
315 let s_commit_watermark = commit_watermark::<H>(
316 next_checkpoint,
317 committer_config,
318 watermark_rx,
319 store.clone(),
320 task.as_ref().map(|t| t.task.clone()),
321 metrics.clone(),
322 );
323
324 let s_track_reader_lo = track_main_reader_lo::<H>(
325 main_reader_lo.clone(),
326 task.as_ref().map(|t| t.reader_interval),
327 store.clone(),
328 );
329
330 let s_reader_watermark =
331 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
332
333 let s_pruner = pruner(handler, pruner_config, store, metrics);
334
335 s_processor
336 .merge(s_collector)
337 .merge(s_committer)
338 .merge(s_commit_watermark)
339 .attach(s_track_reader_lo)
340 .attach(s_reader_watermark)
341 .attach(s_pruner)
342}
343
344#[cfg(test)]
345mod tests {
346 use std::sync::Arc;
347 use std::time::Duration;
348
349 use prometheus::Registry;
350 use sui_types::digests::CheckpointDigest;
351 use tokio::sync::mpsc;
352 use tokio::time::timeout;
353
354 use crate::FieldCount;
355 use crate::metrics::IndexerMetrics;
356 use crate::mocks::store::MockConnection;
357 use crate::mocks::store::MockStore;
358 use crate::pipeline::Processor;
359 use crate::types::full_checkpoint_content::Checkpoint;
360 use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
361
362 use super::*;
363
    // Generous bound for operations that must eventually complete.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    // Capacity of the checkpoint channel feeding the pipeline under test.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    // Row type produced by the test pipeline: two values per checkpoint.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }
372
373 struct DataPipeline;
374
375 #[async_trait]
376 impl Processor for DataPipeline {
377 const NAME: &'static str = "test_handler";
378 type Value = TestValue;
379
380 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
381 let cp_num = checkpoint.summary.sequence_number;
382
383 Ok(vec![
385 TestValue {
386 checkpoint: cp_num,
387 data: cp_num * 10 + 1,
388 },
389 TestValue {
390 checkpoint: cp_num,
391 data: cp_num * 10 + 2,
392 },
393 ])
394 }
395 }
396
    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        // High threshold so eager flushing never triggers in these tests.
        const MIN_EAGER_ROWS: usize = 1000;
        // Small cap so back-pressure tests can saturate the collector quickly.
        const MAX_PENDING_ROWS: usize = 4;
        // Tightest possible bound on buffered watermark updates.
        const MAX_WATERMARK_UPDATES: usize = 1;

        /// Append every value to the batch and always report `Pending`, so
        /// batch boundaries are decided by the collector's limits, not the
        /// handler.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Group the batch's values by checkpoint and write them to the mock
        /// store in bulk, returning the affected row count.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        /// Delete data for checkpoints in `[from, to_exclusive)` from the mock
        /// store.
        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }
444
    // Test fixture: a running pipeline, its backing mock store, and the
    // channel used to feed it checkpoints.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Never read directly — presumably held so the pipeline service stays
        // alive for the duration of the test.
        #[allow(unused)]
        pipeline: Service,
    }
451
    impl TestSetup {
        /// Start a `DataPipeline` against `store` with the given `config`,
        /// beginning at `next_checkpoint`, and return the fixture.
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        /// Build a synthetic checkpoint (epoch 1, two transactions and one
        /// second of timestamp per checkpoint) and enqueue it on the pipeline.
        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
                checkpoint: Arc::new(
                    TestCheckpointBuilder::new(checkpoint)
                        .with_epoch(1)
                        .with_network_total_transactions(checkpoint * 2)
                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
                        .build_checkpoint(),
                ),
                chain_id: CheckpointDigest::new([1; 32]).into(),
            });
            self.checkpoint_tx.send(checkpoint_envelope).await?;
            Ok(())
        }

        /// Send a checkpoint, failing if it cannot be enqueued within
        /// `timeout_duration`.
        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        /// Send a checkpoint and assert that it is NOT enqueued within
        /// `timeout_duration` — i.e. the pipeline is exerting back-pressure.
        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err();
        }
    }
507
    // End-to-end: checkpoints flow through to the store, and the pruner
    // eventually removes data outside the retention window.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Long interval so the first prune pass runs well after data
                // has landed; short delay; keep only the 3 newest checkpoints.
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send the first three checkpoints...
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // ...and wait for each of them to be committed.
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Send three more, pushing checkpoints 0..3 outside the retention
        // window.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait out a full pruner interval so at least one prune pass has run.
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // The newest `retention` checkpoints survive...
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // ...and everything older has been pruned.
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }
575
    // End-to-end without a pruner: all data is retained and the watermark
    // reflects the last checkpoint's totals.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait until the watermark covers all ten checkpoints.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        // Each synthetic checkpoint carries 2 transactions (see
        // `send_checkpoint`), so checkpoint 9's running total is 18.
        assert_eq!(watermark.tx_hi, 18);
        // Timestamps advance 1s per checkpoint from a 1_000_000_000ms base.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        // With no pruner configured, all ten checkpoints must still be there.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
620
    // Checkpoints delivered out of order are still all committed, and the
    // watermark advances once the full contiguous range is present.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Deliberately shuffled delivery order.
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }
651
    // The commit watermark must stall at a gap in the checkpoint sequence and
    // jump forward once the gap is filled.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Checkpoint 2 is deliberately missing.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit whatever it can.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The watermark cannot pass the gap: it stalls at 1.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Filling the gap lets the watermark jump straight to 4.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
686
    // When the collector's pending-row cap is reached, sends into the pipeline
    // must block until a collect interval flushes the backlog.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // Long collect interval so rows accumulate in the collector
                // instead of being flushed immediately.
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Let the pipeline tasks spin up before filling the channels.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // NOTE(review): 14 checkpoints presumably fills the checkpoint buffer,
        // processor/collector channels, and the collector's MAX_PENDING_ROWS —
        // confirm against the collector's back-pressure accounting.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The next checkpoint cannot be enqueued: back-pressure is in effect.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // After the collector flushes, the same checkpoint goes through.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
757
    // A slow committer must eventually propagate back-pressure all the way to
    // the checkpoint channel, and releasing it must unblock sends.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // One writer with a fast collect interval, so batches pile up
                // behind the slow committer.
                write_concurrency: 1,
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        // Each commit takes 10s — far longer than the per-send timeouts below.
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // These sends all succeed while the pipeline's buffers fill up.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere in the next few checkpoints the pipeline must push back;
        // the exact point depends on internal buffering.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Once the first commit lands, capacity frees up...
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // ...and the previously rejected checkpoint can be sent.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
841
    // Transient commit failures are retried until the write succeeds.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first two commit attempts.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // The watermark only advances once a commit finally succeeds.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }
869
    // Transient prune failures are retried: the first attempt leaves data
    // intact, the retry deletes it.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail the first attempt to prune the range [0, 2).
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failed) prune attempt, nothing has been deleted.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // The second attempt succeeds: checkpoints 0 and 1 are pruned while
        // the retained range survives.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
932
    // Transient reader-watermark update failures are retried until the reader
    // low watermark is advanced.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Short interval so the reader-watermark task retries quickly.
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail the first two reader-watermark updates to force retries.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow time for the two failed updates plus a successful retry.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Presumably reader_lo = highest committed (5) - retention (3) + 1 = 3
        // — confirm against the reader-watermark task's formula.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
970
    // Transient connection failures are retried until the store becomes
    // reachable and data is committed.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first two connection attempts.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
996}