sui_indexer_alt_framework/pipeline/
processor.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::time::Duration;

use backoff::ExponentialBackoff;
use sui_futures::{
    service::Service,
    stream::{Break, TrySpawnStreamExt},
};
use sui_types::full_checkpoint_content::Checkpoint;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info};

use crate::metrics::{CheckpointLagMetricReporter, IndexerMetrics};

use super::IndexedCheckpoint;
use async_trait::async_trait;

/// If the processor needs to retry processing a checkpoint, it will wait this long initially.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the processor needs to retry processing a checkpoint, it will wait at most this long between retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);

/// Implementors of this trait are responsible for transforming a checkpoint into rows for their
/// table. The `FANOUT` associated constant controls how many concurrent workers will be used to
/// process checkpoint information.
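///
/// # Examples
///
/// A minimal sketch of an implementation (the `CheckpointStatsPipeline` and
/// `StoredCheckpointStats` names are illustrative, not part of the framework):
///
/// ```ignore
/// struct CheckpointStatsPipeline;
///
/// struct StoredCheckpointStats {
///     cp_sequence_number: u64,
///     network_total_transactions: u64,
/// }
///
/// #[async_trait]
/// impl Processor for CheckpointStatsPipeline {
///     const NAME: &'static str = "checkpoint_stats";
///
///     // Processing here is cheap, so a small fanout is plenty.
///     const FANOUT: usize = 4;
///
///     type Value = StoredCheckpointStats;
///
///     async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
///         Ok(vec![StoredCheckpointStats {
///             cp_sequence_number: checkpoint.summary.sequence_number,
///             network_total_transactions: checkpoint.summary.network_total_transactions,
///         }])
///     }
/// }
/// ```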
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Used to identify the pipeline in logs and metrics.
    const NAME: &'static str;

    /// How much concurrency to use when processing checkpoint data.
    const FANOUT: usize = 10;

    /// The type of value being inserted by the handler.
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    ///
    /// All errors returned from this method are treated as transient and will be retried
    /// indefinitely with exponential backoff.
    ///
    /// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
    /// format, unsupported protocol version), you should panic! This stops the indexer and alerts
    /// operators that manual intervention is required. Do not return permanent errors as they will
    /// cause infinite retries and block the pipeline.
    ///
    /// For transient errors (e.g., network issues, rate limiting), simply return the error and
    /// let the framework retry automatically.
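    ///
    /// A hedged sketch of this convention; `fetch_extra_data`, `FetchError`, and `to_rows` are
    /// hypothetical helpers, used only to illustrate the transient-vs-permanent split:
    ///
    /// ```ignore
    /// async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
    ///     match fetch_extra_data(checkpoint).await {
    ///         // Transient failure: return the error so the framework retries with backoff.
    ///         Err(FetchError::Timeout(e)) => Err(e.into()),
    ///         // Permanent failure: panic so operators are alerted instead of retrying forever.
    ///         Err(FetchError::Unsupported(version)) => {
    ///             panic!("unsupported protocol version {version}")
    ///         }
    ///         Ok(data) => Ok(to_rows(checkpoint, data)),
    ///     }
    /// }
    /// ```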
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}

/// The processor task is responsible for taking checkpoint data and breaking it down into rows
/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
/// distributes them among `P::FANOUT` workers.
///
/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
/// channel.
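///
/// A minimal sketch of how the task is wired up (channel capacities, the metrics handle, and the
/// `CheckpointStatsPipeline` processor are assumptions; the tests below show a complete setup):
///
/// ```ignore
/// let (checkpoint_tx, checkpoint_rx) = mpsc::channel(100);
/// let (indexed_tx, indexed_rx) = mpsc::channel(100);
/// let service = processor(Arc::new(CheckpointStatsPipeline), checkpoint_rx, indexed_tx, metrics);
/// // Feed checkpoints into `checkpoint_tx` and consume `IndexedCheckpoint`s from `indexed_rx`.
/// // Closing `checkpoint_tx` (or shutting the service down) stops the task.
/// ```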
pub(super) fn processor<P: Processor>(
    processor: Arc<P>,
    rx: mpsc::Receiver<Arc<Checkpoint>>,
    tx: mpsc::Sender<IndexedCheckpoint<P>>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = P::NAME, "Starting processor");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
            &metrics.processed_checkpoint_timestamp_lag,
            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
            &metrics.latest_processed_checkpoint,
        );

        match ReceiverStream::new(rx)
            .try_for_each_spawned(P::FANOUT, |checkpoint| {
                let tx = tx.clone();
                let metrics = metrics.clone();
                let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                let processor = processor.clone();

                async move {
                    metrics
                        .total_handler_checkpoints_received
                        .with_label_values(&[P::NAME])
                        .inc();

                    let guard = metrics
                        .handler_checkpoint_latency
                        .with_label_values(&[P::NAME])
                        .start_timer();

                    // Retry processing with exponential backoff - treat all errors as transient
                    let backoff = ExponentialBackoff {
                        initial_interval: INITIAL_RETRY_INTERVAL,
                        current_interval: INITIAL_RETRY_INTERVAL,
                        max_interval: MAX_RETRY_INTERVAL,
                        max_elapsed_time: None, // Retry indefinitely
                        ..Default::default()
                    };

                    let values = backoff::future::retry(backoff, || async {
                        processor
                            .process(&checkpoint)
                            .await
                            .map_err(backoff::Error::transient)
                    })
                    .await?;

                    let elapsed = guard.stop_and_record();

                    let epoch = checkpoint.summary.epoch;
                    let cp_sequence_number = checkpoint.summary.sequence_number;
                    let tx_hi = checkpoint.summary.network_total_transactions;
                    let timestamp_ms = checkpoint.summary.timestamp_ms;

                    debug!(
                        pipeline = P::NAME,
                        checkpoint = cp_sequence_number,
                        elapsed_ms = elapsed * 1000.0,
                        "Processed checkpoint",
                    );

                    checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);

                    metrics
                        .total_handler_checkpoints_processed
                        .with_label_values(&[P::NAME])
                        .inc();

                    metrics
                        .total_handler_rows_created
                        .with_label_values(&[P::NAME])
                        .inc_by(values.len() as u64);

                    tx.send(IndexedCheckpoint::new(
                        epoch,
                        cp_sequence_number,
                        tx_hi,
                        timestamp_ms,
                        values,
                    ))
                    .await
                    .map_err(|_| Break::Break)?;

                    Ok(())
                }
            })
            .await
        {
            Ok(()) => {
                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
            }

            Err(Break::Break) => {
                info!(pipeline = P::NAME, "Channel closed, stopping processor");
            }

            Err(Break::Err(e)) => {
                error!(pipeline = P::NAME, "Error from handler: {e}");
                return Err(e.context(format!("Error from processor {}", P::NAME)));
            }
        };

        Ok(())
    })
}

#[cfg(test)]
mod tests {
    use crate::metrics::IndexerMetrics;
    use anyhow::ensure;
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::Duration,
    };
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::{sync::mpsc, time::timeout};

    use super::*;

    pub struct StoredData {
        pub value: u64,
    }

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 1,
                },
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 2,
                },
            ])
        }
    }

    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        // Build two checkpoints using the test builder
        let checkpoint1: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(1)
                .with_epoch(2)
                .with_network_total_transactions(5)
                .with_timestamp_ms(1000000001)
                .build_checkpoint(),
        );
        let checkpoint2: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(2)
                .with_epoch(2)
                .with_network_total_transactions(10)
                .with_timestamp_ms(1000000002)
                .build_checkpoint(),
        );

        // Set up the processor, channels, and metrics
        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn the processor task
        let _svc = super::processor(processor, data_rx, indexed_tx, metrics);

        // Send both checkpoints
        data_tx.send(checkpoint1.clone()).await.unwrap();
        data_tx.send(checkpoint2.clone()).await.unwrap();

        // Receive and verify first checkpoint
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed1.watermark.tx_hi, 5);
        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
        assert_eq!(indexed1.values.len(), 2);
        assert_eq!(indexed1.values[0].value, 11); // 1 * 10 + 1
        assert_eq!(indexed1.values[1].value, 12); // 1 * 10 + 2

        // Receive and verify second checkpoint
        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.tx_hi, 10);
        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
        assert_eq!(indexed2.values.len(), 2);
        assert_eq!(indexed2.values[0].value, 21); // 2 * 10 + 1
        assert_eq!(indexed2.values[1].value, 22); // 2 * 10 + 2

        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
        assert!(
            timeout_result.is_err(),
            "Should timeout waiting for more checkpoints"
        );
    }

    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        // Build two checkpoints using the test builder
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        // Set up the processor, channels, and metrics
        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn the processor task
        let svc = super::processor(processor, data_rx, indexed_tx, metrics);

        // Send first checkpoint.
        data_tx.send(checkpoint1.clone()).await.unwrap();

        // Receive and verify first checkpoint
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        // Shutdown the processor
        svc.shutdown().await.unwrap();

        // Sending second checkpoint after shutdown should fail, because the data_rx channel is
        // closed.
        data_tx.send(checkpoint2.clone()).await.unwrap_err();

        // Indexed channel is closed, and indexed_rx receives the last None result.
        let next_result = indexed_rx.recv().await;
        assert!(
            next_result.is_none(),
            "Channel should be closed after shutdown"
        );
    }

    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        struct RetryTestPipeline {
            attempt_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;
            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                if checkpoint.summary.sequence_number == 1 {
                    Ok(vec![])
                } else {
                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                    Ok(vec![])
                }
            }
        }

        // Set up test data
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let attempt_count = Arc::new(AtomicU32::new(0));
        let processor = Arc::new(RetryTestPipeline {
            attempt_count: attempt_count.clone(),
        });

        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);

        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn the processor task
        let _svc = super::processor(processor, data_rx, indexed_tx, metrics);

        // Send and verify first checkpoint (should succeed immediately)
        data_tx.send(checkpoint1.clone()).await.unwrap();
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        // Send second checkpoint (should fail twice, then succeed on 3rd attempt)
        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);

        // Verify that exactly 3 attempts were made (2 failures + 1 success)
        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
    }

    // By default, `#[tokio::test]` runs on the single-threaded (current-thread) runtime.
    // We need multi_thread here because this test uses std::thread::sleep, which blocks the worker thread.
    // The multi-threaded runtime allows other worker threads to continue processing while one is blocked.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_processor_concurrency() {
        // Create a processor that simulates work by sleeping
        struct SlowProcessor;
        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            const FANOUT: usize = 3; // Small fanout for testing
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // Simulate work by sleeping
                std::thread::sleep(std::time::Duration::from_millis(500));
                Ok(vec![StoredData {
                    value: checkpoint.summary.sequence_number,
                }])
            }
        }

        // Set up test data
        let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
            .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
            .collect();

        // Set up channels and metrics
        let processor = Arc::new(SlowProcessor);
        let (data_tx, data_rx) = mpsc::channel(10);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Spawn processor task
        let _svc = super::processor(processor, data_rx, indexed_tx, metrics);

        // Send all checkpoints and measure time
        let start = std::time::Instant::now();
        for checkpoint in checkpoints {
            data_tx.send(checkpoint).await.unwrap();
        }
        drop(data_tx);

        // Receive all results
        let mut received = Vec::new();
        while let Some(indexed) = indexed_rx.recv().await {
            received.push(indexed);
        }

        // Verify concurrency: total time should be well below the sequential cost.
        // With FANOUT=3, 5 checkpoints are processed in 2 batches of ~500ms (~1000ms total) instead of
        // 2500ms (500ms * 5). Allow an extra 200ms of slack for scheduling overhead.
        let elapsed = start.elapsed();
        assert!(elapsed < std::time::Duration::from_millis(1200));

        // Verify results
        assert_eq!(received.len(), 5);
    }
}