1use std::{sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use tokio::{sync::mpsc, task::JoinHandle};
9use tokio_util::sync::CancellationToken;
10use tracing::info;
11
12use crate::{
13 FieldCount, metrics::IndexerMetrics, store::Store,
14 types::full_checkpoint_content::CheckpointData,
15};
16
17use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};
18
19use self::{
20 collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
21 reader_watermark::reader_watermark,
22};
23
24mod collector;
25mod commit_watermark;
26mod committer;
27mod pruner;
28mod reader_watermark;
29
/// Implementors define a concurrent pipeline's write-side behaviour: how
/// processed rows are committed to the store, and (optionally) how old data is
/// pruned away.
#[async_trait]
pub trait Handler: Processor<Value: FieldCount> {
    /// The store this handler commits to and prunes from.
    type Store: Store;

    /// Row threshold at which a batch is presumably written out eagerly,
    /// before a timer fires — consumed by the collector task (not visible in
    /// this module); TODO confirm exact semantics there.
    const MIN_EAGER_ROWS: usize = 50;

    /// Cap on rows buffered inside the pipeline before back pressure is
    /// applied to the input — exercised by the back-pressure tests below.
    const MAX_PENDING_ROWS: usize = 5000;

    /// Maximum number of watermark parts a single batch may carry; a batch
    /// counts as full once this many have accumulated (see
    /// `BatchedRows::is_full`).
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Write `values` to the store through `conn`, returning the number of
    /// rows affected. Note: an associated function (no `&self`).
    async fn commit<'a>(
        values: &[Self::Value],
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Delete data for checkpoints in the half-open range
    /// `[_from, _to_exclusive)`, returning the number of rows affected. The
    /// default is a no-op for pipelines that do not support pruning.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
82
/// Configuration for a concurrent pipeline.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Settings for the collector/committer stages.
    pub committer: CommitterConfig,

    /// Pruner configuration; `None` disables pruning entirely.
    pub pruner: Option<PrunerConfig>,
}
92
/// Configuration for a concurrent pipeline's pruning tasks.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner runs, in milliseconds (see
    /// [`PrunerConfig::interval`]).
    pub interval_ms: u64,

    /// Delay, in milliseconds, applied by the pruning side (see
    /// [`PrunerConfig::delay`]) — presumably before data becomes eligible for
    /// pruning; confirm against the pruner/reader-watermark tasks.
    pub delay_ms: u64,

    /// Number of checkpoints to retain; older checkpoints are pruned (the
    /// e2e test below keeps exactly the newest `retention` checkpoints).
    pub retention: u64,

    /// Maximum checkpoints handled per prune chunk — consumed by the pruner
    /// task (not visible here).
    pub max_chunk_size: u64,

    /// Number of concurrent prune operations — consumed by the pruner task.
    pub prune_concurrency: u64,
}
111
/// Rows accumulated for a single commit, paired with the watermark parts
/// describing the checkpoints they came from.
struct BatchedRows<H: Handler> {
    // Processed values waiting to be committed.
    values: Vec<H::Value>,
    // Watermark parts for the checkpoints represented in `values`.
    watermark: Vec<WatermarkPart>,
}
123
impl PrunerConfig {
    /// The pruner's polling cadence as a [`Duration`].
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    /// The pruning delay as a [`Duration`].
    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}
133
134impl<H: Handler> BatchedRows<H> {
135 fn new() -> Self {
136 Self {
137 values: vec![],
138 watermark: vec![],
139 }
140 }
141
142 fn len(&self) -> usize {
144 self.values.len()
145 }
146
147 fn is_full(&self) -> bool {
150 self.values.len() >= max_chunk_rows::<H>()
151 || self.watermark.len() >= H::MAX_WATERMARK_UPDATES
152 }
153}
154
impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            // 5 minutes between prune passes.
            interval_ms: 300_000,
            // 2 minute pruning delay.
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}
166
/// Start a concurrent pipeline served by `handler`, returning a handle that
/// resolves once every task has wound down.
///
/// The main ingestion chain is: `processor` (turns checkpoints from
/// `checkpoint_rx` into rows) -> `collector` (batches rows) -> `committer`
/// (writes batches to `store`) -> `commit_watermark` (tracks progress from
/// `next_checkpoint`, unless `skip_watermark` is set). If pruning is
/// configured, `reader_watermark` and `pruner` run alongside on a child
/// cancellation token, and are only cancelled after the main chain exits.
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    skip_watermark: bool,
    store: H::Store,
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {:?}", config
    );
    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    // Inter-stage channels; capacities are padded by PIPELINE_BUFFER to
    // decouple neighbouring stages from each other.
    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);

    // Pruning tasks get a child token so they outlive the main chain: they
    // are only cancelled explicitly after the chain finishes (see below).
    let pruner_cancel = cancel.child_token();
    let handler = Arc::new(handler);

    let processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let collector = collector::<H>(
        committer_config.clone(),
        collector_rx,
        collector_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let committer = committer::<H>(
        committer_config.clone(),
        skip_watermark,
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        skip_watermark,
        watermark_rx,
        store.clone(),
        metrics.clone(),
        cancel,
    );

    let reader_watermark = reader_watermark::<H>(
        pruner_config.clone(),
        store.clone(),
        metrics.clone(),
        pruner_cancel.clone(),
    );

    let pruner = pruner(
        handler,
        pruner_config,
        store,
        metrics,
        pruner_cancel.clone(),
    );

    tokio::spawn(async move {
        // Wait for the main ingestion chain to finish first...
        let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

        // ...then stop the pruning tasks, which have had until now to catch up.
        pruner_cancel.cancel();
        let _ = futures::join!(reader_watermark, pruner);
    })
}
279
280const fn max_chunk_rows<H: Handler>() -> usize {
281 if H::Value::FIELD_COUNT == 0 {
282 i16::MAX as usize
283 } else {
284 i16::MAX as usize / H::Value::FIELD_COUNT
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use std::{sync::Arc, time::Duration};
291
292 use prometheus::Registry;
293 use tokio::{sync::mpsc, time::timeout};
294 use tokio_util::sync::CancellationToken;
295
296 use crate::{
297 FieldCount,
298 metrics::IndexerMetrics,
299 mocks::store::{MockConnection, MockStore},
300 pipeline::Processor,
301 types::{
302 full_checkpoint_content::CheckpointData,
303 test_checkpoint_data_builder::TestCheckpointDataBuilder,
304 },
305 };
306
307 use super::*;
308
    // Generous upper bound for waits that should normally finish quickly.
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    // Small input channel so back-pressure tests saturate quickly.
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    // Row type emitted by the test pipeline; derives `FieldCount` so it can
    // be chunked by the concurrent pipeline.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }

    // Marker type implementing `Processor` and `Handler` for the tests below.
    struct DataPipeline;
319
320 #[async_trait]
321 impl Processor for DataPipeline {
322 const NAME: &'static str = "test_handler";
323 const FANOUT: usize = 2;
324 type Value = TestValue;
325
326 async fn process(
327 &self,
328 checkpoint: &Arc<CheckpointData>,
329 ) -> anyhow::Result<Vec<Self::Value>> {
330 let cp_num = checkpoint.checkpoint_summary.sequence_number;
331
332 Ok(vec![
334 TestValue {
335 checkpoint: cp_num,
336 data: cp_num * 10 + 1,
337 },
338 TestValue {
339 checkpoint: cp_num,
340 data: cp_num * 10 + 2,
341 },
342 ])
343 }
344 }
345
346 #[async_trait]
347 impl Handler for DataPipeline {
348 type Store = MockStore;
349 const MIN_EAGER_ROWS: usize = 1000; const MAX_PENDING_ROWS: usize = 4; const MAX_WATERMARK_UPDATES: usize = 1; async fn commit<'a>(
354 values: &[Self::Value],
355 conn: &mut MockConnection<'a>,
356 ) -> anyhow::Result<usize> {
357 let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
359 std::collections::HashMap::new();
360 for value in values {
361 grouped
362 .entry(value.checkpoint)
363 .or_default()
364 .push(value.data);
365 }
366
367 conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
369 }
370
371 async fn prune<'a>(
372 &self,
373 from: u64,
374 to_exclusive: u64,
375 conn: &mut MockConnection<'a>,
376 ) -> anyhow::Result<usize> {
377 conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
378 }
379 }
380
    // Shared fixture: a running pipeline plus handles to drive and observe it.
    struct TestSetup {
        // Mock store the pipeline writes to; inspected by assertions.
        store: MockStore,
        // Input side of the pipeline: checkpoints to ingest.
        checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
        // Handle for the spawned pipeline task, awaited on shutdown.
        pipeline_handle: JoinHandle<()>,
        // Cancels all pipeline tasks.
        cancel: CancellationToken,
    }
387
388 impl TestSetup {
389 async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
390 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
391 let metrics = IndexerMetrics::new(None, &Registry::default());
392 let cancel = CancellationToken::new();
393
394 let skip_watermark = false;
395 let pipeline_handle = pipeline(
396 DataPipeline,
397 next_checkpoint,
398 config,
399 skip_watermark,
400 store.clone(),
401 checkpoint_rx,
402 metrics,
403 cancel.clone(),
404 );
405
406 Self {
407 store,
408 checkpoint_tx,
409 pipeline_handle,
410 cancel,
411 }
412 }
413
414 async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
415 let checkpoint = Arc::new(
416 TestCheckpointDataBuilder::new(checkpoint)
417 .with_epoch(1)
418 .with_network_total_transactions(checkpoint * 2)
419 .with_timestamp_ms(1000000000 + checkpoint * 1000)
420 .build_checkpoint(),
421 );
422 self.checkpoint_tx.send(checkpoint).await?;
423 Ok(())
424 }
425
426 async fn shutdown(self) {
427 drop(self.checkpoint_tx);
428 self.cancel.cancel();
429 let _ = self.pipeline_handle.await;
430 }
431
432 async fn send_checkpoint_with_timeout(
433 &self,
434 checkpoint: u64,
435 timeout_duration: Duration,
436 ) -> anyhow::Result<()> {
437 timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
438 }
439
440 async fn send_checkpoint_expect_timeout(
441 &self,
442 checkpoint: u64,
443 timeout_duration: Duration,
444 ) {
445 timeout(timeout_duration, self.send_checkpoint(checkpoint))
446 .await
447 .unwrap_err(); }
449 }
450
    /// End-to-end run with pruning enabled: commit checkpoints 0..6, then
    /// verify the pruner removes everything outside the retention window.
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Prune on a 5s cadence, short delay, keep 3 checkpoints.
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Feed the first three checkpoints.
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Each checkpoint commits exactly two rows: cp*10+1 and cp*10+2.
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Feed three more checkpoints.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All six checkpoints land before any pruning is observed here.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Sleep past the pruner interval so at least one prune pass has run.
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // The newest `retention = 3` checkpoints survive...
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // ...while everything older has been pruned.
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };

        setup.shutdown().await;
    }
520
    /// With the pruner disabled, every committed checkpoint is retained and
    /// the watermark advances to the last one.
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for the committer watermark to reach the last checkpoint.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        // Checkpoints carry `checkpoint * 2` total transactions: 9 * 2 = 18.
        assert_eq!(watermark.tx_hi, 18);
        // Timestamps are 1000000000 + checkpoint * 1000, so cp 9 -> 1000009000.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);
        // No pruning: all ten checkpoints should still be present.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);

        setup.shutdown().await;
    }
567
    /// Checkpoints arriving out of order are still all committed, and the
    /// watermark eventually reaches the top of the contiguous range.
    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Deliberately scrambled order.
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The watermark reaches 4 only once 0..=4 have all been committed.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        setup.shutdown().await;
    }
600
    /// A missing checkpoint holds the watermark back; filling the gap lets it
    /// jump forward to the highest contiguous checkpoint.
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send 0, 1, 3, 4 — leaving a gap at 2.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit what it can.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The watermark stalls at 1, just before the gap.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Fill the gap...
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        // ...and the watermark advances through to 4.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.shutdown().await;
    }
637
    /// Collector back pressure: with a long collect interval and
    /// `MAX_PENDING_ROWS = 4` (two rows per checkpoint), buffered rows pile
    /// up until sends into the pipeline block.
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // Collect rarely so rows accumulate in the collector.
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Let the pipeline tasks start before sending.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // These all fit in the channel buffers and pending-row budget.
        // NOTE(review): 14 is where saturation is expected for this config;
        // it depends on FANOUT / PIPELINE_BUFFER sizing.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The next send should block: back pressure reached the input.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Once the collector drains (after its interval), the send succeeds.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
707
    /// Committer back pressure: a very slow store commit (10s) with a single
    /// writer eventually stops the pipeline from accepting new checkpoints.
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // One writer, so a single slow commit stalls the whole stage.
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        // Every commit takes 10 seconds.
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // Fill the pipeline's internal buffers.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere in 19..22 the buffers are exhausted and a send times out;
        // the exact checkpoint depends on channel sizing, so probe a range.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Wait for the first (slow) commit to land, freeing buffer space.
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // The checkpoint that previously timed out can now be sent.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();

        setup.shutdown().await;
    }
786
    /// Transient commit failures are retried: the store fails the first two
    /// commit attempts, yet the data still lands and the watermark advances.
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first two commit attempts, then succeed.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // The watermark only advances once a commit finally succeeds.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
816
    /// Transient prune failures are retried: the first attempt at pruning
    /// range [0, 2) fails; a later pass succeeds and removes the data.
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Short cadence so the retry happens quickly.
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail pruning of [0, 2) once before letting it succeed.
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failing) attempt, nothing has been pruned yet.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // The second attempt succeeds: [0, 2) is gone, the rest remains.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };

        setup.shutdown().await;
    }
881
    /// Transient reader-watermark update failures are retried: despite the
    /// first two updates failing, `reader_lo` eventually reflects retention.
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Fast cadence so several update attempts happen quickly.
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // Fail the first two reader watermark updates.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow time for the failed attempts plus a successful retry.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Expected reader_lo: checkpoint_hi (5) + 1 - retention (3) = 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);

        setup.shutdown().await;
    }
921
    /// Transient connection failures are retried: the first two attempts to
    /// get a store connection fail, yet the pipeline still commits.
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // Fail the first two connection attempts, then succeed.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
949}