use std::{
    sync::{Arc, atomic::AtomicU64},
    time::Duration,
};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{SetOnce, mpsc},
    task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
    Task, metrics::IndexerMetrics, store::Store, types::full_checkpoint_content::Checkpoint,
};

use super::{CommitterConfig, PIPELINE_BUFFER, Processor, WatermarkPart, processor::processor};

use self::{
    collector::collector, commit_watermark::commit_watermark, committer::committer,
    main_reader_lo::track_main_reader_lo, pruner::pruner, reader_watermark::reader_watermark,
};

mod collector;
mod commit_watermark;
mod committer;
mod main_reader_lo;
mod pruner;
mod reader_watermark;
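
/// Whether a batch wants more values or is ready to be written out, as reported by
/// [`Handler::batch`].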
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch can accept more values.
    Pending,
    /// The batch is full and should be committed as-is.
    Ready,
}
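
/// Handlers implement the logic for a given indexing pipeline: how to batch the values
/// produced by [`Processor::process`], how to write batches out to the store, and,
/// optionally, how to prune old data. The associated constants tune the pipeline's batching
/// and back-pressure behaviour.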
#[async_trait]
pub trait Handler: Processor {
    type Store: Store;
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, they are flushed eagerly rather than waiting
    /// for the next collection interval.
    const MIN_EAGER_ROWS: usize = 50;

    /// Once this many rows are pending but not yet committed, the pipeline stops accepting
    /// new checkpoints, applying back-pressure upstream.
    const MAX_PENDING_ROWS: usize = 5000;

    /// Upper bound on the number of checkpoint watermarks a single batch may span.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Fold values from `values` into `batch`, returning whether the batch can accept more
    /// (`Pending`) or should be committed as-is (`Ready`). Values left in the iterator are
    /// offered to a subsequent batch.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Write `batch` to the store through `conn`, returning the number of affected rows.
    /// Commits are retried on failure, so they must be idempotent.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Delete all values for checkpoints in the range `[_from, _to_exclusive)`, returning
    /// the number of rows affected. The default implementation is a no-op, for pipelines
    /// that never prune.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
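
/// Configuration for a concurrent pipeline: committer behaviour, plus an optional pruner.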
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Batching and write behaviour for the committer.
    pub committer: CommitterConfig,

    /// Pruning configuration. `None` disables pruning entirely.
    pub pruner: Option<PrunerConfig>,
}
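
/// Controls how often the pruner runs, how much history it retains, and how much it deletes
/// at a time.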
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner checks for something to do, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark moves before it is considered safe
    /// to prune up to it, in milliseconds.
    pub delay_ms: u64,

    /// Number of the most recent checkpoints to retain.
    pub retention: u64,

    /// Largest checkpoint range to prune in a single call to [`Handler::prune`].
    pub max_chunk_size: u64,

    /// Number of concurrent pruning tasks.
    pub prune_concurrency: u64,
}
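
/// A batch of processed values, along with the watermark parts for the checkpoints it
/// covers.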
struct BatchedRows<H: Handler> {
    /// The batch under construction.
    batch: H::Batch,
    /// Number of values folded into `batch` (the batch type itself is opaque).
    batch_len: usize,
    /// Watermark parts for the checkpoints whose values are included in this batch.
    watermark: Vec<WatermarkPart>,
}

impl<H, V> BatchedRows<H>
where
    H: Handler<Batch = Vec<V>, Value = V>,
{
    /// Test-only constructor for handlers whose batch is a plain `Vec` of values.
    #[cfg(test)]
    pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
        let batch_len = batch.len();
        Self {
            batch,
            batch_len,
            watermark,
        }
    }
}

impl PrunerConfig {
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}

impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            interval_ms: 300_000,
            delay_ms: 120_000,
            retention: 4_000_000,
            max_chunk_size: 2_000,
            prune_concurrency: 1,
        }
    }
}
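
/// Start a new concurrent (out-of-order) indexing pipeline served by `handler`.
///
/// Checkpoints received on `checkpoint_rx` are fanned out to `H::FANOUT` processors; their
/// values are gathered into batches by the collector, written out by the committer, and
/// accounted for by the commit watermark task, starting from `next_checkpoint`. If pruning is
/// configured, reader watermark and pruner tasks run alongside on their own cancellation
/// token, which is only triggered once the commit half of the pipeline has shut down. The
/// returned handle resolves once every task has stopped, either because the checkpoint
/// stream was closed or because `cancel` was triggered.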
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
    handler: H,
    next_checkpoint: u64,
    config: ConcurrentConfig,
    store: H::Store,
    task: Option<Task>,
    checkpoint_rx: mpsc::Receiver<Arc<Checkpoint>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    info!(
        pipeline = H::NAME,
        "Starting pipeline with config: {:?}", config
    );
    let ConcurrentConfig {
        committer: committer_config,
        pruner: pruner_config,
    } = config;

    let (processor_tx, collector_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);
    let (collector_tx, committer_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);
    let (committer_tx, watermark_rx) =
        mpsc::channel(committer_config.write_concurrency + PIPELINE_BUFFER);

    // The pruning tasks get their own child token so they can keep running until the commit
    // half of the pipeline has fully wound down.
    let pruner_cancel = cancel.child_token();
    let handler = Arc::new(handler);

    let main_reader_lo = Arc::new(SetOnce::<AtomicU64>::new());

    let main_reader_lo_task = track_main_reader_lo::<H>(
        main_reader_lo.clone(),
        task.as_ref().map(|t| t.reader_interval),
        pruner_cancel.clone(),
        store.clone(),
    );

    let processor = processor(
        handler.clone(),
        checkpoint_rx,
        processor_tx,
        metrics.clone(),
        cancel.clone(),
    );

    let collector = collector::<H>(
        handler.clone(),
        committer_config.clone(),
        collector_rx,
        collector_tx,
        main_reader_lo.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let committer = committer::<H>(
        handler.clone(),
        committer_config.clone(),
        committer_rx,
        committer_tx,
        store.clone(),
        metrics.clone(),
        cancel.clone(),
    );

    let commit_watermark = commit_watermark::<H>(
        next_checkpoint,
        committer_config,
        watermark_rx,
        store.clone(),
        task.as_ref().map(|t| t.task.clone()),
        metrics.clone(),
        cancel,
    );

    let reader_watermark = reader_watermark::<H>(
        pruner_config.clone(),
        store.clone(),
        metrics.clone(),
        pruner_cancel.clone(),
    );

    let pruner = pruner(
        handler,
        pruner_config,
        store,
        metrics,
        pruner_cancel.clone(),
    );

    tokio::spawn(async move {
        let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

        // Once the commit half has stopped, stop the pruning tasks as well.
        pruner_cancel.cancel();
        let _ = futures::join!(main_reader_lo_task, reader_watermark, pruner);
    })
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::Registry;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::{MockConnection, MockStore},
        pipeline::Processor,
        types::{
            full_checkpoint_content::Checkpoint,
            test_checkpoint_data_builder::TestCheckpointBuilder,
        },
    };

    use super::*;
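
    // These tests drive `DataPipeline` against a `MockStore`: every checkpoint yields the
    // two rows `cp * 10 + 1` and `cp * 10 + 2`, so committed and pruned state can be
    // asserted exactly.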
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    const TEST_CHECKPOINT_BUFFER_SIZE: usize = 3;

    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }
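
    /// The pipeline under test: processes each checkpoint into two deterministic rows.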
    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 2;
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<TestValue>;

        // Test-tuned thresholds: eager flushing is effectively disabled, back-pressure kicks
        // in after four pending rows, and each batch carries at most one checkpoint's
        // watermark.
        const MIN_EAGER_ROWS: usize = 1000;
        const MAX_PENDING_ROWS: usize = 4;
        const MAX_WATERMARK_UPDATES: usize = 1;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }
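
    /// Test harness that runs a full pipeline against a `MockStore` and feeds it
    /// checkpoints.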
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<Arc<Checkpoint>>,
        pipeline_handle: JoinHandle<()>,
        cancel: CancellationToken,
    }

    impl TestSetup {
        async fn new(config: ConcurrentConfig, store: MockStore, next_checkpoint: u64) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_CHECKPOINT_BUFFER_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());
            let cancel = CancellationToken::new();

            let pipeline_handle = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
                cancel.clone(),
            );

            Self {
                store,
                checkpoint_tx,
                pipeline_handle,
                cancel,
            }
        }

        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint = Arc::new(
                TestCheckpointBuilder::new(checkpoint)
                    .with_epoch(1)
                    .with_network_total_transactions(checkpoint * 2)
                    .with_timestamp_ms(1000000000 + checkpoint * 1000)
                    .build_checkpoint(),
            );
            self.checkpoint_tx.send(checkpoint).await?;
            Ok(())
        }

        async fn shutdown(self) {
            drop(self.checkpoint_tx);
            self.cancel.cancel();
            let _ = self.pipeline_handle.await;
        }

        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            // The send must not complete within the timeout; timing out is the expected
            // back-pressure signal.
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                .unwrap_err();
        }
    }

    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 5_000,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send the first three checkpoints.
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Each checkpoint's two rows should be committed.
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Send three more checkpoints, pushing the earliest ones past the retention window.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All six checkpoints should be committed before any pruning happens.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Wait for at least one pruner interval to elapse.
        tokio::time::sleep(Duration::from_millis(5_200)).await;

        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            // With `retention: 3`, the latest three checkpoints survive...
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));

            // ...and everything older has been pruned.
            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
            assert!(!data.contains_key(&2));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, 9);
        // Each checkpoint carries two transactions (see `send_checkpoint`)...
        assert_eq!(watermark.tx_hi, 18);
        // ...and a timestamp of 1000000000 + checkpoint * 1000.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        // With no pruner configured, all ten checkpoints are retained.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_out_of_order_processing() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Deliver checkpoints out of order.
        let checkpoints = vec![2, 0, 4, 1, 3];
        for cp in checkpoints {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The watermark still reaches 4 once the contiguous range 0..=4 is committed.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
            .await;

        for i in 0..5 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with a gap at 2.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit what it can.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The watermark stalls at 1 while checkpoint 2 is missing.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, 1);

        // Filling the gap lets the watermark advance to 4.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, 4);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // A long collect interval, so rows pile up in the collector instead of being
                // flushed on a timer.
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        let store = MockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Let the pipeline tasks start up.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // With `MAX_PENDING_ROWS: 4`, the collector quickly stops accepting new values; the
        // buffers in front of it (checkpoint channel, processors, processor channel) absorb
        // the first 14 checkpoints.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The 15th checkpoint has nowhere to go: back-pressure.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Once the collect interval fires and batches drain, the send goes through.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // A single writer, so one slow commit stalls the whole committer.
                write_concurrency: 1,
                ..Default::default()
            },
            ..Default::default()
        };
        // Each commit takes 10 seconds, backing the pipeline up behind the committer.
        let store = MockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // These checkpoints are absorbed by the pipeline's internal buffers.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere between checkpoints 19 and 21 the buffers run out and back-pressure
        // surfaces as a send timeout (the exact point depends on batching timing).
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Once the first (slow) commit lands, capacity frees up downstream...
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // ...and the checkpoint that previously timed out can be sent.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // The first two commit attempts fail before one succeeds.
        let store = MockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // The watermark reaching checkpoint 0 proves the commit eventually succeeded.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // The first attempt to prune the range [0, 2) fails.
        let store = MockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All four checkpoints are committed.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failed) prune attempt, everything is still intact.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // The retry succeeds: checkpoints 0 and 1 are pruned, 2 and 3 are retained.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // The first two reader watermark updates fail before one succeeds.
        let store = MockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Give the reader watermark task time to retry past the injected failures.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // With `retention: 3` and a commit watermark of 5, reader_lo lands on 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);

        setup.shutdown().await;
    }

    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // The first two attempts to get a connection fail before one succeeds.
        let store = MockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);

        setup.shutdown().await;
    }
}