sui_indexer_alt_framework/pipeline/
processor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use sui_futures::service::Service;
9use sui_futures::stream::Break;
10use sui_futures::stream::TrySpawnStreamExt;
11use sui_types::full_checkpoint_content::Checkpoint;
12use tokio::sync::mpsc;
13use tokio_stream::wrappers::ReceiverStream;
14use tracing::debug;
15use tracing::error;
16use tracing::info;
17
18use async_trait::async_trait;
19
20use crate::config::ConcurrencyConfig;
21use crate::metrics::CheckpointLagMetricReporter;
22use crate::metrics::IndexerMetrics;
23use crate::pipeline::IndexedCheckpoint;
24
/// If the processor needs to retry processing a checkpoint, it will wait this long initially.
/// Subsequent waits grow exponentially from this starting point (see `processor` below).
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the processor needs to retry processing a checkpoint, it will wait at most this long between retries.
/// Only the per-retry wait is capped; there is no overall retry deadline (`max_elapsed_time` is `None`).
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
30
/// Implementors of this trait are responsible for transforming checkpoint into rows for their
/// table.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Used to identify the pipeline in logs and metrics.
    const NAME: &'static str;

    /// The type of value being inserted by the handler.
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    ///
    /// Returns the rows extracted from `checkpoint`, ready to be committed. The checkpoint is
    /// passed behind an `Arc` so implementations can cheaply hold onto it if needed.
    ///
    /// All errors returned from this method are treated as transient and will be retried
    /// indefinitely with exponential backoff.
    ///
    /// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
    /// format, unsupported protocol version), you should panic! This stops the indexer and alerts
    /// operators that manual intervention is required. Do not return permanent errors as they will
    /// cause infinite retries and block the pipeline.
    ///
    /// For transient errors (e.g., network issues, rate limiting), simply return the error and
    /// let the framework retry automatically.
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}
55
56/// The processor task is responsible for taking checkpoint data and breaking it down into rows
57/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
58/// distributes them among workers whose concurrency is governed by `concurrency`.
59///
60/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
61/// channel.
62pub(super) fn processor<P: Processor>(
63    processor: Arc<P>,
64    rx: mpsc::Receiver<Arc<Checkpoint>>,
65    tx: mpsc::Sender<IndexedCheckpoint<P>>,
66    metrics: Arc<IndexerMetrics>,
67    concurrency: ConcurrencyConfig,
68) -> Service {
69    Service::new().spawn_aborting(async move {
70        info!(pipeline = P::NAME, "Starting processor");
71        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
72            &metrics.processed_checkpoint_timestamp_lag,
73            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
74            &metrics.latest_processed_checkpoint,
75        );
76
77        let report_metrics = metrics.clone();
78        match ReceiverStream::new(rx)
79            .try_for_each_send_spawned(
80                concurrency.into(),
81                |checkpoint| {
82                    let metrics = metrics.clone();
83                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
84                    let processor = processor.clone();
85
86                    async move {
87                        metrics
88                            .total_handler_checkpoints_received
89                            .with_label_values(&[P::NAME])
90                            .inc();
91
92                        let guard = metrics
93                            .handler_checkpoint_latency
94                            .with_label_values(&[P::NAME])
95                            .start_timer();
96
97                        // Retry processing with exponential backoff
98                        let backoff = ExponentialBackoff {
99                            initial_interval: INITIAL_RETRY_INTERVAL,
100                            current_interval: INITIAL_RETRY_INTERVAL,
101                            max_interval: MAX_RETRY_INTERVAL,
102                            max_elapsed_time: None,
103                            ..Default::default()
104                        };
105
106                        let values = backoff::future::retry(backoff, || async {
107                            processor
108                                .process(&checkpoint)
109                                .await
110                                .map_err(backoff::Error::transient)
111                        })
112                        .await?;
113
114                        let elapsed = guard.stop_and_record();
115
116                        let epoch = checkpoint.summary.epoch;
117                        let cp_sequence_number = checkpoint.summary.sequence_number;
118                        let tx_hi = checkpoint.summary.network_total_transactions;
119                        let timestamp_ms = checkpoint.summary.timestamp_ms;
120
121                        debug!(
122                            pipeline = P::NAME,
123                            checkpoint = cp_sequence_number,
124                            elapsed_ms = elapsed * 1000.0,
125                            "Processed checkpoint",
126                        );
127
128                        checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
129
130                        metrics
131                            .total_handler_checkpoints_processed
132                            .with_label_values(&[P::NAME])
133                            .inc();
134
135                        metrics
136                            .total_handler_rows_created
137                            .with_label_values(&[P::NAME])
138                            .inc_by(values.len() as u64);
139
140                        Ok(IndexedCheckpoint::new(
141                            epoch,
142                            cp_sequence_number,
143                            tx_hi,
144                            timestamp_ms,
145                            values,
146                        ))
147                    }
148                },
149                tx,
150                move |stats| {
151                    report_metrics
152                        .processor_concurrency_limit
153                        .with_label_values(&[P::NAME])
154                        .set(stats.limit as i64);
155                    report_metrics
156                        .processor_concurrency_inflight
157                        .with_label_values(&[P::NAME])
158                        .set(stats.inflight as i64);
159                },
160            )
161            .await
162        {
163            Ok(()) => {
164                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
165            }
166
167            Err(Break::Break) => {
168                info!(pipeline = P::NAME, "Channel closed, stopping processor");
169            }
170
171            Err(Break::Err(e)) => {
172                error!(pipeline = P::NAME, "Error from handler: {e}");
173                return Err(e.context(format!("Error from processor {}", P::NAME)));
174            }
175        };
176
177        Ok(())
178    })
179}
180
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use anyhow::ensure;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::metrics::IndexerMetrics;

    use super::*;

    /// Minimal row type used as `Processor::Value` in these tests.
    pub struct StoredData {
        pub value: u64,
    }

    /// Test processor that emits two deterministic rows per checkpoint.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        // Encodes the checkpoint sequence number into the row values
        // (seq * 10 + 1, seq * 10 + 2) so assertions can tell which
        // checkpoint produced which rows.
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 1,
                },
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 2,
                },
            ])
        }
    }

    /// Happy path: checkpoints sent into the processor come out as
    /// `IndexedCheckpoint`s with the correct watermark fields and rows.
    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        // Build two checkpoints using the test builder
        let checkpoint1: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(1)
                .with_epoch(2)
                .with_network_total_transactions(5)
                .with_timestamp_ms(1000000001)
                .build_checkpoint(),
        );
        let checkpoint2: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(2)
                .with_epoch(2)
                .with_network_total_transactions(10)
                .with_timestamp_ms(1000000002)
                .build_checkpoint(),
        );

        // Set up the processor, channels, and metrics
        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn the processor task
        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
        );

        // Send both checkpoints
        data_tx.send(checkpoint1.clone()).await.unwrap();
        data_tx.send(checkpoint2.clone()).await.unwrap();

        // Receive and verify first checkpoint
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed1.watermark.tx_hi, 5);
        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
        assert_eq!(indexed1.values.len(), 2);
        assert_eq!(indexed1.values[0].value, 11); // 1 * 10 + 1
        assert_eq!(indexed1.values[1].value, 12); // 1 * 10 + 2

        // Receive and verify second checkpoint
        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.tx_hi, 10);
        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
        assert_eq!(indexed2.values.len(), 2);
        assert_eq!(indexed2.values[0].value, 21); // 2 * 10 + 1
        assert_eq!(indexed2.values[1].value, 22); // 2 * 10 + 2

        // No further output should arrive: only two checkpoints were sent.
        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
        assert!(
            timeout_result.is_err(),
            "Should timeout waiting for more checkpoints"
        );
    }

    /// Shutting down the service closes both ends: the input channel rejects
    /// further sends and the output channel drains to `None`.
    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        // Build two checkpoints using the test builder
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        // Set up the processor, channels, and metrics
        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn the processor task
        let svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
        );

        // Send first checkpoint.
        data_tx.send(checkpoint1.clone()).await.unwrap();

        // Receive and verify first checkpoint
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        // Shutdown the processor
        svc.shutdown().await.unwrap();

        // Sending second checkpoint after shutdown should fail, because the data_rx channel is
        // closed.
        data_tx.send(checkpoint2.clone()).await.unwrap_err();

        // Indexed channel is closed, and indexed_rx receives the last None result.
        let next_result = indexed_rx.recv().await;
        assert!(
            next_result.is_none(),
            "Channel should be closed after shutdown"
        );
    }

    /// Transient errors from `process` are retried with backoff until the
    /// handler eventually succeeds; the attempt count confirms the retries.
    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        // Fails the first two attempts on checkpoint 2 (via `ensure!(attempt > 2, ..)`),
        // succeeding on the third; checkpoint 1 always succeeds immediately.
        struct RetryTestPipeline {
            attempt_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;
            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                if checkpoint.summary.sequence_number == 1 {
                    Ok(vec![])
                } else {
                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                    Ok(vec![])
                }
            }
        }

        // Set up test data
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let attempt_count = Arc::new(AtomicU32::new(0));
        let processor = Arc::new(RetryTestPipeline {
            attempt_count: attempt_count.clone(),
        });

        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);

        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn the processor task
        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
        );

        // Send and verify first checkpoint (should succeed immediately)
        data_tx.send(checkpoint1.clone()).await.unwrap();
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        // Send second checkpoint (should fail twice, then succeed on 3rd attempt)
        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);

        // Verify that exactly 3 attempts were made (2 failures + 1 success)
        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
    }

    // By default, Rust's async tests run on the single-threaded runtime.
    // We need multi_thread here because our test uses std::thread::sleep which blocks the worker thread.
    // The multi-threaded runtime allows other worker threads to continue processing while one is blocked.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_processor_concurrency() {
        // Create a processor that simulates work by sleeping
        struct SlowProcessor;
        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // Simulate work by sleeping (deliberately blocking: see the note on the
                // multi_thread runtime above).
                std::thread::sleep(std::time::Duration::from_millis(500));
                Ok(vec![StoredData {
                    value: checkpoint.summary.sequence_number,
                }])
            }
        }

        // Set up test data
        let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
            .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
            .collect();

        // Set up channels and metrics
        let processor = Arc::new(SlowProcessor);
        let (data_tx, data_rx) = mpsc::channel(10);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn processor task
        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 3 },
        );

        // Send all checkpoints and measure time
        let start = std::time::Instant::now();
        for checkpoint in checkpoints {
            data_tx.send(checkpoint).await.unwrap();
        }
        // Dropping the sender closes the input channel so the processor stops
        // after draining, letting the recv loop below terminate.
        drop(data_tx);

        // Receive all results
        let mut received = Vec::new();
        while let Some(indexed) = indexed_rx.recv().await {
            received.push(indexed);
        }

        // Verify concurrency: total time should be less than sequential processing
        // With concurrency=3, 5 checkpoints should take ~1000ms (500ms * 2 (batches)) instead of 2500ms (500ms * 5).
        // Adding small 200ms for some processing overhead.
        let elapsed = start.elapsed();
        assert!(elapsed < std::time::Duration::from_millis(1200));

        // Verify results
        assert_eq!(received.len(), 5);
    }
}
475}