1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use backoff::ExponentialBackoff;
9use sui_futures::service::Service;
10use sui_futures::stream::Break;
11use sui_futures::stream::TrySpawnStreamExt;
12use sui_indexer_alt_framework_store_traits::Connection;
13use sui_indexer_alt_framework_store_traits::Store;
14use sui_types::digests::ChainIdentifier;
15use sui_types::full_checkpoint_content::Checkpoint;
16use tokio::sync::OnceCell;
17use tokio::sync::mpsc;
18use tokio_stream::wrappers::ReceiverStream;
19use tracing::debug;
20use tracing::error;
21use tracing::info;
22
23use crate::config::ConcurrencyConfig;
24use crate::ingestion::ingestion_client::CheckpointEnvelope;
25use crate::metrics::CheckpointLagMetricReporter;
26use crate::metrics::IndexerMetrics;
27use crate::pipeline::IndexedCheckpoint;
28
/// Initial delay between retries of a failed `Processor::process` call; the
/// delay grows exponentially from here on subsequent failures.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound on the exponential backoff delay between processing retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
34
/// The processing half of a pipeline: implementors turn a checkpoint into the
/// values that the rest of the pipeline will commit to the store.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Identifies the pipeline in logs, metrics labels, and store look-ups
    /// (e.g. the chain-id acceptance check).
    const NAME: &'static str;

    /// The type of value this processor extracts from each checkpoint.
    type Value: Send + Sync + 'static;

    /// Extract zero or more values from `checkpoint`. Errors returned here are
    /// treated as transient by the processor task and retried with backoff.
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}
59
/// Start the processing task for pipeline `P`.
///
/// The task reads checkpoint envelopes from `rx`, runs [`Processor::process`]
/// on up to `concurrency` checkpoints at a time, and forwards each result to
/// `tx` as an [`IndexedCheckpoint`] carrying watermark fields copied from the
/// checkpoint summary (epoch, sequence number, tx high water mark, timestamp).
///
/// Failed `process` calls are retried indefinitely (`max_elapsed_time: None`)
/// with exponential backoff between `INITIAL_RETRY_INTERVAL` and
/// `MAX_RETRY_INTERVAL`. Before any checkpoint is processed, `store` is asked
/// (exactly once, via a shared `OnceCell`) whether this pipeline accepts the
/// checkpoint's chain identifier; a rejection — or a later envelope arriving
/// with a different chain id than the one the check was performed with — is a
/// permanent error that stops the processor.
///
/// The returned [`Service`] aborts the task on shutdown. The task exits
/// cleanly when `rx` is exhausted or the downstream channel closes, and with
/// an error if processing fails permanently.
pub(super) fn processor<P: Processor, S: Store>(
    processor: Arc<P>,
    rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    tx: mpsc::Sender<IndexedCheckpoint<P>>,
    metrics: Arc<IndexerMetrics>,
    concurrency: ConcurrencyConfig,
    store: S,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = P::NAME, "Starting processor");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
            &metrics.processed_checkpoint_timestamp_lag,
            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
            &metrics.latest_processed_checkpoint,
        );

        let report_metrics = metrics.clone();
        // One-shot, task-shared cache for the store's chain-id check: records
        // whether the store accepted this pipeline's chain, and which chain id
        // was seen when the check ran. Populated at most once across all
        // concurrent workers.
        let chain_id_accepted: Arc<OnceCell<(bool, ChainIdentifier)>> = Arc::default();
        match ReceiverStream::new(rx)
            .try_for_each_send_spawned(
                concurrency.into(),
                |checkpoint_envelope| {
                    // Clone the shared handles each spawned worker needs.
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                    let processor = processor.clone();
                    let store = store.clone();
                    let chain_id_accepted = chain_id_accepted.clone();

                    async move {
                        metrics
                            .total_handler_checkpoints_received
                            .with_label_values(&[P::NAME])
                            .inc();

                        // Times the whole attempt, retries included; stopped
                        // explicitly below once processing succeeds.
                        let guard = metrics
                            .handler_checkpoint_latency
                            .with_label_values(&[P::NAME])
                            .start_timer();

                        // Retry forever (no max_elapsed_time), with the delay
                        // growing exponentially up to MAX_RETRY_INTERVAL.
                        let backoff = ExponentialBackoff {
                            initial_interval: INITIAL_RETRY_INTERVAL,
                            current_interval: INITIAL_RETRY_INTERVAL,
                            max_interval: MAX_RETRY_INTERVAL,
                            max_elapsed_time: None,
                            ..Default::default()
                        };

                        let chain_id = checkpoint_envelope.chain_id;
                        let checkpoint = &checkpoint_envelope.checkpoint;
                        let checkpoint_sequence_number = checkpoint.summary.sequence_number;
                        let retry_metrics = metrics.clone();
                        let values = backoff::future::retry_notify(
                            backoff,
                            || async {
                                // The first worker to get here performs the
                                // store round-trip; everyone else reuses the
                                // cached result. Store errors are transient
                                // and therefore retried.
                                let (accepted, accepted_chain_id) = chain_id_accepted.get_or_try_init(async || {
                                    let mut conn = store.connect().await
                                        .map_err(backoff::Error::transient)?;
                                    let accepted = conn.accepts_chain_id(P::NAME, *chain_id.as_bytes()).await
                                        .map_err(backoff::Error::transient)?;
                                    Ok::<_, backoff::Error<anyhow::Error>>((accepted, chain_id))
                                }).await?;
                                // Permanent failure if the store rejected the
                                // chain, or if this envelope's chain id
                                // differs from the one the check was made
                                // with.
                                if !accepted || *accepted_chain_id != chain_id {
                                    return Err(backoff::Error::permanent(anyhow::anyhow!(
                                        "checkpoint chain_id={chain_id:?} does not match stored chain_id",
                                    )))
                                }

                                processor
                                    .process(checkpoint)
                                    .await
                                    .map_err(backoff::Error::transient)
                            },
                            // Called on every transient failure, just before
                            // the next retry is scheduled.
                            move |error: anyhow::Error, delay| {
                                retry_metrics.inc_processor_retry::<P>(
                                    checkpoint_sequence_number,
                                    &error,
                                    delay,
                                );
                            },
                        )
                        .await?;

                        let elapsed = guard.stop_and_record();

                        // Watermark fields derived from the checkpoint summary.
                        let epoch = checkpoint.summary.epoch;
                        let cp_sequence_number = checkpoint.summary.sequence_number;
                        let tx_hi = checkpoint.summary.network_total_transactions;
                        let timestamp_ms = checkpoint.summary.timestamp_ms;

                        debug!(
                            pipeline = P::NAME,
                            checkpoint = cp_sequence_number,
                            elapsed_ms = elapsed * 1000.0,
                            "Processed checkpoint",
                        );

                        checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);

                        metrics
                            .total_handler_checkpoints_processed
                            .with_label_values(&[P::NAME])
                            .inc();

                        metrics
                            .total_handler_rows_created
                            .with_label_values(&[P::NAME])
                            .inc_by(values.len() as u64);

                        Ok(IndexedCheckpoint::new(
                            epoch,
                            cp_sequence_number,
                            tx_hi,
                            timestamp_ms,
                            values,
                        ))
                    }
                },
                tx,
                // Periodic reporter for the stream's concurrency statistics.
                move |stats| {
                    report_metrics
                        .processor_concurrency_limit
                        .with_label_values(&[P::NAME])
                        .set(stats.limit as i64);
                    report_metrics
                        .processor_concurrency_inflight
                        .with_label_values(&[P::NAME])
                        .set(stats.inflight as i64);
                },
            )
            .await
        {
            // Input exhausted: every checkpoint was processed and forwarded.
            Ok(()) => {
                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
            }

            // Downstream channel closed; a clean shutdown, not an error.
            Err(Break::Break) => {
                info!(pipeline = P::NAME, "Channel closed, stopping processor");
            }

            // A permanent processing error; propagate with pipeline context.
            Err(Break::Err(e)) => {
                error!(pipeline = P::NAME, "Error from handler: {e}");
                return Err(e.context(format!("Error from processor {}", P::NAME)));
            }
        };

        Ok(())
    })
}
217
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use anyhow::ensure;
    use sui_futures::service;
    use sui_types::digests::ChainIdentifier;
    use sui_types::digests::CheckpointDigest;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::MockStore;
    use crate::mocks::store::MockWatermark;

    use super::*;

    // Minimal row type produced by the test pipelines below.
    pub struct StoredData {
        pub value: u64,
    }

    /// Test pipeline that emits two deterministic values per checkpoint:
    /// `sequence_number * 10 + 1` and `sequence_number * 10 + 2`.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 1,
                },
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 2,
                },
            ])
        }
    }

    /// Happy path: checkpoints flow through the processor and come out as
    /// `IndexedCheckpoint`s with watermark fields copied from the summary and
    /// values produced by `DataPipeline::process`.
    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        let checkpoint_envelope_1 = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(
                TestCheckpointBuilder::new(1)
                    .with_epoch(2)
                    .with_network_total_transactions(5)
                    .with_timestamp_ms(1000000001)
                    .build_checkpoint(),
            ),
            chain_id: ChainIdentifier::default(),
        });
        let checkpoint_envelope_2 = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(
                TestCheckpointBuilder::new(2)
                    .with_epoch(2)
                    .with_network_total_transactions(10)
                    .with_timestamp_ms(1000000002)
                    .build_checkpoint(),
            ),
            chain_id: ChainIdentifier::default(),
        });

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
            MockStore::default(),
        );

        data_tx.send(checkpoint_envelope_1).await.unwrap();
        data_tx.send(checkpoint_envelope_2).await.unwrap();

        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed1.watermark.tx_hi, 5);
        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
        assert_eq!(indexed1.values.len(), 2);
        assert_eq!(indexed1.values[0].value, 11);
        assert_eq!(indexed1.values[1].value, 12);

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.tx_hi, 10);
        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
        assert_eq!(indexed2.values.len(), 2);
        assert_eq!(indexed2.values[0].value, 21);
        assert_eq!(indexed2.values[1].value, 22);

        // Only two checkpoints were sent, so nothing else should arrive.
        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
        assert!(
            timeout_result.is_err(),
            "Should timeout waiting for more checkpoints"
        );
    }

    /// After the service is shut down, further sends into the processor fail
    /// and the output channel closes without producing more results.
    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        let checkpoint_envelope_1 = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(TestCheckpointBuilder::new(1).build_checkpoint()),
            chain_id: ChainIdentifier::default(),
        });
        let checkpoint_envelope_2 = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(TestCheckpointBuilder::new(2).build_checkpoint()),
            chain_id: ChainIdentifier::default(),
        });

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
            MockStore::default(),
        );

        data_tx.send(checkpoint_envelope_1).await.unwrap();

        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        svc.shutdown().await.unwrap();

        // Shutdown aborts the processor task, dropping its receiver, so this
        // send must fail.
        data_tx.send(checkpoint_envelope_2).await.unwrap_err();

        let next_result = indexed_rx.recv().await;
        assert!(
            next_result.is_none(),
            "Channel should be closed after shutdown"
        );
    }

    /// Transient `process` errors are retried with backoff until they succeed,
    /// and each retry is counted in the retry metric.
    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        // Checkpoint 1 always succeeds; checkpoint 2 fails its first two
        // attempts and succeeds on the third.
        struct RetryTestPipeline {
            attempt_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;
            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                if checkpoint.summary.sequence_number == 1 {
                    Ok(vec![])
                } else {
                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                    Ok(vec![])
                }
            }
        }

        let checkpoint1 = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(TestCheckpointBuilder::new(1).build_checkpoint()),
            chain_id: ChainIdentifier::default(),
        });
        let checkpoint2 = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(TestCheckpointBuilder::new(2).build_checkpoint()),
            chain_id: ChainIdentifier::default(),
        });

        let attempt_count = Arc::new(AtomicU32::new(0));
        let processor = Arc::new(RetryTestPipeline {
            attempt_count: attempt_count.clone(),
        });

        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);

        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics.clone(),
            ConcurrencyConfig::Fixed { value: 10 },
            MockStore::default(),
        );

        data_tx.send(checkpoint1.clone()).await.unwrap();
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);

        // Three attempts total: two failures followed by one success, so the
        // retry metric should have counted exactly two retries.
        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
        assert_eq!(
            metrics
                .total_handler_processor_retries
                .with_label_values(&[RetryTestPipeline::NAME])
                .get(),
            2
        );
    }

    /// Drives a single checkpoint with `checkpoint_chain_id` through a fresh
    /// processor backed by `store`, returning whatever came out of the output
    /// channel together with the service's shutdown result.
    async fn test_chain_id(
        store: MockStore,
        checkpoint_chain_id: ChainIdentifier,
    ) -> (
        Option<IndexedCheckpoint<DataPipeline>>,
        Result<(), service::Error>,
    ) {
        let checkpoint_envelope = Arc::new(CheckpointEnvelope {
            checkpoint: Arc::new(TestCheckpointBuilder::new(1).build_checkpoint()),
            chain_id: checkpoint_chain_id,
        });

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(1);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(1);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let service = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 1 },
            store,
        );

        // Dropping the sender closes the input so the processor terminates
        // after handling the single checkpoint.
        data_tx.try_send(checkpoint_envelope).unwrap();
        drop(data_tx);

        let indexed_checkpoint = indexed_rx.recv().await;
        let shutdown_result = service.shutdown().await;

        (indexed_checkpoint, shutdown_result)
    }

    /// With no chain id recorded yet, processing the first checkpoint stores
    /// the checkpoint's chain id in the watermark.
    #[tokio::test]
    async fn test_chain_id_stored_when_none_exists() {
        let store = MockStore::default();
        let chain_id = ChainIdentifier::default();

        let (indexed_checkpoint, shutdown_result) = test_chain_id(store.clone(), chain_id).await;
        assert!(shutdown_result.is_ok());

        let indexed = indexed_checkpoint.expect("Should receive IndexedCheckpoint");
        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 1);

        let watermark = store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(
            watermark.chain_id,
            Some(*chain_id.as_bytes()),
            "chain_id should be stored after first checkpoint"
        );
    }

    /// A checkpoint whose chain id matches the already-stored chain id is
    /// processed normally.
    #[tokio::test]
    async fn test_chain_id_matches_existing() {
        let chain_id = ChainIdentifier::default();
        let store = MockStore::default().with_watermark(
            DataPipeline::NAME,
            MockWatermark {
                chain_id: Some(*chain_id.as_bytes()),
                ..Default::default()
            },
        );

        let (indexed_checkpoint, shutdown_result) = test_chain_id(store, chain_id).await;
        assert!(shutdown_result.is_ok());

        let indexed =
            indexed_checkpoint.expect("Should receive IndexedCheckpoint when chain_id matches");
        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 1);
    }

    /// A chain-id mismatch is a permanent error: the processor shuts down
    /// with an error and produces no output.
    #[tokio::test]
    async fn test_chain_id_mismatch_returns_error() {
        let stored_chain_id = ChainIdentifier::default();
        let different_chain_id: ChainIdentifier = CheckpointDigest::from([1u8; 32]).into();
        let store = MockStore::default().with_watermark(
            DataPipeline::NAME,
            MockWatermark {
                chain_id: Some(*stored_chain_id.as_bytes()),
                ..Default::default()
            },
        );

        let (indexed_checkpoint, shutdown_result) = test_chain_id(store, different_chain_id).await;
        let shutdown_err = shutdown_result.unwrap_err();
        assert!(
            format!("{shutdown_err:#}").contains("does not match"),
            "Error should indicate chain_id mismatch, got: {shutdown_err:#}"
        );

        assert!(
            indexed_checkpoint.is_none(),
            "Channel should close without producing a result on chain_id mismatch"
        );
    }

    /// Checkpoints are processed concurrently: 5 tasks of 500ms at a
    /// concurrency limit of 3 should finish in roughly two batches (~1s),
    /// well under the 5×500ms a serial run would take.
    ///
    /// NOTE(review): this asserts on real wall-clock time (no tokio time
    /// pause), so it could be flaky on a heavily loaded machine — confirm the
    /// 1200ms budget leaves enough headroom in CI.
    #[tokio::test]
    async fn test_processor_concurrency() {
        struct SlowProcessor;

        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                tokio::time::sleep(Duration::from_millis(500)).await;
                Ok(vec![StoredData {
                    value: checkpoint.summary.sequence_number,
                }])
            }
        }

        let checkpoints: Vec<Arc<CheckpointEnvelope>> = (0..5)
            .map(|i| {
                Arc::new(CheckpointEnvelope {
                    checkpoint: Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()),
                    chain_id: ChainIdentifier::default(),
                })
            })
            .collect();

        let processor = Arc::new(SlowProcessor);
        let (data_tx, data_rx) = mpsc::channel(10);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 3 },
            MockStore::default(),
        );

        let start = std::time::Instant::now();
        for checkpoint in checkpoints {
            data_tx.send(checkpoint).await.unwrap();
        }
        drop(data_tx);

        let mut received = Vec::new();
        while let Some(indexed) = indexed_rx.recv().await {
            received.push(indexed);
        }

        let elapsed = start.elapsed();
        assert!(elapsed < Duration::from_millis(1200));

        assert_eq!(received.len(), 5);
    }
}