1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::metrics::IndexerMetrics;
18use crate::pipeline::CommitterConfig;
19use crate::pipeline::Processor;
20use crate::pipeline::WatermarkPart;
21use crate::pipeline::concurrent::collector::collector;
22use crate::pipeline::concurrent::commit_watermark::commit_watermark;
23use crate::pipeline::concurrent::committer::committer;
24use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
25use crate::pipeline::concurrent::pruner::pruner;
26use crate::pipeline::concurrent::reader_watermark::reader_watermark;
27use crate::pipeline::processor::processor;
28use crate::store::Store;
29use crate::types::full_checkpoint_content::Checkpoint;
30
31mod collector;
32mod commit_watermark;
33mod committer;
34mod main_reader_lo;
35mod pruner;
36mod reader_watermark;
37
/// Outcome of [`Handler::batch`], telling the collector whether the current
/// batch can accept more values or should be sent off for committing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch still has room for more values.
    Pending,
    /// The batch is full and should be committed as-is.
    Ready,
}
46
#[async_trait]
pub trait Handler: Processor {
    /// The store this handler commits its values to.
    type Store: Store;

    /// Accumulator that collects processed values until they are committed.
    /// Must start out empty (`Default`).
    type Batch: Default + Send + Sync + 'static;

    /// NOTE(review): presumably the minimum number of pending rows before the
    /// collector flushes a batch eagerly (before its timer fires) -- confirm
    /// against `collector`.
    const MIN_EAGER_ROWS: usize = 50;

    /// NOTE(review): presumably the cap on rows buffered in the collector
    /// before it stops accepting more input (back-pressure) -- confirm
    /// against `collector`.
    const MAX_PENDING_ROWS: usize = 5000;

    /// NOTE(review): presumably the maximum number of watermark parts a
    /// single batch may carry -- confirm against `collector`.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Drain `values` into `batch`. Returns [`BatchStatus::Pending`] if the
    /// batch can accept more values, or [`BatchStatus::Ready`] if it should
    /// be committed as-is.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Write `batch` out through `conn`, returning the number of rows
    /// affected. NOTE(review): the commit-failure tests in this module imply
    /// failed commits are retried, so implementations should be idempotent --
    /// confirm against `committer`.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Delete this handler's data for checkpoints in `[_from, _to_exclusive)`,
    /// returning the number of rows pruned. Defaults to a no-op for
    /// pipelines that never prune.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
114
/// Configuration for a concurrent pipeline. All `Option` fields fall back to
/// defaults (either the handler's associated constants or CPU-count-derived
/// values) in [`pipeline`] when unset.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Configuration shared by the collector / committer / watermark tasks.
    pub committer: CommitterConfig,

    /// Pruner configuration; `None` disables pruning entirely.
    pub pruner: Option<PrunerConfig>,

    /// Processor concurrency. Defaults to an adaptive strategy bounded by the
    /// CPU count when unset (see `pipeline`).
    pub fanout: Option<ConcurrencyConfig>,

    /// Overrides [`Handler::MIN_EAGER_ROWS`] when set.
    pub min_eager_rows: Option<usize>,

    /// Overrides [`Handler::MAX_PENDING_ROWS`] when set.
    pub max_pending_rows: Option<usize>,

    /// Overrides [`Handler::MAX_WATERMARK_UPDATES`] when set.
    pub max_watermark_updates: Option<usize>,

    /// Capacity of the processor -> collector channel. Defaults to half the
    /// CPU count when unset.
    pub processor_channel_size: Option<usize>,

    /// Capacity of the collector -> committer channel. Defaults to half the
    /// CPU count when unset.
    pub collector_channel_size: Option<usize>,

    /// Capacity of the committer -> commit-watermark channel. Defaults to the
    /// CPU count when unset.
    pub committer_channel_size: Option<usize>,
}
145
/// Configuration for the pruning tasks of a concurrent pipeline.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner runs, in milliseconds (see
    /// [`PrunerConfig::interval`]).
    pub interval_ms: u64,

    /// Delay applied by the pruner, in milliseconds (see
    /// [`PrunerConfig::delay`]). NOTE(review): presumably the grace period
    /// before data outside the retention window is actually deleted --
    /// confirm against `pruner`.
    pub delay_ms: u64,

    /// Number of most recent checkpoints to retain; anything older becomes
    /// eligible for pruning.
    pub retention: u64,

    /// Upper bound on the size of a single prune chunk. NOTE(review): units
    /// (checkpoints vs rows) inferred from the name -- confirm against
    /// `pruner`.
    pub max_chunk_size: u64,

    /// Number of prune operations that may run concurrently.
    pub prune_concurrency: u64,
}
164
/// A batch of processed values together with the watermark parts describing
/// which checkpoints contributed to it.
struct BatchedRows<H: Handler> {
    /// The accumulated batch, ready to be committed.
    batch: H::Batch,
    /// Number of values folded into `batch` (tracked separately because
    /// `H::Batch` is opaque to the framework).
    batch_len: usize,
    /// Watermark parts for the checkpoints represented in this batch.
    watermark: Vec<WatermarkPart>,
}
178
179impl<H, V> BatchedRows<H>
180where
181 H: Handler<Batch = Vec<V>, Value = V>,
182{
183 #[cfg(test)]
184 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
185 let batch_len = batch.len();
186 Self {
187 batch,
188 batch_len,
189 watermark,
190 }
191 }
192}
193
194impl PrunerConfig {
195 pub fn interval(&self) -> Duration {
196 Duration::from_millis(self.interval_ms)
197 }
198
199 pub fn delay(&self) -> Duration {
200 Duration::from_millis(self.delay_ms)
201 }
202}
203
204impl Default for PrunerConfig {
205 fn default() -> Self {
206 Self {
207 interval_ms: 300_000,
208 delay_ms: 120_000,
209 retention: 4_000_000,
210 max_chunk_size: 2_000,
211 prune_concurrency: 1,
212 }
213 }
214}
215
216pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
236 handler: H,
237 next_checkpoint: u64,
238 config: ConcurrentConfig,
239 store: H::Store,
240 task: Option<Task>,
241 checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
242 metrics: Arc<IndexerMetrics>,
243) -> Service {
244 info!(
245 pipeline = H::NAME,
246 "Starting pipeline with config: {config:#?}",
247 );
248
249 let ConcurrentConfig {
250 committer: committer_config,
251 pruner: pruner_config,
252 fanout,
253 min_eager_rows,
254 max_pending_rows,
255 max_watermark_updates,
256 processor_channel_size,
257 collector_channel_size,
258 committer_channel_size,
259 } = config;
260
261 let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
262 initial: 1,
263 min: 1,
264 max: num_cpus::get(),
265 dead_band: None,
266 });
267 let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
268 let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
269 let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
270
271 let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
272 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
273
274 let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
275 let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
277 let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
279 let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
280 let main_reader_lo = Arc::new(SetOnce::new());
281
282 let handler = Arc::new(handler);
283
284 let s_processor = processor(
285 handler.clone(),
286 checkpoint_rx,
287 processor_tx,
288 metrics.clone(),
289 concurrency,
290 );
291
292 let s_collector = collector::<H>(
293 handler.clone(),
294 committer_config.clone(),
295 collector_rx,
296 collector_tx,
297 main_reader_lo.clone(),
298 metrics.clone(),
299 min_eager_rows,
300 max_pending_rows,
301 max_watermark_updates,
302 );
303
304 let s_committer = committer::<H>(
305 handler.clone(),
306 committer_config.clone(),
307 committer_rx,
308 committer_tx,
309 store.clone(),
310 metrics.clone(),
311 );
312
313 let s_commit_watermark = commit_watermark::<H>(
314 next_checkpoint,
315 committer_config,
316 watermark_rx,
317 store.clone(),
318 task.as_ref().map(|t| t.task.clone()),
319 metrics.clone(),
320 );
321
322 let s_track_reader_lo = track_main_reader_lo::<H>(
323 main_reader_lo.clone(),
324 task.as_ref().map(|t| t.reader_interval),
325 store.clone(),
326 );
327
328 let s_reader_watermark =
329 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
330
331 let s_pruner = pruner(handler, pruner_config, store, metrics);
332
333 s_processor
334 .merge(s_collector)
335 .merge(s_committer)
336 .merge(s_commit_watermark)
337 .attach(s_track_reader_lo)
338 .attach(s_reader_watermark)
339 .attach(s_pruner)
340}
341
342#[cfg(test)]
343mod tests {
344 use std::sync::Arc;
345 use std::time::Duration;
346
347 use prometheus::Registry;
348 use tokio::sync::mpsc;
349 use tokio::time::timeout;
350
351 use crate::FieldCount;
352 use crate::metrics::IndexerMetrics;
353 use crate::mocks::store::MockConnection;
354 use crate::mocks::store::MockStore;
355 use crate::pipeline::Processor;
356 use crate::types::full_checkpoint_content::Checkpoint;
357 use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
358
359 use super::*;
360
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    // Capacity of the checkpoint ingestion channel used by these tests.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    /// Row type emitted by the test pipeline: one payload (`data`) tagged
    /// with the checkpoint it came from.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    /// Stateless handler under test.
    struct DataPipeline;
371
372 #[async_trait]
373 impl Processor for DataPipeline {
374 const NAME: &'static str = "test_handler";
375 type Value = TestValue;
376
377 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
378 let cp_num = checkpoint.summary.sequence_number;
379
380 Ok(vec![
382 TestValue {
383 checkpoint: cp_num,
384 data: cp_num * 10 + 1,
385 },
386 TestValue {
387 checkpoint: cp_num,
388 data: cp_num * 10 + 2,
389 },
390 ])
391 }
392 }
393
394 #[async_trait]
395 impl Handler for DataPipeline {
396 type Store = MockStore;
397 type Batch = Vec<TestValue>;
398
399 const MIN_EAGER_ROWS: usize = 1000; const MAX_PENDING_ROWS: usize = 4; const MAX_WATERMARK_UPDATES: usize = 1; fn batch(
404 &self,
405 batch: &mut Self::Batch,
406 values: &mut std::vec::IntoIter<Self::Value>,
407 ) -> BatchStatus {
408 batch.extend(values);
410 BatchStatus::Pending
411 }
412
413 async fn commit<'a>(
414 &self,
415 batch: &Self::Batch,
416 conn: &mut MockConnection<'a>,
417 ) -> anyhow::Result<usize> {
418 let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
420 std::collections::HashMap::new();
421 for value in batch {
422 grouped
423 .entry(value.checkpoint)
424 .or_default()
425 .push(value.data);
426 }
427
428 conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
430 }
431
432 async fn prune<'a>(
433 &self,
434 from: u64,
435 to_exclusive: u64,
436 conn: &mut MockConnection<'a>,
437 ) -> anyhow::Result<usize> {
438 conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
439 }
440 }
441
    /// Harness bundling a running pipeline with handles to drive and inspect
    /// it: the mock store and the checkpoint ingestion channel.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        // Held so the pipeline's tasks stay alive for the test's duration.
        #[allow(unused)]
        pipeline: Service,
    }
448
449 impl TestSetup {
450 async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
451 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
452 let metrics = IndexerMetrics::new(None, &Registry::default());
453
454 let pipeline = pipeline(
455 DataPipeline,
456 next_checkpoint,
457 config,
458 store.clone(),
459 None,
460 checkpoint_rx,
461 metrics,
462 );
463
464 Self {
465 store,
466 checkpoint_tx,
467 pipeline,
468 }
469 }
470
471 async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
472 let checkpoint = Arc::new(
473 TestCheckpointBuilder::new(checkpoint)
474 .with_epoch(1)
475 .with_network_total_transactions(checkpoint * 2)
476 .with_timestamp_ms(1000000000 + checkpoint * 1000)
477 .build_checkpoint(),
478 );
479 self.checkpoint_tx.send(checkpoint).await?;
480 Ok(())
481 }
482
483 async fn send_checkpoint_with_timeout(
484 &self,
485 checkpoint: u64,
486 timeout_duration: Duration,
487 ) -> anyhow::Result<()> {
488 timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
489 }
490
491 async fn send_checkpoint_expect_timeout(
492 &self,
493 checkpoint: u64,
494 timeout_duration: Duration,
495 ) {
496 timeout(timeout_duration, self.send_checkpoint(checkpoint))
497 .await
498 .unwrap_err(); }
500 }
501
    #[tokio::test]
    async fn test_e2e_pipeline() {
        // Retain only the 3 newest checkpoints and prune every 5s, so pruning
        // is observable within the test window.
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send the first three checkpoints...
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // ...and verify each one's rows land in the store.
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Push three more checkpoints so 0..=2 fall outside the retention
        // window.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait out at least one full pruner interval (5s) plus slack.
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // The newest `retention = 3` checkpoints survive...
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // ...while everything older has been pruned.
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };
    }
569
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        // No pruner configured: all committed data must be retained.
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for the commit watermark to reach the last checkpoint.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // The watermark reflects checkpoint 9's synthetic metadata from
        // `TestSetup::send_checkpoint`: tx = 9 * 2, timestamp = 1000000000
        // + 9 * 1000.
        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        assert_eq!(watermark.tx_hi, 18);
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        // With pruning disabled, all ten checkpoints are still present.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
614
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Deliver checkpoints out of order; the concurrent pipeline commits
        // them as they arrive.
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Once all five have landed, the contiguous run 0..=4 is complete and
        // the watermark reaches 4.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }
    }
645
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Leave a gap at checkpoint 2.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit what it has; the watermark must
        // stall just before the gap.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Filling the gap lets the watermark jump forward to 4.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);
    }
680
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        // A long collect interval plus the small `MAX_PENDING_ROWS` (4, see
        // the `Handler` impl above) makes the collector the bottleneck.
        // Small channel capacities bound how many checkpoints the pipeline
        // can buffer before back-pressure reaches the ingestion channel.
        // NOTE(review): the expectation that exactly 14 checkpoints fit is
        // inferred from the capacities below -- confirm against `collector`.
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Let the pipeline tasks spin up before measuring back-pressure.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // These sends should all be accepted promptly.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The 15th checkpoint must block: the pipeline is saturated.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Eventually the collector flushes and the blocked send succeeds.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
751
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        // A 10s commit delay with a single writer makes the committer the
        // bottleneck; a short collect interval keeps batches flowing into it.
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                write_concurrency: 1,
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // Fill the pipeline's internal buffers.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Back-pressure should surface somewhere between checkpoints 19 and
        // 21; the exact point depends on internal buffering/timing.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Once the first (delayed) commit lands, capacity frees up...
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // ...and the previously rejected checkpoint can be sent.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
835
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first two commit attempts; the pipeline should retry
        // until the data lands.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Despite the injected failures, the watermark eventually advances...
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        // ...and the checkpoint's rows are present in the store.
        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }
863
    #[tokio::test]
    async fn test_prune_failure_retry() {
        // Retain 2 checkpoints and prune frequently so the retry is quick.
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Inject one failure for the prune of range [0, 2).
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failing) prune attempt, nothing is deleted.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // The second attempt succeeds: [0, 2) is pruned, the rest survives.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
926
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail the first two reader-watermark updates; the task should keep
        // retrying on its short (100ms) interval.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow enough time for the failed attempts plus a successful retry.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // reader_lo of 3 is consistent with retaining the newest 3 of
        // checkpoints 0..=5.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
964
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first two connection acquisitions; the pipeline should
        // retry and still make progress.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
990}