sui_indexer_alt_framework/pipeline/
processor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use backoff::ExponentialBackoff;
9use sui_futures::service::Service;
10use sui_futures::stream::Break;
11use sui_futures::stream::TrySpawnStreamExt;
12use sui_indexer_alt_framework_store_traits::Connection;
13use sui_indexer_alt_framework_store_traits::Store;
14use sui_types::digests::ChainIdentifier;
15use sui_types::full_checkpoint_content::Checkpoint;
16use tokio::sync::OnceCell;
17use tokio::sync::mpsc;
18use tokio_stream::wrappers::ReceiverStream;
19use tracing::debug;
20use tracing::error;
21use tracing::info;
22
23use crate::config::ConcurrencyConfig;
24use crate::ingestion::ingestion_client::CheckpointEnvelope;
25use crate::metrics::CheckpointLagMetricReporter;
26use crate::metrics::IndexerMetrics;
27use crate::pipeline::IndexedCheckpoint;
28
/// If the processor needs to retry processing a checkpoint, it will wait this long initially.
/// Subsequent retries back off exponentially from this value (see the `ExponentialBackoff`
/// constructed in `processor` below).
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the processor needs to retry processing a checkpoint, it will wait at most this long between retries.
/// The backoff has no `max_elapsed_time`, so retries continue indefinitely at this cap.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
34
/// Implementors of this trait are responsible for transforming checkpoint into rows for their
/// table.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Used to identify the pipeline in logs and metrics. Must be unique per pipeline, as it is
    /// also the label under which metrics and watermarks are recorded.
    const NAME: &'static str;

    /// The type of value being inserted by the handler. One checkpoint may yield any number of
    /// values (including zero).
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    ///
    /// All errors returned from this method are treated as transient and will be retried
    /// indefinitely with exponential backoff.
    ///
    /// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
    /// format, unsupported protocol version), you should panic! This stops the indexer and alerts
    /// operators that manual intervention is required. Do not return permanent errors as they will
    /// cause infinite retries and block the pipeline.
    ///
    /// For transient errors (e.g., network issues, rate limiting), simply return the error and
    /// let the framework retry automatically.
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}
59
/// The processor task is responsible for taking checkpoint data and breaking it down into rows
/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
/// distributes them among workers whose concurrency is governed by `concurrency`.
///
/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
/// channel.
///
/// The returned `Service` owns the task: shutting it down stops the supervisor and closes both
/// channels (exercised by the cancellation test below).
pub(super) fn processor<P: Processor, S: Store>(
    processor: Arc<P>,
    rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    tx: mpsc::Sender<IndexedCheckpoint<P>>,
    metrics: Arc<IndexerMetrics>,
    concurrency: ConcurrencyConfig,
    store: S,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = P::NAME, "Starting processor");
        // Reports checkpoint-timestamp lag for this pipeline as checkpoints complete.
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
            &metrics.processed_checkpoint_timestamp_lag,
            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
            &metrics.latest_processed_checkpoint,
        );

        let report_metrics = metrics.clone();
        // Caches this pipeline's chain_id after the first time it has been read from the database.
        // Shared across all workers so the store is consulted at most once (on success); later
        // checkpoints are validated against the cached value without touching the store.
        let chain_id_accepted: Arc<OnceCell<(bool, ChainIdentifier)>> = Arc::default();
        match ReceiverStream::new(rx)
            .try_for_each_send_spawned(
                concurrency.into(),
                |checkpoint_envelope| {
                    // Clone per-worker handles; each spawned worker owns its own copies.
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                    let processor = processor.clone();
                    let store = store.clone();
                    let chain_id_accepted = chain_id_accepted.clone();

                    async move {

                        metrics
                            .total_handler_checkpoints_received
                            .with_label_values(&[P::NAME])
                            .inc();

                        // Times the whole retry loop, not just a single attempt.
                        let guard = metrics
                            .handler_checkpoint_latency
                            .with_label_values(&[P::NAME])
                            .start_timer();

                        // Retry processing with exponential backoff
                        let backoff = ExponentialBackoff {
                            initial_interval: INITIAL_RETRY_INTERVAL,
                            current_interval: INITIAL_RETRY_INTERVAL,
                            max_interval: MAX_RETRY_INTERVAL,
                            // No deadline: transient errors are retried forever.
                            max_elapsed_time: None,
                            ..Default::default()
                        };

                        let chain_id = checkpoint_envelope.chain_id;
                        let checkpoint = &checkpoint_envelope.checkpoint;
                        let checkpoint_sequence_number = checkpoint.summary.sequence_number;
                        let retry_metrics = metrics.clone();
                        let values = backoff::future::retry_notify(
                            backoff,
                            || async {
                                // Ask the store (once) whether this pipeline accepts the
                                // checkpoint's chain id; store errors are transient and retried.
                                let (accepted, accepted_chain_id) = chain_id_accepted.get_or_try_init(async || {
                                    let mut conn = store.connect().await
                                        .map_err(backoff::Error::transient)?;
                                    let accepted = conn.accepts_chain_id(P::NAME, *chain_id.as_bytes()).await
                                        .map_err(backoff::Error::transient)?;
                                    Ok::<_, backoff::Error<anyhow::Error>>((accepted, chain_id))
                                }).await?;
                                // Rejection, or a chain id that differs from the one cached on
                                // first success, is a permanent error -- it will never resolve by
                                // retrying, so fail the pipeline instead.
                                if !accepted || *accepted_chain_id != chain_id {
                                    return Err(backoff::Error::permanent(anyhow::anyhow!(
                                        "checkpoint chain_id={chain_id:?} does not match stored chain_id",
                                    )))
                                }

                                // Handler errors are treated as transient per the trait contract.
                                processor
                                    .process(checkpoint)
                                    .await
                                    .map_err(backoff::Error::transient)
                            },
                            // Called on each transient failure, before the backoff delay.
                            move |error: anyhow::Error, delay| {
                                retry_metrics.inc_processor_retry::<P>(
                                    checkpoint_sequence_number,
                                    &error,
                                    delay,
                                );
                            },
                        )
                        .await?;

                        let elapsed = guard.stop_and_record();

                        let epoch = checkpoint.summary.epoch;
                        let cp_sequence_number = checkpoint.summary.sequence_number;
                        let tx_hi = checkpoint.summary.network_total_transactions;
                        let timestamp_ms = checkpoint.summary.timestamp_ms;

                        debug!(
                            pipeline = P::NAME,
                            checkpoint = cp_sequence_number,
                            elapsed_ms = elapsed * 1000.0,
                            "Processed checkpoint",
                        );

                        checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);

                        metrics
                            .total_handler_checkpoints_processed
                            .with_label_values(&[P::NAME])
                            .inc();

                        metrics
                            .total_handler_rows_created
                            .with_label_values(&[P::NAME])
                            .inc_by(values.len() as u64);

                        // Hand the processed rows plus watermark data to the committer via `tx`.
                        Ok(IndexedCheckpoint::new(
                            epoch,
                            cp_sequence_number,
                            tx_hi,
                            timestamp_ms,
                            values,
                        ))
                    }
                },
                tx,
                // Periodic concurrency-stats callback: surfaces the current limit and the number
                // of in-flight workers as gauges.
                move |stats| {
                    report_metrics
                        .processor_concurrency_limit
                        .with_label_values(&[P::NAME])
                        .set(stats.limit as i64);
                    report_metrics
                        .processor_concurrency_inflight
                        .with_label_values(&[P::NAME])
                        .set(stats.inflight as i64);
                },
            )
            .await
        {
            // Input stream ended normally (all checkpoints consumed).
            Ok(()) => {
                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
            }

            // Downstream hung up; a clean stop, not an error.
            Err(Break::Break) => {
                info!(pipeline = P::NAME, "Channel closed, stopping processor");
            }

            // A worker failed permanently; propagate so the service reports the failure.
            Err(Break::Err(e)) => {
                error!(pipeline = P::NAME, "Error from handler: {e}");
                return Err(e.context(format!("Error from processor {}", P::NAME)));
            }
        };

        Ok(())
    })
}
217
218#[cfg(test)]
219mod tests {
220    use std::sync::Arc;
221    use std::sync::atomic::AtomicU32;
222    use std::sync::atomic::Ordering;
223    use std::time::Duration;
224
225    use anyhow::ensure;
226    use sui_futures::service;
227    use sui_types::digests::ChainIdentifier;
228    use sui_types::digests::CheckpointDigest;
229    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
230    use tokio::sync::mpsc;
231    use tokio::time::timeout;
232
233    use crate::metrics::IndexerMetrics;
234    use crate::mocks::store::MockStore;
235    use crate::mocks::store::MockWatermark;
236
237    use super::*;
238
    /// Row type produced by the test pipelines below. `value` encodes which checkpoint (and
    /// which row within it) produced the entry, so assertions can pin exact outputs.
    pub struct StoredData {
        pub value: u64,
    }

    /// Minimal stateless processor used to exercise the pipeline plumbing in tests.
    pub struct DataPipeline;
244
245    #[async_trait]
246    impl Processor for DataPipeline {
247        const NAME: &'static str = "data";
248
249        type Value = StoredData;
250
251        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
252            Ok(vec![
253                StoredData {
254                    value: checkpoint.summary.sequence_number * 10 + 1,
255                },
256                StoredData {
257                    value: checkpoint.summary.sequence_number * 10 + 2,
258                },
259            ])
260        }
261    }
262
263    #[tokio::test]
264    async fn test_processor_process_checkpoints() {
265        // Build two checkpoints using the test builder
266        let checkpoint_envelope_1 = Arc::new(CheckpointEnvelope {
267            checkpoint: Arc::new(
268                TestCheckpointBuilder::new(1)
269                    .with_epoch(2)
270                    .with_network_total_transactions(5)
271                    .with_timestamp_ms(1000000001)
272                    .build_checkpoint(),
273            ),
274            chain_id: ChainIdentifier::default(),
275        });
276        let checkpoint_envelope_2 = Arc::new(CheckpointEnvelope {
277            checkpoint: Arc::new(
278                TestCheckpointBuilder::new(2)
279                    .with_epoch(2)
280                    .with_network_total_transactions(10)
281                    .with_timestamp_ms(1000000002)
282                    .build_checkpoint(),
283            ),
284            chain_id: ChainIdentifier::default(),
285        });
286
287        // Set up the processor, channels, and metrics
288        let processor = Arc::new(DataPipeline);
289        let (data_tx, data_rx) = mpsc::channel(2);
290        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
291        let metrics = IndexerMetrics::new(None, &Default::default());
292
293        // Spawn the processor task
294        let _svc = super::processor(
295            processor,
296            data_rx,
297            indexed_tx,
298            metrics,
299            ConcurrencyConfig::Fixed { value: 10 },
300            MockStore::default(),
301        );
302
303        // Send both checkpoints
304        data_tx.send(checkpoint_envelope_1).await.unwrap();
305        data_tx.send(checkpoint_envelope_2).await.unwrap();
306
307        // Receive and verify first checkpoint
308        let indexed1 = indexed_rx
309            .recv()
310            .await
311            .expect("Should receive first IndexedCheckpoint");
312        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
313        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
314        assert_eq!(indexed1.watermark.tx_hi, 5);
315        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
316        assert_eq!(indexed1.values.len(), 2);
317        assert_eq!(indexed1.values[0].value, 11); // 1 * 10 + 1
318        assert_eq!(indexed1.values[1].value, 12); // 1 * 10 + 2
319
320        // Receive and verify second checkpoint
321        let indexed2 = indexed_rx
322            .recv()
323            .await
324            .expect("Should receive second IndexedCheckpoint");
325        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
326        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
327        assert_eq!(indexed2.watermark.tx_hi, 10);
328        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
329        assert_eq!(indexed2.values.len(), 2);
330        assert_eq!(indexed2.values[0].value, 21); // 2 * 10 + 1
331        assert_eq!(indexed2.values[1].value, 22); // 2 * 10 + 2
332
333        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
334        assert!(
335            timeout_result.is_err(),
336            "Should timeout waiting for more checkpoints"
337        );
338    }
339
340    #[tokio::test]
341    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
342        // Build two checkpoints using the test builder
343        let checkpoint_envelope_1 = Arc::new(CheckpointEnvelope {
344            checkpoint: Arc::new(TestCheckpointBuilder::new(1).build_checkpoint()),
345            chain_id: ChainIdentifier::default(),
346        });
347        let checkpoint_envelope_2 = Arc::new(CheckpointEnvelope {
348            checkpoint: Arc::new(TestCheckpointBuilder::new(2).build_checkpoint()),
349            chain_id: ChainIdentifier::default(),
350        });
351
352        // Set up the processor, channels, and metrics
353        let processor = Arc::new(DataPipeline);
354        let (data_tx, data_rx) = mpsc::channel(2);
355        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
356        let metrics = IndexerMetrics::new(None, &Default::default());
357
358        // Spawn the processor task
359        let svc = super::processor(
360            processor,
361            data_rx,
362            indexed_tx,
363            metrics,
364            ConcurrencyConfig::Fixed { value: 10 },
365            MockStore::default(),
366        );
367
368        // Send first checkpoint.
369        data_tx.send(checkpoint_envelope_1).await.unwrap();
370
371        // Receive and verify first checkpoint
372        let indexed1 = indexed_rx
373            .recv()
374            .await
375            .expect("Should receive first IndexedCheckpoint");
376        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
377
378        // Shutdown the processor
379        svc.shutdown().await.unwrap();
380
381        // Sending second checkpoint after shutdown should fail, because the data_rx channel is
382        // closed.
383        data_tx.send(checkpoint_envelope_2).await.unwrap_err();
384
385        // Indexed channel is closed, and indexed_rx receives the last None result.
386        let next_result = indexed_rx.recv().await;
387        assert!(
388            next_result.is_none(),
389            "Channel should be closed after shutdown"
390        );
391    }
392
393    #[tokio::test]
394    async fn test_processor_error_retry_behavior() {
395        struct RetryTestPipeline {
396            attempt_count: Arc<AtomicU32>,
397        }
398
399        #[async_trait]
400        impl Processor for RetryTestPipeline {
401            const NAME: &'static str = "retry_test";
402            type Value = StoredData;
403            async fn process(
404                &self,
405                checkpoint: &Arc<Checkpoint>,
406            ) -> anyhow::Result<Vec<Self::Value>> {
407                if checkpoint.summary.sequence_number == 1 {
408                    Ok(vec![])
409                } else {
410                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
411                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
412                    Ok(vec![])
413                }
414            }
415        }
416
417        // Set up test data
418        let checkpoint1 = Arc::new(CheckpointEnvelope {
419            checkpoint: Arc::new(TestCheckpointBuilder::new(1).build_checkpoint()),
420            chain_id: ChainIdentifier::default(),
421        });
422        let checkpoint2 = Arc::new(CheckpointEnvelope {
423            checkpoint: Arc::new(TestCheckpointBuilder::new(2).build_checkpoint()),
424            chain_id: ChainIdentifier::default(),
425        });
426
427        let attempt_count = Arc::new(AtomicU32::new(0));
428        let processor = Arc::new(RetryTestPipeline {
429            attempt_count: attempt_count.clone(),
430        });
431
432        let (data_tx, data_rx) = mpsc::channel(2);
433        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
434
435        let metrics = IndexerMetrics::new(None, &Default::default());
436
437        // Spawn the processor task
438        let _svc = super::processor(
439            processor,
440            data_rx,
441            indexed_tx,
442            metrics.clone(),
443            ConcurrencyConfig::Fixed { value: 10 },
444            MockStore::default(),
445        );
446
447        // Send and verify first checkpoint (should succeed immediately)
448        data_tx.send(checkpoint1.clone()).await.unwrap();
449        let indexed1 = indexed_rx
450            .recv()
451            .await
452            .expect("Should receive first IndexedCheckpoint");
453        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
454
455        // Send second checkpoint (should fail twice, then succeed on 3rd attempt)
456        data_tx.send(checkpoint2.clone()).await.unwrap();
457
458        let indexed2 = indexed_rx
459            .recv()
460            .await
461            .expect("Should receive second IndexedCheckpoint after retries");
462        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
463
464        // Verify that exactly 3 attempts were made (2 failures + 1 success)
465        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
466        assert_eq!(
467            metrics
468                .total_handler_processor_retries
469                .with_label_values(&[RetryTestPipeline::NAME])
470                .get(),
471            2
472        );
473    }
474
475    async fn test_chain_id(
476        store: MockStore,
477        checkpoint_chain_id: ChainIdentifier,
478    ) -> (
479        Option<IndexedCheckpoint<DataPipeline>>,
480        Result<(), service::Error>,
481    ) {
482        let checkpoint_envelope = Arc::new(CheckpointEnvelope {
483            checkpoint: Arc::new(TestCheckpointBuilder::new(1).build_checkpoint()),
484            chain_id: checkpoint_chain_id,
485        });
486
487        let processor = Arc::new(DataPipeline);
488        let (data_tx, data_rx) = mpsc::channel(1);
489        let (indexed_tx, mut indexed_rx) = mpsc::channel(1);
490        let metrics = IndexerMetrics::new(None, &Default::default());
491
492        let service = super::processor(
493            processor,
494            data_rx,
495            indexed_tx,
496            metrics,
497            ConcurrencyConfig::Fixed { value: 1 },
498            store,
499        );
500
501        // Send the checkpoint then close the input channel.
502        data_tx.try_send(checkpoint_envelope).unwrap();
503        drop(data_tx);
504
505        let indexed_checkpoint = indexed_rx.recv().await;
506        let shutdown_result = service.shutdown().await;
507
508        (indexed_checkpoint, shutdown_result)
509    }
510
511    #[tokio::test]
512    async fn test_chain_id_stored_when_none_exists() {
513        let store = MockStore::default();
514        let chain_id = ChainIdentifier::default();
515
516        let (indexed_checkpoint, shutdown_result) = test_chain_id(store.clone(), chain_id).await;
517        assert!(shutdown_result.is_ok());
518
519        let indexed = indexed_checkpoint.expect("Should receive IndexedCheckpoint");
520        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 1);
521
522        let watermark = store.watermark(DataPipeline::NAME).unwrap();
523        assert_eq!(
524            watermark.chain_id,
525            Some(*chain_id.as_bytes()),
526            "chain_id should be stored after first checkpoint"
527        );
528    }
529
530    #[tokio::test]
531    async fn test_chain_id_matches_existing() {
532        let chain_id = ChainIdentifier::default();
533        let store = MockStore::default().with_watermark(
534            DataPipeline::NAME,
535            MockWatermark {
536                chain_id: Some(*chain_id.as_bytes()),
537                ..Default::default()
538            },
539        );
540
541        let (indexed_checkpoint, shutdown_result) = test_chain_id(store, chain_id).await;
542        assert!(shutdown_result.is_ok());
543
544        let indexed =
545            indexed_checkpoint.expect("Should receive IndexedCheckpoint when chain_id matches");
546        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 1);
547    }
548
549    #[tokio::test]
550    async fn test_chain_id_mismatch_returns_error() {
551        let stored_chain_id = ChainIdentifier::default();
552        let different_chain_id: ChainIdentifier = CheckpointDigest::from([1u8; 32]).into();
553        let store = MockStore::default().with_watermark(
554            DataPipeline::NAME,
555            MockWatermark {
556                chain_id: Some(*stored_chain_id.as_bytes()),
557                ..Default::default()
558            },
559        );
560
561        let (indexed_checkpoint, shutdown_result) = test_chain_id(store, different_chain_id).await;
562        let shutdown_err = shutdown_result.unwrap_err();
563        assert!(
564            format!("{shutdown_err:#}").contains("does not match"),
565            "Error should indicate chain_id mismatch, got: {shutdown_err:#}"
566        );
567
568        // The processor should fail and drop indexed_tx, so recv returns None.
569        assert!(
570            indexed_checkpoint.is_none(),
571            "Channel should close without producing a result on chain_id mismatch"
572        );
573    }
574
575    #[tokio::test]
576    async fn test_processor_concurrency() {
577        // Create a processor that simulates work by sleeping
578        struct SlowProcessor;
579        #[async_trait]
580        impl Processor for SlowProcessor {
581            const NAME: &'static str = "slow";
582            type Value = StoredData;
583
584            async fn process(
585                &self,
586                checkpoint: &Arc<Checkpoint>,
587            ) -> anyhow::Result<Vec<Self::Value>> {
588                // Use tokio::time::sleep rather than std::thread::sleep to avoid
589                // starving tasks woken by the chain_id RwLock. std::thread::sleep
590                // blocks the worker thread, preventing woken tasks queued on the
591                // same thread from making progress until the sleep finishes.
592                tokio::time::sleep(Duration::from_millis(500)).await;
593                Ok(vec![StoredData {
594                    value: checkpoint.summary.sequence_number,
595                }])
596            }
597        }
598
599        // Set up test data
600        let checkpoints: Vec<Arc<CheckpointEnvelope>> = (0..5)
601            .map(|i| {
602                Arc::new(CheckpointEnvelope {
603                    checkpoint: Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()),
604                    chain_id: ChainIdentifier::default(),
605                })
606            })
607            .collect();
608
609        // Set up channels and metrics
610        let processor = Arc::new(SlowProcessor);
611        let (data_tx, data_rx) = mpsc::channel(10);
612        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
613        let metrics = IndexerMetrics::new(None, &Default::default());
614
615        // Spawn processor task
616        let _svc = super::processor(
617            processor,
618            data_rx,
619            indexed_tx,
620            metrics,
621            ConcurrencyConfig::Fixed { value: 3 },
622            MockStore::default(),
623        );
624
625        // Send all checkpoints and measure time
626        let start = std::time::Instant::now();
627        for checkpoint in checkpoints {
628            data_tx.send(checkpoint).await.unwrap();
629        }
630        drop(data_tx);
631
632        // Receive all results
633        let mut received = Vec::new();
634        while let Some(indexed) = indexed_rx.recv().await {
635            received.push(indexed);
636        }
637
638        // Verify concurrency: total time should be less than sequential processing
639        // With concurrency=3, 5 checkpoints should take ~1000ms (500ms * 2 (batches)) instead of 2500ms (500ms * 5).
640        // Adding small 200ms for some processing overhead.
641        let elapsed = start.elapsed();
642        assert!(elapsed < Duration::from_millis(1200));
643
644        // Verify results
645        assert_eq!(received.len(), 5);
646    }
647}