sui_indexer_alt_framework/pipeline/
processor.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use backoff::ExponentialBackoff;
7use sui_types::full_checkpoint_content::CheckpointData;
8use tokio::{sync::mpsc, task::JoinHandle};
9use tokio_stream::wrappers::ReceiverStream;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info};
12
13use crate::{
14    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
15    pipeline::Break,
16    task::TrySpawnStreamExt,
17};
18
19use super::IndexedCheckpoint;
20use async_trait::async_trait;
21
/// Delay before the first retry when processing a checkpoint fails. It seeds both the
/// `initial_interval` and the `current_interval` of the exponential backoff used by the processor.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Ceiling for the retry delay: it is used as the `max_interval` of the exponential backoff, so the
/// delay between attempts stops growing once it reaches this value.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
27
/// Implementors of this trait are responsible for transforming checkpoints into rows for their
/// table. The `FANOUT` associated constant controls how many concurrent workers will be used to
/// process checkpoint information.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Used to identify the pipeline in logs and metrics.
    const NAME: &'static str;

    /// How much concurrency to use when processing checkpoint data. Defaults to 10; implementors
    /// may override it.
    const FANOUT: usize = 10;

    /// The type of value being inserted by the handler. One checkpoint may produce any number of
    /// values (including none).
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    ///
    /// # Errors
    ///
    /// All errors returned from this method are treated as transient and will be retried
    /// indefinitely with exponential backoff (starting at `INITIAL_RETRY_INTERVAL` and growing up
    /// to `MAX_RETRY_INTERVAL`).
    ///
    /// For transient errors (e.g., network issues, rate limiting), simply return the error and
    /// let the framework retry automatically.
    ///
    /// # Panics
    ///
    /// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
    /// format, unsupported protocol version), you should panic! This stops the indexer and alerts
    /// operators that manual intervention is required. Do not return permanent errors as they will
    /// cause infinite retries and block the pipeline.
    async fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
}
56
57/// The processor task is responsible for taking checkpoint data and breaking it down into rows
58/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints, and
59/// distributes them among `H::FANOUT` workers.
60///
61/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
62/// channel.
63///
64/// The task will shutdown if the `cancel` token is cancelled.
65pub(super) fn processor<P: Processor>(
66    processor: Arc<P>,
67    rx: mpsc::Receiver<Arc<CheckpointData>>,
68    tx: mpsc::Sender<IndexedCheckpoint<P>>,
69    metrics: Arc<IndexerMetrics>,
70    cancel: CancellationToken,
71) -> JoinHandle<()> {
72    tokio::spawn(async move {
73        info!(pipeline = P::NAME, "Starting processor");
74        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
75            &metrics.processed_checkpoint_timestamp_lag,
76            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
77            &metrics.latest_processed_checkpoint,
78        );
79
80        match ReceiverStream::new(rx)
81            .try_for_each_spawned(P::FANOUT, |checkpoint| {
82                let tx = tx.clone();
83                let metrics = metrics.clone();
84                let cancel = cancel.clone();
85                let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
86                let processor = processor.clone();
87
88                async move {
89                    if cancel.is_cancelled() {
90                        return Err(Break::Cancel);
91                    }
92
93                    metrics
94                        .total_handler_checkpoints_received
95                        .with_label_values(&[P::NAME])
96                        .inc();
97
98                    let guard = metrics
99                        .handler_checkpoint_latency
100                        .with_label_values(&[P::NAME])
101                        .start_timer();
102
103                    // Retry processing with exponential backoff - treat all errors as transient
104                    let backoff = ExponentialBackoff {
105                        initial_interval: INITIAL_RETRY_INTERVAL,
106                        current_interval: INITIAL_RETRY_INTERVAL,
107                        max_interval: MAX_RETRY_INTERVAL,
108                        max_elapsed_time: None, // Retry indefinitely
109                        ..Default::default()
110                    };
111
112                    let values = backoff::future::retry(backoff, || async {
113                        processor
114                            .process(&checkpoint)
115                            .await
116                            .map_err(backoff::Error::transient)
117                    })
118                    .await?;
119
120                    let elapsed = guard.stop_and_record();
121
122                    let epoch = checkpoint.checkpoint_summary.epoch;
123                    let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
124                    let tx_hi = checkpoint.checkpoint_summary.network_total_transactions;
125                    let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
126
127                    debug!(
128                        pipeline = P::NAME,
129                        checkpoint = cp_sequence_number,
130                        elapsed_ms = elapsed * 1000.0,
131                        "Processed checkpoint",
132                    );
133
134                    checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
135
136                    metrics
137                        .total_handler_checkpoints_processed
138                        .with_label_values(&[P::NAME])
139                        .inc();
140
141                    metrics
142                        .total_handler_rows_created
143                        .with_label_values(&[P::NAME])
144                        .inc_by(values.len() as u64);
145
146                    tx.send(IndexedCheckpoint::new(
147                        epoch,
148                        cp_sequence_number,
149                        tx_hi,
150                        timestamp_ms,
151                        values,
152                    ))
153                    .await
154                    .map_err(|_| Break::Cancel)?;
155
156                    Ok(())
157                }
158            })
159            .await
160        {
161            Ok(()) => {
162                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
163            }
164
165            Err(Break::Cancel) => {
166                info!(pipeline = P::NAME, "Shutdown received, stopping processor");
167            }
168
169            Err(Break::Err(e)) => {
170                error!(pipeline = P::NAME, "Error from handler: {e}");
171                cancel.cancel();
172            }
173        };
174    })
175}
176
177#[cfg(test)]
178mod tests {
179    use crate::metrics::IndexerMetrics;
180    use anyhow::ensure;
181    use std::{
182        sync::{
183            Arc,
184            atomic::{AtomicU32, Ordering},
185        },
186        time::Duration,
187    };
188    use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;
189    use tokio::{sync::mpsc, time::timeout};
190    use tokio_util::sync::CancellationToken;
191
192    use super::*;
193
194    pub struct StoredData {
195        pub value: u64,
196    }
197
198    pub struct DataPipeline;
199
200    #[async_trait]
201    impl Processor for DataPipeline {
202        const NAME: &'static str = "data";
203
204        type Value = StoredData;
205
206        async fn process(
207            &self,
208            checkpoint: &Arc<CheckpointData>,
209        ) -> anyhow::Result<Vec<Self::Value>> {
210            Ok(vec![
211                StoredData {
212                    value: checkpoint.checkpoint_summary.sequence_number * 10 + 1,
213                },
214                StoredData {
215                    value: checkpoint.checkpoint_summary.sequence_number * 10 + 2,
216                },
217            ])
218        }
219    }
220
221    #[tokio::test]
222    async fn test_processor_process_checkpoints() {
223        // Build two checkpoints using the test builder
224        let checkpoint1 = Arc::new(
225            TestCheckpointDataBuilder::new(1)
226                .with_epoch(2)
227                .with_network_total_transactions(5)
228                .with_timestamp_ms(1000000001)
229                .build_checkpoint(),
230        );
231        let checkpoint2 = Arc::new(
232            TestCheckpointDataBuilder::new(2)
233                .with_epoch(2)
234                .with_network_total_transactions(10)
235                .with_timestamp_ms(1000000002)
236                .build_checkpoint(),
237        );
238
239        // Set up the processor, channels, and metrics
240        let processor = Arc::new(DataPipeline);
241        let (data_tx, data_rx) = mpsc::channel(2);
242        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
243        let metrics = IndexerMetrics::new(None, &Default::default());
244        let cancel = CancellationToken::new();
245
246        // Spawn the processor task
247        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
248
249        // Send both checkpoints
250        data_tx.send(checkpoint1.clone()).await.unwrap();
251        data_tx.send(checkpoint2.clone()).await.unwrap();
252
253        // Receive and verify first checkpoint
254        let indexed1 = indexed_rx
255            .recv()
256            .await
257            .expect("Should receive first IndexedCheckpoint");
258        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
259        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
260        assert_eq!(indexed1.watermark.tx_hi, 5);
261        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
262        assert_eq!(indexed1.values.len(), 2);
263        assert_eq!(indexed1.values[0].value, 11); // 1 * 10 + 1
264        assert_eq!(indexed1.values[1].value, 12); // 1 * 10 + 2
265
266        // Receive and verify second checkpoint
267        let indexed2 = indexed_rx
268            .recv()
269            .await
270            .expect("Should receive second IndexedCheckpoint");
271        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
272        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
273        assert_eq!(indexed2.watermark.tx_hi, 10);
274        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
275        assert_eq!(indexed2.values.len(), 2);
276        assert_eq!(indexed2.values[0].value, 21); // 2 * 10 + 1
277        assert_eq!(indexed2.values[1].value, 22); // 2 * 10 + 2
278
279        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
280        assert!(
281            timeout_result.is_err(),
282            "Should timeout waiting for more checkpoints"
283        );
284
285        // Clean up
286        drop(data_tx);
287        let _ = handle.await;
288    }
289
290    #[tokio::test]
291    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
292        // Build two checkpoints using the test builder
293        let checkpoint1 = Arc::new(TestCheckpointDataBuilder::new(1).build_checkpoint());
294        let checkpoint2 = Arc::new(TestCheckpointDataBuilder::new(2).build_checkpoint());
295
296        // Set up the processor, channels, and metrics
297        let processor = Arc::new(DataPipeline);
298        let (data_tx, data_rx) = mpsc::channel(2);
299        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
300        let metrics = IndexerMetrics::new(None, &Default::default());
301        let cancel = CancellationToken::new();
302
303        // Spawn the processor task
304        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
305
306        // Send first checkpoint.
307        data_tx.send(checkpoint1.clone()).await.unwrap();
308
309        // Receive and verify first checkpoint
310        let indexed1 = indexed_rx
311            .recv()
312            .await
313            .expect("Should receive first IndexedCheckpoint");
314        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
315
316        // Cancel the processor
317        cancel.cancel();
318
319        // Send second checkpoint after cancellation
320        data_tx.send(checkpoint2.clone()).await.unwrap();
321
322        // Indexed channel is closed, and indexed_rx receives the last None result.
323        let next_result = indexed_rx.recv().await;
324        assert!(
325            next_result.is_none(),
326            "Channel should be closed after cancellation"
327        );
328
329        // Clean up
330        let _ = handle.await;
331    }
332
333    #[tokio::test]
334    async fn test_processor_error_retry_behavior() {
335        struct RetryTestPipeline {
336            attempt_count: Arc<AtomicU32>,
337        }
338
339        #[async_trait]
340        impl Processor for RetryTestPipeline {
341            const NAME: &'static str = "retry_test";
342            type Value = StoredData;
343            async fn process(
344                &self,
345                checkpoint: &Arc<CheckpointData>,
346            ) -> anyhow::Result<Vec<Self::Value>> {
347                if checkpoint.checkpoint_summary.sequence_number == 1 {
348                    Ok(vec![])
349                } else {
350                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
351                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
352                    Ok(vec![])
353                }
354            }
355        }
356
357        // Set up test data
358        let checkpoint1 = Arc::new(TestCheckpointDataBuilder::new(1).build_checkpoint());
359        let checkpoint2 = Arc::new(TestCheckpointDataBuilder::new(2).build_checkpoint());
360
361        let attempt_count = Arc::new(AtomicU32::new(0));
362        let processor = Arc::new(RetryTestPipeline {
363            attempt_count: attempt_count.clone(),
364        });
365
366        let (data_tx, data_rx) = mpsc::channel(2);
367        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
368
369        let metrics = IndexerMetrics::new(None, &Default::default());
370        let cancel = CancellationToken::new();
371
372        // Spawn the processor task
373        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
374
375        // Send and verify first checkpoint (should succeed immediately)
376        data_tx.send(checkpoint1.clone()).await.unwrap();
377        let indexed1 = indexed_rx
378            .recv()
379            .await
380            .expect("Should receive first IndexedCheckpoint");
381        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
382
383        // Send second checkpoint (should fail twice, then succeed on 3rd attempt)
384        data_tx.send(checkpoint2.clone()).await.unwrap();
385
386        let indexed2 = indexed_rx
387            .recv()
388            .await
389            .expect("Should receive second IndexedCheckpoint after retries");
390        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
391
392        // Verify that exactly 3 attempts were made (2 failures + 1 success)
393        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
394
395        // Clean up
396        drop(data_tx);
397        let _ = handle.await;
398    }
399
400    // By default, Rust's async tests run on the single-threaded runtime.
401    // We need multi_thread here because our test uses std::thread::sleep which blocks the worker thread.
402    // The multi-threaded runtime allows other worker threads to continue processing while one is blocked.
403    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
404    async fn test_processor_concurrency() {
405        // Create a processor that simulates work by sleeping
406        struct SlowProcessor;
407        #[async_trait]
408        impl Processor for SlowProcessor {
409            const NAME: &'static str = "slow";
410            const FANOUT: usize = 3; // Small fanout for testing
411            type Value = StoredData;
412
413            async fn process(
414                &self,
415                checkpoint: &Arc<CheckpointData>,
416            ) -> anyhow::Result<Vec<Self::Value>> {
417                // Simulate work by sleeping
418                std::thread::sleep(std::time::Duration::from_millis(500));
419                Ok(vec![StoredData {
420                    value: checkpoint.checkpoint_summary.sequence_number,
421                }])
422            }
423        }
424
425        // Set up test data
426        let checkpoints: Vec<_> = (0..5)
427            .map(|i| Arc::new(TestCheckpointDataBuilder::new(i).build_checkpoint()))
428            .collect();
429
430        // Set up channels and metrics
431        let processor = Arc::new(SlowProcessor);
432        let (data_tx, data_rx) = mpsc::channel(10);
433        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
434        let metrics = IndexerMetrics::new(None, &Default::default());
435        let cancel = CancellationToken::new();
436
437        // Spawn processor task
438        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
439
440        // Send all checkpoints and measure time
441        let start = std::time::Instant::now();
442        for checkpoint in checkpoints {
443            data_tx.send(checkpoint).await.unwrap();
444        }
445        drop(data_tx);
446
447        // Receive all results
448        let mut received = Vec::new();
449        while let Some(indexed) = indexed_rx.recv().await {
450            received.push(indexed);
451        }
452
453        // Verify concurrency: total time should be less than sequential processing
454        // With FANOUT=3, 5 checkpoints should take ~1000ms (500ms * 2 (batches)) instead of 2500ms (500ms * 5).
455        // Adding small 200ms for some processing overhead.
456        let elapsed = start.elapsed();
457        assert!(elapsed < std::time::Duration::from_millis(1200));
458
459        // Verify results
460        assert_eq!(received.len(), 5);
461
462        // Clean up
463        let _ = handle.await;
464    }
465}