sui_indexer_alt_framework/pipeline/
processor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use sui_futures::service::Service;
9use sui_futures::stream::Break;
10use sui_futures::stream::TrySpawnStreamExt;
11use sui_types::full_checkpoint_content::Checkpoint;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14use tracing::debug;
15use tracing::error;
16use tracing::info;
17
18use async_trait::async_trait;
19
20use crate::metrics::CheckpointLagMetricReporter;
21use crate::metrics::IndexerMetrics;
22use crate::pipeline::IndexedCheckpoint;
23
/// Delay before the first retry of a checkpoint whose processing failed. The processor's
/// exponential backoff starts from this interval.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound on the delay between two consecutive retries of the same checkpoint, no matter
/// how many attempts have already failed.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
29
/// Implementors of this trait are responsible for transforming checkpoints into rows for their
/// table. The `FANOUT` associated constant controls how many concurrent workers will be used to
/// process checkpoint information.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Used to identify the pipeline in logs and metrics.
    const NAME: &'static str;

    /// How much concurrency to use when processing checkpoint data. Implementors may override
    /// the default of 10.
    const FANOUT: usize = 10;

    /// The type of value being inserted by the handler.
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    ///
    /// # Errors
    ///
    /// All errors returned from this method are treated as transient and will be retried
    /// indefinitely with exponential backoff.
    ///
    /// For transient errors (e.g., network issues, rate limiting), simply return the error and
    /// let the framework retry automatically.
    ///
    /// # Panics
    ///
    /// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
    /// format, unsupported protocol version), you should panic! This stops the indexer and alerts
    /// operators that manual intervention is required. Do not return permanent errors as they will
    /// cause infinite retries and block the pipeline.
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}
58
59/// The processor task is responsible for taking checkpoint data and breaking it down into rows
60/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
61/// distributes them among `H::FANOUT` workers.
62///
63/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
64/// channel.
65pub(super) fn processor<P: Processor>(
66    processor: Arc<P>,
67    rx: mpsc::Receiver<Arc<Checkpoint>>,
68    tx: mpsc::Sender<IndexedCheckpoint<P>>,
69    metrics: Arc<IndexerMetrics>,
70) -> Service {
71    Service::new().spawn_aborting(async move {
72        info!(pipeline = P::NAME, "Starting processor");
73        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
74            &metrics.processed_checkpoint_timestamp_lag,
75            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
76            &metrics.latest_processed_checkpoint,
77        );
78
79        match ReceiverStream::new(rx)
80            .try_for_each_spawned(P::FANOUT, |checkpoint| {
81                let tx = tx.clone();
82                let metrics = metrics.clone();
83                let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
84                let processor = processor.clone();
85
86                async move {
87                    metrics
88                        .total_handler_checkpoints_received
89                        .with_label_values(&[P::NAME])
90                        .inc();
91
92                    let guard = metrics
93                        .handler_checkpoint_latency
94                        .with_label_values(&[P::NAME])
95                        .start_timer();
96
97                    // Retry processing with exponential backoff - treat all errors as transient
98                    let backoff = ExponentialBackoff {
99                        initial_interval: INITIAL_RETRY_INTERVAL,
100                        current_interval: INITIAL_RETRY_INTERVAL,
101                        max_interval: MAX_RETRY_INTERVAL,
102                        max_elapsed_time: None, // Retry indefinitely
103                        ..Default::default()
104                    };
105
106                    let values = backoff::future::retry(backoff, || async {
107                        processor
108                            .process(&checkpoint)
109                            .await
110                            .map_err(backoff::Error::transient)
111                    })
112                    .await?;
113
114                    let elapsed = guard.stop_and_record();
115
116                    let epoch = checkpoint.summary.epoch;
117                    let cp_sequence_number = checkpoint.summary.sequence_number;
118                    let tx_hi = checkpoint.summary.network_total_transactions;
119                    let timestamp_ms = checkpoint.summary.timestamp_ms;
120
121                    debug!(
122                        pipeline = P::NAME,
123                        checkpoint = cp_sequence_number,
124                        elapsed_ms = elapsed * 1000.0,
125                        "Processed checkpoint",
126                    );
127
128                    checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
129
130                    metrics
131                        .total_handler_checkpoints_processed
132                        .with_label_values(&[P::NAME])
133                        .inc();
134
135                    metrics
136                        .total_handler_rows_created
137                        .with_label_values(&[P::NAME])
138                        .inc_by(values.len() as u64);
139
140                    tx.send(IndexedCheckpoint::new(
141                        epoch,
142                        cp_sequence_number,
143                        tx_hi,
144                        timestamp_ms,
145                        values,
146                    ))
147                    .await
148                    .map_err(|_| Break::Break)?;
149
150                    Ok(())
151                }
152            })
153            .await
154        {
155            Ok(()) => {
156                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
157            }
158
159            Err(Break::Break) => {
160                info!(pipeline = P::NAME, "Channel closed, stopping processor");
161            }
162
163            Err(Break::Err(e)) => {
164                error!(pipeline = P::NAME, "Error from handler: {e}");
165                return Err(e.context(format!("Error from processor {}", P::NAME)));
166            }
167        };
168
169        Ok(())
170    })
171}
172
173#[cfg(test)]
174mod tests {
175    use std::sync::Arc;
176    use std::sync::atomic::AtomicU32;
177    use std::sync::atomic::Ordering;
178    use std::time::Duration;
179
180    use anyhow::ensure;
181    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
182    use tokio::sync::mpsc;
183    use tokio::time::timeout;
184
185    use crate::metrics::IndexerMetrics;
186
187    use super::*;
188
    /// Minimal row type produced by the test pipelines in this module.
    pub struct StoredData {
        /// Payload of the row; each test processor decides how to derive it from a checkpoint.
        pub value: u64,
    }
192
193    pub struct DataPipeline;
194
195    #[async_trait]
196    impl Processor for DataPipeline {
197        const NAME: &'static str = "data";
198
199        type Value = StoredData;
200
201        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
202            Ok(vec![
203                StoredData {
204                    value: checkpoint.summary.sequence_number * 10 + 1,
205                },
206                StoredData {
207                    value: checkpoint.summary.sequence_number * 10 + 2,
208                },
209            ])
210        }
211    }
212
213    #[tokio::test]
214    async fn test_processor_process_checkpoints() {
215        // Build two checkpoints using the test builder
216        let checkpoint1: Arc<Checkpoint> = Arc::new(
217            TestCheckpointBuilder::new(1)
218                .with_epoch(2)
219                .with_network_total_transactions(5)
220                .with_timestamp_ms(1000000001)
221                .build_checkpoint(),
222        );
223        let checkpoint2: Arc<Checkpoint> = Arc::new(
224            TestCheckpointBuilder::new(2)
225                .with_epoch(2)
226                .with_network_total_transactions(10)
227                .with_timestamp_ms(1000000002)
228                .build_checkpoint(),
229        );
230
231        // Set up the processor, channels, and metrics
232        let processor = Arc::new(DataPipeline);
233        let (data_tx, data_rx) = mpsc::channel(2);
234        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
235        let metrics = IndexerMetrics::new(None, &Default::default());
236
237        // Spawn the processor task
238        let _svc = super::processor(processor, data_rx, indexed_tx, metrics);
239
240        // Send both checkpoints
241        data_tx.send(checkpoint1.clone()).await.unwrap();
242        data_tx.send(checkpoint2.clone()).await.unwrap();
243
244        // Receive and verify first checkpoint
245        let indexed1 = indexed_rx
246            .recv()
247            .await
248            .expect("Should receive first IndexedCheckpoint");
249        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
250        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
251        assert_eq!(indexed1.watermark.tx_hi, 5);
252        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
253        assert_eq!(indexed1.values.len(), 2);
254        assert_eq!(indexed1.values[0].value, 11); // 1 * 10 + 1
255        assert_eq!(indexed1.values[1].value, 12); // 1 * 10 + 2
256
257        // Receive and verify second checkpoint
258        let indexed2 = indexed_rx
259            .recv()
260            .await
261            .expect("Should receive second IndexedCheckpoint");
262        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
263        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
264        assert_eq!(indexed2.watermark.tx_hi, 10);
265        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
266        assert_eq!(indexed2.values.len(), 2);
267        assert_eq!(indexed2.values[0].value, 21); // 2 * 10 + 1
268        assert_eq!(indexed2.values[1].value, 22); // 2 * 10 + 2
269
270        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
271        assert!(
272            timeout_result.is_err(),
273            "Should timeout waiting for more checkpoints"
274        );
275    }
276
277    #[tokio::test]
278    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
279        // Build two checkpoints using the test builder
280        let checkpoint1: Arc<Checkpoint> =
281            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
282        let checkpoint2: Arc<Checkpoint> =
283            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
284
285        // Set up the processor, channels, and metrics
286        let processor = Arc::new(DataPipeline);
287        let (data_tx, data_rx) = mpsc::channel(2);
288        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
289        let metrics = IndexerMetrics::new(None, &Default::default());
290
291        // Spawn the processor task
292        let svc = super::processor(processor, data_rx, indexed_tx, metrics);
293
294        // Send first checkpoint.
295        data_tx.send(checkpoint1.clone()).await.unwrap();
296
297        // Receive and verify first checkpoint
298        let indexed1 = indexed_rx
299            .recv()
300            .await
301            .expect("Should receive first IndexedCheckpoint");
302        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
303
304        // Shutdown the processor
305        svc.shutdown().await.unwrap();
306
307        // Sending second checkpoint after shutdown should fail, because the data_rx channel is
308        // closed.
309        data_tx.send(checkpoint2.clone()).await.unwrap_err();
310
311        // Indexed channel is closed, and indexed_rx receives the last None result.
312        let next_result = indexed_rx.recv().await;
313        assert!(
314            next_result.is_none(),
315            "Channel should be closed after shutdown"
316        );
317    }
318
319    #[tokio::test]
320    async fn test_processor_error_retry_behavior() {
321        struct RetryTestPipeline {
322            attempt_count: Arc<AtomicU32>,
323        }
324
325        #[async_trait]
326        impl Processor for RetryTestPipeline {
327            const NAME: &'static str = "retry_test";
328            type Value = StoredData;
329            async fn process(
330                &self,
331                checkpoint: &Arc<Checkpoint>,
332            ) -> anyhow::Result<Vec<Self::Value>> {
333                if checkpoint.summary.sequence_number == 1 {
334                    Ok(vec![])
335                } else {
336                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
337                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
338                    Ok(vec![])
339                }
340            }
341        }
342
343        // Set up test data
344        let checkpoint1: Arc<Checkpoint> =
345            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
346        let checkpoint2: Arc<Checkpoint> =
347            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
348
349        let attempt_count = Arc::new(AtomicU32::new(0));
350        let processor = Arc::new(RetryTestPipeline {
351            attempt_count: attempt_count.clone(),
352        });
353
354        let (data_tx, data_rx) = mpsc::channel(2);
355        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
356
357        let metrics = IndexerMetrics::new(None, &Default::default());
358
359        // Spawn the processor task
360        let _svc = super::processor(processor, data_rx, indexed_tx, metrics);
361
362        // Send and verify first checkpoint (should succeed immediately)
363        data_tx.send(checkpoint1.clone()).await.unwrap();
364        let indexed1 = indexed_rx
365            .recv()
366            .await
367            .expect("Should receive first IndexedCheckpoint");
368        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
369
370        // Send second checkpoint (should fail twice, then succeed on 3rd attempt)
371        data_tx.send(checkpoint2.clone()).await.unwrap();
372
373        let indexed2 = indexed_rx
374            .recv()
375            .await
376            .expect("Should receive second IndexedCheckpoint after retries");
377        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
378
379        // Verify that exactly 3 attempts were made (2 failures + 1 success)
380        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
381    }
382
383    // By default, Rust's async tests run on the single-threaded runtime.
384    // We need multi_thread here because our test uses std::thread::sleep which blocks the worker thread.
385    // The multi-threaded runtime allows other worker threads to continue processing while one is blocked.
386    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
387    async fn test_processor_concurrency() {
388        // Create a processor that simulates work by sleeping
389        struct SlowProcessor;
390        #[async_trait]
391        impl Processor for SlowProcessor {
392            const NAME: &'static str = "slow";
393            const FANOUT: usize = 3; // Small fanout for testing
394            type Value = StoredData;
395
396            async fn process(
397                &self,
398                checkpoint: &Arc<Checkpoint>,
399            ) -> anyhow::Result<Vec<Self::Value>> {
400                // Simulate work by sleeping
401                std::thread::sleep(std::time::Duration::from_millis(500));
402                Ok(vec![StoredData {
403                    value: checkpoint.summary.sequence_number,
404                }])
405            }
406        }
407
408        // Set up test data
409        let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
410            .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
411            .collect();
412
413        // Set up channels and metrics
414        let processor = Arc::new(SlowProcessor);
415        let (data_tx, data_rx) = mpsc::channel(10);
416        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
417        let metrics = IndexerMetrics::new(None, &Default::default());
418
419        // Spawn processor task
420        let _svc = super::processor(processor, data_rx, indexed_tx, metrics);
421
422        // Send all checkpoints and measure time
423        let start = std::time::Instant::now();
424        for checkpoint in checkpoints {
425            data_tx.send(checkpoint).await.unwrap();
426        }
427        drop(data_tx);
428
429        // Receive all results
430        let mut received = Vec::new();
431        while let Some(indexed) = indexed_rx.recv().await {
432            received.push(indexed);
433        }
434
435        // Verify concurrency: total time should be less than sequential processing
436        // With FANOUT=3, 5 checkpoints should take ~1000ms (500ms * 2 (batches)) instead of 2500ms (500ms * 5).
437        // Adding small 200ms for some processing overhead.
438        let elapsed = start.elapsed();
439        assert!(elapsed < std::time::Duration::from_millis(1200));
440
441        // Verify results
442        assert_eq!(received.len(), 5);
443    }
444}