1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::Processor;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::collector::collector;
23use crate::pipeline::concurrent::commit_watermark::commit_watermark;
24use crate::pipeline::concurrent::committer::committer;
25use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
26use crate::pipeline::concurrent::pruner::pruner;
27use crate::pipeline::concurrent::reader_watermark::reader_watermark;
28use crate::pipeline::processor::processor;
29use crate::store::Store;
30
31mod collector;
32mod commit_watermark;
33mod committer;
34mod main_reader_lo;
35mod pruner;
36mod reader_watermark;
37
/// Result of folding values into a batch via [`Handler::batch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can still accept more values before being committed.
    Pending,
    /// The batch is full and should be handed off for commit.
    /// (Naming-based; exact cut-off behavior lives in the collector.)
    Ready,
}
46
/// A `Handler` extends a [`Processor`] with the ability to batch processed
/// values and write them out to a [`Store`], as well as optionally prune old
/// data. Implementations drive the concurrent pipeline built by [`pipeline`].
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler commits batches to.
    type Store: Store;
    /// Accumulator type that `batch` folds processed values into before
    /// `commit` writes it out. Must be `Default` so the collector can start
    /// from an empty batch.
    type Batch: Default + Send + Sync + 'static;

    /// Row-count threshold passed to the collector (overridable via
    /// `ConcurrentConfig::min_eager_rows`). Presumably the minimum number of
    /// rows before an eager (early) write — confirm in `collector`.
    const MIN_EAGER_ROWS: usize = 50;

    /// Upper bound on rows buffered in the collector before back-pressure is
    /// applied upstream (overridable via `ConcurrentConfig::max_pending_rows`).
    const MAX_PENDING_ROWS: usize = 5000;

    /// Cap on buffered watermark updates (overridable via
    /// `ConcurrentConfig::max_watermark_updates`).
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Fold `values` into `batch`, returning whether the batch can accept
    /// more ([`BatchStatus::Pending`]) or is full ([`BatchStatus::Ready`]).
    /// The iterator may be partially consumed if the batch fills up.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Write `batch` to the store through `conn`, returning the number of
    /// rows affected.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Delete data for checkpoints in `[_from, _to_exclusive)`, returning the
    /// number of rows affected. Default implementation is a no-op (prunes
    /// nothing) for handlers that never prune.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
114
/// Configuration for a concurrent pipeline. `None` fields fall back to
/// defaults derived from the [`Handler`]'s associated constants or the number
/// of CPUs (see [`pipeline`]).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration for the collector/committer stages.
    pub committer: CommitterConfig,

    /// Pruner configuration; `None` disables periodic pruning.
    pub pruner: Option<PrunerConfig>,

    /// Processor fan-out; defaults to adaptive concurrency between 1 and the
    /// number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for [`Handler::MIN_EAGER_ROWS`].
    pub min_eager_rows: Option<usize>,

    /// Override for [`Handler::MAX_PENDING_ROWS`].
    pub max_pending_rows: Option<usize>,

    /// Override for [`Handler::MAX_WATERMARK_UPDATES`].
    pub max_watermark_updates: Option<usize>,

    /// Capacity of the processor -> collector channel; defaults to half the
    /// number of CPUs.
    pub processor_channel_size: Option<usize>,

    /// Capacity of the collector -> committer channel; defaults to half the
    /// number of CPUs.
    pub collector_channel_size: Option<usize>,

    /// Capacity of the committer -> watermark channel; defaults to the number
    /// of CPUs.
    pub committer_channel_size: Option<usize>,
}
145
/// Configuration for the pruner task attached to a concurrent pipeline.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// Milliseconds between pruning passes (see [`PrunerConfig::interval`]).
    pub interval_ms: u64,

    /// Delay in milliseconds (see [`PrunerConfig::delay`]); presumably how
    /// far the pruner trails behind the reader low watermark — confirm in
    /// `pruner`/`reader_watermark`.
    pub delay_ms: u64,

    /// Number of checkpoints to retain (not pruned).
    pub retention: u64,

    /// Maximum number of checkpoints deleted in a single prune chunk.
    pub max_chunk_size: u64,

    /// Number of prune operations allowed to run concurrently.
    pub prune_concurrency: u64,
}
164
/// A batch of processed rows paired with the watermark parts it covers,
/// flowing from the collector to the committer.
struct BatchedRows<H: Handler> {
    /// Accumulated values, as built up by [`Handler::batch`].
    batch: H::Batch,
    /// Number of values folded into `batch`.
    batch_len: usize,
    /// Watermark parts for the checkpoints whose values are in `batch`.
    watermark: Vec<WatermarkPart>,
}
178
179impl<H, V> BatchedRows<H>
180where
181 H: Handler<Batch = Vec<V>, Value = V>,
182{
183 #[cfg(test)]
184 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
185 let batch_len = batch.len();
186 Self {
187 batch,
188 batch_len,
189 watermark,
190 }
191 }
192}
193
194impl PrunerConfig {
195 pub fn interval(&self) -> Duration {
196 Duration::from_millis(self.interval_ms)
197 }
198
199 pub fn delay(&self) -> Duration {
200 Duration::from_millis(self.delay_ms)
201 }
202}
203
204impl Default for PrunerConfig {
205 fn default() -> Self {
206 Self {
207 interval_ms: 300_000,
208 delay_ms: 120_000,
209 retention: 4_000_000,
210 max_chunk_size: 2_000,
211 prune_concurrency: 1,
212 }
213 }
214}
215
216pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
236 handler: H,
237 next_checkpoint: u64,
238 config: ConcurrentConfig,
239 store: H::Store,
240 task: Option<Task>,
241 checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
242 metrics: Arc<IndexerMetrics>,
243) -> Service {
244 info!(
245 pipeline = H::NAME,
246 "Starting pipeline with config: {config:#?}",
247 );
248
249 let ConcurrentConfig {
250 committer: committer_config,
251 pruner: pruner_config,
252 fanout,
253 min_eager_rows,
254 max_pending_rows,
255 max_watermark_updates,
256 processor_channel_size,
257 collector_channel_size,
258 committer_channel_size,
259 } = config;
260
261 let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
262 initial: 1,
263 min: 1,
264 max: num_cpus::get(),
265 dead_band: None,
266 });
267 let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
268 let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
269 let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
270
271 let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
272 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
273
274 let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
275 let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
277 let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
279 let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
280 let main_reader_lo = Arc::new(SetOnce::new());
281
282 let handler = Arc::new(handler);
283
284 let s_processor = processor(
285 handler.clone(),
286 checkpoint_rx,
287 processor_tx,
288 metrics.clone(),
289 concurrency,
290 );
291
292 let s_collector = collector::<H>(
293 handler.clone(),
294 committer_config.clone(),
295 collector_rx,
296 collector_tx,
297 main_reader_lo.clone(),
298 metrics.clone(),
299 min_eager_rows,
300 max_pending_rows,
301 max_watermark_updates,
302 );
303
304 let s_committer = committer::<H>(
305 handler.clone(),
306 committer_config.clone(),
307 committer_rx,
308 committer_tx,
309 store.clone(),
310 metrics.clone(),
311 );
312
313 let s_commit_watermark = commit_watermark::<H>(
314 next_checkpoint,
315 committer_config,
316 watermark_rx,
317 store.clone(),
318 task.as_ref().map(|t| t.task.clone()),
319 metrics.clone(),
320 );
321
322 let s_track_reader_lo = track_main_reader_lo::<H>(
323 main_reader_lo.clone(),
324 task.as_ref().map(|t| t.reader_interval),
325 store.clone(),
326 );
327
328 let s_reader_watermark =
329 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
330
331 let s_pruner = pruner(handler, pruner_config, store, metrics);
332
333 s_processor
334 .merge(s_collector)
335 .merge(s_committer)
336 .merge(s_commit_watermark)
337 .attach(s_track_reader_lo)
338 .attach(s_reader_watermark)
339 .attach(s_pruner)
340}
341
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use prometheus::Registry;
    use sui_types::digests::CheckpointDigest;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::Processor;
    use crate::types::full_checkpoint_content::Checkpoint;
    use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    // Generous bound for operations that should always complete.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    // Small ingestion buffer so back-pressure tests can fill it quickly.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    // Row type produced by the test pipeline: two values per checkpoint.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        type Value = TestValue;

        // For checkpoint N, emit values N*10+1 and N*10+2 — easy to assert on.
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        // High eager threshold so writes are driven by the collect interval,
        // a tiny pending-row cap so back-pressure kicks in quickly, and
        // per-checkpoint watermark updates.
        const MIN_EAGER_ROWS: usize = 1000;
        const MAX_PENDING_ROWS: usize = 4;
        const MAX_WATERMARK_UPDATES: usize = 1;

        // Always report Pending: batches are only cut by the collector.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        // Group values by checkpoint and write them to the mock store in bulk.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }

    // A running pipeline plus handles to feed it checkpoints and inspect the
    // mock store.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        #[allow(unused)]
        pipeline: Service,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        // Build a synthetic checkpoint (2 transactions per checkpoint, 1s of
        // timestamp per checkpoint) and feed it into the pipeline.
        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
                checkpoint: Arc::new(
                    TestCheckpointBuilder::new(checkpoint)
                        .with_epoch(1)
                        .with_network_total_transactions(checkpoint * 2)
                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
                        .build_checkpoint(),
                ),
                chain_id: CheckpointDigest::new([1; 32]).into(),
            });
            self.checkpoint_tx.send(checkpoint_envelope).await?;
            Ok(())
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        // Assert that the send does NOT complete in time (i.e. the pipeline
        // is exerting back-pressure).
        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err();
        }
    }

    // End-to-end: data flows through all stages and old checkpoints get
    // pruned once the retention window is exceeded.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Sleep slightly past the pruner interval (5_000ms) so a pruning pass
        // can run.
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // retention = 3, so the newest three checkpoints survive...
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // ...and the oldest three are pruned.
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }

    // With pruning disabled, all data is retained and the watermark reflects
    // the synthetic checkpoint contents (2 txs and 1s per checkpoint).
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        // Checkpoint 9 has network_total_transactions = 9 * 2.
        assert_eq!(watermark.tx_hi, 18);
        // Checkpoint 9 has timestamp 1000000000 + 9 * 1000.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }

    // Checkpoints may arrive out of order; the watermark still advances to
    // the contiguous high point once all gaps are filled.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }

    // The commit watermark must not advance past a missing checkpoint.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send 0, 1, 3, 4 — leaving a gap at 2.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark should be stuck at 1 because of the gap at 2.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Filling the gap lets the watermark jump to 4.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }

    // Filling the collector past MAX_PENDING_ROWS should stall ingestion
    // until the collect interval fires and a batch is written out.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // Long collect interval so the collector buffers rather than
                // flushing immediately.
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Capacity before back-pressure: ingestion buffer (3) + processor
        // fan-out + channels + collector pending rows — 14 sends fit.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The 15th checkpoint cannot be accepted while the pipeline is full.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Once the collector drains (collect interval fires), the send
        // succeeds.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    // A slow committer should eventually propagate back-pressure all the way
    // to the ingestion channel.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1,
                // Fast collection so batches pile up in front of the slow
                // committer.
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        // Each commit takes 10s, effectively blocking the committer.
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // These sends should all be absorbed by buffers in the pipeline.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere in 19..22 the buffers run out and a send times out;
        // exact point depends on scheduling — NOTE(review): tolerance built in.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Wait for at least one commit to land, freeing space downstream.
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // The previously-rejected checkpoint should now be accepted.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }

    // Commits that fail transiently are retried until they succeed.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // First 2 commit attempts fail, the third succeeds.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }

    // Prune operations that fail transiently are retried on a later pass.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail the first prune attempt for range [0, 2).
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failing) attempt, nothing has been pruned yet.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // After the retry succeeds, checkpoints 0 and 1 are gone.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }

    // Reader-watermark updates that fail transiently are retried until the
    // watermark's reader_lo reflects the retention policy.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // First 2 reader-watermark updates fail.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow several 100ms reader-watermark intervals to elapse so the
        // retries can happen.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // hi = 5, retention = 3 => reader_lo = 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }

    // Connection acquisition failures are retried transparently.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // First 2 connection attempts fail.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
}