1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::Processor;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::collector::collector;
23use crate::pipeline::concurrent::commit_watermark::commit_watermark;
24use crate::pipeline::concurrent::committer::committer;
25use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
26use crate::pipeline::concurrent::pruner::pruner;
27use crate::pipeline::concurrent::reader_watermark::reader_watermark;
28use crate::pipeline::processor::processor;
29use crate::store::ConcurrentStore;
30use crate::store::Store;
31
32mod collector;
33mod commit_watermark;
34mod committer;
35mod main_reader_lo;
36mod pruner;
37mod reader_watermark;
38
/// Result of [`Handler::batch`]: tells the collector whether the batch being
/// assembled can accept more values, or should be committed as-is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can still accept more values.
    Pending,
    /// The batch is full; commit it without adding more values.
    Ready,
}
47
/// A `Handler` implements the write side of an indexing pipeline: how
/// processed values are folded into batches, how batches are committed to the
/// store, and (optionally) how old data is pruned. Tuning knobs for the
/// collector are exposed as overridable constants.
#[async_trait]
pub trait Handler: Processor {
    /// The store this pipeline writes to; must support concurrent commits.
    type Store: ConcurrentStore;

    /// Accumulator that the collector folds processed values into before
    /// handing it to [`Self::commit`].
    type Batch: Default + Send + Sync + 'static;

    /// Threshold of buffered rows above which a batch is flushed eagerly
    /// instead of waiting for the collect interval.
    /// NOTE(review): exact semantics live in the collector -- confirm there.
    const MIN_EAGER_ROWS: usize = 50;

    /// Cap on rows buffered in the collector before it stops accepting new
    /// checkpoints (back-pressure).
    /// NOTE(review): enforced by the collector -- confirm there.
    const MAX_PENDING_ROWS: usize = 5000;

    /// Cap on watermark parts carried by a single batch.
    /// NOTE(review): used by the collector -- confirm semantics there.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Folds `values` into `batch`. Return [`BatchStatus::Pending`] if the
    /// batch can accept more values, or [`BatchStatus::Ready`] if it should
    /// be committed as-is. Implementations may consume only part of `values`.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Writes `batch` to the store through `conn`, returning a count on
    /// success (presumably rows affected -- TODO confirm with callers).
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Deletes data for checkpoints in `[_from, _to_exclusive)`. The default
    /// implementation is a no-op that reports zero rows pruned, for pipelines
    /// that never prune.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
115
/// Configuration for a concurrent pipeline. Every optional field falls back
/// to a default derived from the [`Handler`]'s constants or the CPU count
/// (see [`pipeline`]).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Batching/flush settings shared by the collector and committer.
    pub committer: CommitterConfig,

    /// Pruner settings; pruning is disabled when `None`.
    pub pruner: Option<PrunerConfig>,

    /// Processor fan-out; defaults to adaptive concurrency between 1 and the
    /// number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Override for [`Handler::MIN_EAGER_ROWS`].
    pub min_eager_rows: Option<usize>,

    /// Override for [`Handler::MAX_PENDING_ROWS`].
    pub max_pending_rows: Option<usize>,

    /// Override for [`Handler::MAX_WATERMARK_UPDATES`].
    pub max_watermark_updates: Option<usize>,

    /// Capacity of the processor -> collector channel.
    pub processor_channel_size: Option<usize>,

    /// Capacity of the collector -> committer channel.
    pub collector_channel_size: Option<usize>,

    /// Capacity of the committer -> commit-watermark channel.
    pub committer_channel_size: Option<usize>,
}
146
/// Configuration for the pruner service (and the reader-watermark service
/// that feeds it).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner looks for data to prune, in milliseconds.
    pub interval_ms: u64,

    /// Delay applied before pruning, in milliseconds.
    /// NOTE(review): exact use (likely a grace period behind the reader low
    /// watermark) lives in the pruner module -- confirm there.
    pub delay_ms: u64,

    /// Number of checkpoints to retain; older data becomes eligible for
    /// pruning.
    pub retention: u64,

    /// Maximum number of checkpoints pruned in one chunk.
    pub max_chunk_size: u64,

    /// Number of concurrent pruning tasks.
    pub prune_concurrency: u64,
}
165
/// A batch of processed values headed for the committer, along with the
/// watermark parts describing the checkpoints it covers.
struct BatchedRows<H: Handler> {
    /// Values to commit, already folded together by [`Handler::batch`].
    batch: H::Batch,
    /// Number of values in `batch`, tracked separately because `H::Batch`
    /// is opaque and need not expose a length.
    batch_len: usize,
    /// Watermark parts for the checkpoints represented in this batch.
    watermark: Vec<WatermarkPart>,
}
179
180impl<H, V> BatchedRows<H>
181where
182 H: Handler<Batch = Vec<V>, Value = V>,
183{
184 #[cfg(test)]
185 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
186 let batch_len = batch.len();
187 Self {
188 batch,
189 batch_len,
190 watermark,
191 }
192 }
193}
194
impl PrunerConfig {
    /// The pruner's polling interval as a [`Duration`].
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    /// The pruning delay as a [`Duration`].
    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}
204
205impl Default for PrunerConfig {
206 fn default() -> Self {
207 Self {
208 interval_ms: 300_000,
209 delay_ms: 120_000,
210 retention: 4_000_000,
211 max_chunk_size: 2_000,
212 prune_concurrency: 1,
213 }
214 }
215}
216
217pub(crate) fn pipeline<H: Handler>(
237 handler: H,
238 next_checkpoint: u64,
239 config: ConcurrentConfig,
240 store: H::Store,
241 task: Option<Task>,
242 checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
243 metrics: Arc<IndexerMetrics>,
244) -> Service {
245 info!(
246 pipeline = H::NAME,
247 "Starting pipeline with config: {config:#?}",
248 );
249
250 let ConcurrentConfig {
251 committer: committer_config,
252 pruner: pruner_config,
253 fanout,
254 min_eager_rows,
255 max_pending_rows,
256 max_watermark_updates,
257 processor_channel_size,
258 collector_channel_size,
259 committer_channel_size,
260 } = config;
261
262 let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
263 initial: 1,
264 min: 1,
265 max: num_cpus::get(),
266 dead_band: None,
267 });
268 let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
269 let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
270 let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
271
272 let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
273 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
274
275 let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
276 let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
278 let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
280 let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
281 let main_reader_lo = Arc::new(SetOnce::new());
282
283 let handler = Arc::new(handler);
284
285 let s_processor = processor(
286 handler.clone(),
287 checkpoint_rx,
288 processor_tx,
289 metrics.clone(),
290 concurrency,
291 store.clone(),
292 );
293
294 let s_collector = collector::<H>(
295 handler.clone(),
296 committer_config.clone(),
297 collector_rx,
298 collector_tx,
299 main_reader_lo.clone(),
300 metrics.clone(),
301 min_eager_rows,
302 max_pending_rows,
303 max_watermark_updates,
304 );
305
306 let s_committer = committer::<H>(
307 handler.clone(),
308 committer_config.clone(),
309 committer_rx,
310 committer_tx,
311 store.clone(),
312 metrics.clone(),
313 );
314
315 let s_commit_watermark = commit_watermark::<H>(
316 next_checkpoint,
317 committer_config,
318 watermark_rx,
319 store.clone(),
320 task.as_ref().map(|t| t.task.clone()),
321 metrics.clone(),
322 );
323
324 let s_track_reader_lo = track_main_reader_lo::<H>(
325 main_reader_lo.clone(),
326 task.as_ref().map(|t| t.reader_interval),
327 store.clone(),
328 );
329
330 let s_reader_watermark =
331 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
332
333 let s_pruner = pruner(handler, pruner_config, store, metrics);
334
335 s_processor
336 .merge(s_collector)
337 .merge(s_committer)
338 .merge(s_commit_watermark)
339 .attach(s_track_reader_lo)
340 .attach(s_reader_watermark)
341 .attach(s_pruner)
342}
343
344#[cfg(test)]
345mod tests {
346 use std::sync::Arc;
347 use std::time::Duration;
348
349 use prometheus::Registry;
350 use sui_types::digests::CheckpointDigest;
351 use tokio::sync::mpsc;
352 use tokio::time::timeout;
353
354 use crate::FieldCount;
355 use crate::metrics::IndexerMetrics;
356 use crate::mocks::store::MockConnection;
357 use crate::mocks::store::MockStore;
358 use crate::pipeline::Processor;
359 use crate::types::full_checkpoint_content::Checkpoint;
360 use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
361
362 use super::*;
363
    /// Generous upper bound for waits on the mock store; individual steps
    /// use tighter timeouts where timing matters.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    /// Capacity of the ingestion channel feeding the pipeline under test.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    /// Row type produced by the test pipeline; two are emitted per
    /// checkpoint (see `DataPipeline::process`).
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        /// Sequence number of the checkpoint this value came from.
        checkpoint: u64,
        /// Payload derived from the checkpoint: `checkpoint * 10 + {1,2}`.
        data: u64,
    }
372
    /// Minimal `Processor`/`Handler` implementation used by these tests.
    struct DataPipeline;
374
375 #[async_trait]
376 impl Processor for DataPipeline {
377 const NAME: &'static str = "test_handler";
378 type Value = TestValue;
379
380 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
381 let cp_num = checkpoint.summary.sequence_number;
382
383 Ok(vec![
385 TestValue {
386 checkpoint: cp_num,
387 data: cp_num * 10 + 1,
388 },
389 TestValue {
390 checkpoint: cp_num,
391 data: cp_num * 10 + 2,
392 },
393 ])
394 }
395 }
396
397 #[async_trait]
398 impl Handler for DataPipeline {
399 type Store = MockStore;
400 type Batch = Vec<TestValue>;
401
402 const MIN_EAGER_ROWS: usize = 1000; const MAX_PENDING_ROWS: usize = 4; const MAX_WATERMARK_UPDATES: usize = 1; fn batch(
407 &self,
408 batch: &mut Self::Batch,
409 values: &mut std::vec::IntoIter<Self::Value>,
410 ) -> BatchStatus {
411 batch.extend(values);
413 BatchStatus::Pending
414 }
415
416 async fn commit<'a>(
417 &self,
418 batch: &Self::Batch,
419 conn: &mut MockConnection<'a>,
420 ) -> anyhow::Result<usize> {
421 let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
423 std::collections::HashMap::new();
424 for value in batch {
425 grouped
426 .entry(value.checkpoint)
427 .or_default()
428 .push(value.data);
429 }
430
431 conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
433 }
434
435 async fn prune<'a>(
436 &self,
437 from: u64,
438 to_exclusive: u64,
439 conn: &mut MockConnection<'a>,
440 ) -> anyhow::Result<usize> {
441 conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
442 }
443 }
444
    /// Harness bundling the pipeline under test with its input channel and
    /// the mock store it writes to.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held so the pipeline's services stay alive for the duration of the
        // test.
        #[allow(unused)]
        pipeline: Service,
    }
451
452 impl TestSetup {
453 async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
454 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
455 let metrics = IndexerMetrics::new(None, &Registry::default());
456
457 let pipeline = pipeline(
458 DataPipeline,
459 next_checkpoint,
460 config,
461 store.clone(),
462 None,
463 checkpoint_rx,
464 metrics,
465 );
466
467 Self {
468 store,
469 checkpoint_tx,
470 pipeline,
471 }
472 }
473
474 async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
475 let checkpoint_envelope = Arc::new(CheckpointEnvelope {
476 checkpoint: Arc::new(
477 TestCheckpointBuilder::new(checkpoint)
478 .with_epoch(1)
479 .with_network_total_transactions(checkpoint * 2)
480 .with_timestamp_ms(1000000000 + checkpoint * 1000)
481 .build_checkpoint(),
482 ),
483 chain_id: CheckpointDigest::new([1; 32]).into(),
484 });
485 self.checkpoint_tx.send(checkpoint_envelope).await?;
486 Ok(())
487 }
488
489 async fn send_checkpoint_with_timeout(
490 &self,
491 checkpoint: u64,
492 timeout_duration: Duration,
493 ) -> anyhow::Result<()> {
494 timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
495 }
496
497 async fn send_checkpoint_expect_timeout(
498 &self,
499 checkpoint: u64,
500 timeout_duration: Duration,
501 ) {
502 timeout(timeout_duration, self.send_checkpoint(checkpoint))
503 .await
504 .unwrap_err(); }
506 }
507
    /// End-to-end: process, commit, and eventually prune, retaining only the
    /// newest 3 checkpoints.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        // Aggressive pruner: short delay, keep only the latest 3 checkpoints.
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Feed the first three checkpoints.
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Each checkpoint commits its two values (cp*10+1, cp*10+2).
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Feed three more checkpoints, pushing 0..=2 outside the retention
        // window.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All six checkpoints should have committed before pruning kicks in.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Poll until the pruner removes checkpoints 0..=2, bounded by a
        // deadline so a broken pruner fails the test instead of hanging.
        let pruning_deadline = Duration::from_secs(15);
        let start = tokio::time::Instant::now();
        loop {
            let pruned = {
                let data = setup.store.data.get(DataPipeline::NAME).unwrap();
                !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
            };
            if pruned {
                break;
            }
            assert!(
                start.elapsed() < pruning_deadline,
                "Timed out waiting for pruning to occur"
            );
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Checkpoints inside the retention window must survive pruning.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));
        };
    }
585
    /// End-to-end without a pruner: all data is retained and the watermark
    /// reflects the last checkpoint's transaction count and timestamp.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for the commit watermark to reach the last checkpoint.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        // `send_checkpoint` sets network_total_transactions = checkpoint * 2.
        assert_eq!(watermark.tx_hi, 18);
        // Timestamp of checkpoint 9: 1000000000 + 9 * 1000.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        // With no pruner, every checkpoint's data is retained.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
630
    /// Checkpoints arriving out of order are still all committed, and the
    /// watermark advances once the range is contiguous.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Deliver checkpoints in a scrambled order.
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The watermark can only reach 4 once 0..=4 have all committed.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        // Each checkpoint committed its expected values despite the order.
        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }
661
    /// The commit watermark stalls at a gap (missing checkpoint 2) and only
    /// advances past it once the gap is filled.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send 0, 1, 3, 4 -- deliberately skipping 2.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit everything it can.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Watermark must stop at 1: checkpoint 2 has not been seen.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Fill the gap.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // Now the watermark can jump all the way to 4.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
696
    /// Back-pressure via the collector's MAX_PENDING_ROWS (4): with a long
    /// collect interval, pending rows plus the small channel capacities bound
    /// how many checkpoints fit in flight before sends block.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // NOTE(review): the exact capacity arithmetic (why 14 checkpoints fit
        // and the 15th blocks) depends on collector internals -- confirm
        // against the collector module if this test needs adjusting.
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Give the pipeline's tasks a moment to start up.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Fill the pipeline to capacity.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The next send should block: back-pressure has propagated all the
        // way to the ingestion channel.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Once the collector flushes, capacity frees up and the previously
        // blocked checkpoint goes through.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        // Data still flows end-to-end after the back-pressure episode.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
767
    /// Back-pressure caused by a slow committer: a long artificial commit
    /// delay with single-writer concurrency eventually blocks ingestion.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1,
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        // Every commit takes 10s, so batches pile up behind the committer.
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // These should all be accepted while buffers fill.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere in 19..22 the buffers are full and a send times out.
        // NOTE(review): the exact blocking point depends on internal buffer
        // sizes, hence the range rather than a fixed checkpoint.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Wait for at least one slow commit to land, freeing capacity.
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // The blocked checkpoint can now be delivered.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
851
    /// Transient commit failures are retried until the write succeeds and
    /// the watermark advances.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // First two commit attempts fail, then commits succeed.
        // NOTE(review): argument meaning assumed from the name -- verify
        // against MockStore::with_commit_failures.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Despite the induced failures, the watermark eventually reaches 0.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // And the data was committed exactly once with the expected values.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }
879
    /// A failed prune attempt leaves data intact and is retried until it
    /// succeeds.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail pruning of [0, 2) once before letting it succeed.
        // NOTE(review): argument meaning assumed from usage below -- verify
        // against MockStore::with_prune_failures.
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All four checkpoints commit before any pruning takes effect.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failing) prune attempt, nothing was deleted.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // After the retry succeeds, 0 and 1 are gone; the retained window
        // (2, 3) survives.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
942
    /// Transient reader-watermark update failures are retried; the reader
    /// low watermark still lands where retention dictates.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // First two reader-watermark updates fail, then they succeed.
        // NOTE(review): argument meaning assumed from the name -- verify
        // against MockStore::with_reader_watermark_failures.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow several 100ms update cycles so the retries can land.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // With watermark at 5 and retention 3, reader_lo settles at 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
980
    /// Transient failures acquiring a store connection are retried until the
    /// pipeline makes progress.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // First two connection attempts fail, then connections succeed.
        // NOTE(review): argument meaning assumed from the name -- verify
        // against MockStore::with_connection_failures.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Despite the induced connection failures, the watermark reaches 0.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // And the checkpoint's data made it into the store.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
1006}