sui_indexer_alt_framework/pipeline/processor.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::time::Duration;

use backoff::ExponentialBackoff;
use sui_types::full_checkpoint_content::Checkpoint;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::Break,
    task::TrySpawnStreamExt,
};

use super::IndexedCheckpoint;
use async_trait::async_trait;

/// If the processor needs to retry processing a checkpoint, it will wait this long initially.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the processor needs to retry processing a checkpoint, it will wait at most this long between retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
/// Implementors of this trait are responsible for transforming a checkpoint into rows for their
/// table. The `FANOUT` associated constant controls how many concurrent workers are used to
/// process checkpoint information.
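///
/// # Example
///
/// A minimal sketch of an implementor (illustrative only; `MyPipeline` and `MyRow` are
/// placeholders, not types from this crate):
///
/// ```ignore
/// struct MyPipeline;
///
/// #[async_trait]
/// impl Processor for MyPipeline {
///     const NAME: &'static str = "my_pipeline";
///
///     // Override the default of 10 if checkpoints are particularly cheap or expensive to process.
///     const FANOUT: usize = 4;
///
///     type Value = MyRow;
///
///     async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
///         // One row per checkpoint, keyed by its sequence number.
///         Ok(vec![MyRow {
///             cp_sequence_number: checkpoint.summary.sequence_number,
///         }])
///     }
/// }
/// ```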
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Used to identify the pipeline in logs and metrics.
    const NAME: &'static str;

    /// How much concurrency to use when processing checkpoint data.
    const FANOUT: usize = 10;

    /// The type of value being inserted by the handler.
    type Value: Send + Sync + 'static;

    /// The processing logic for turning a checkpoint into rows of the table.
    ///
    /// All errors returned from this method are treated as transient and will be retried
    /// indefinitely with exponential backoff.
    ///
    /// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
    /// format, unsupported protocol version), you should panic! This stops the indexer and alerts
    /// operators that manual intervention is required. Do not return permanent errors as they will
    /// cause infinite retries and block the pipeline.
    ///
    /// For transient errors (e.g., network issues, rate limiting), simply return the error and
    /// let the framework retry automatically.
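    ///
    /// For illustration, a sketch of this convention; `self.client`, `fetch_metadata`,
    /// `lowest_supported_checkpoint`, and `to_rows` are hypothetical names, not APIs of this
    /// crate:
    ///
    /// ```ignore
    /// async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
    ///     // Permanent error: this checkpoint can never be handled, so panic to stop the
    ///     // indexer and alert operators.
    ///     assert!(
    ///         checkpoint.summary.sequence_number >= self.lowest_supported_checkpoint,
    ///         "checkpoint predates supported range",
    ///     );
    ///
    ///     // Transient error: a failed network call is returned with `?` and retried by the
    ///     // framework with exponential backoff.
    ///     let metadata = self.client.fetch_metadata(checkpoint.summary.sequence_number).await?;
    ///
    ///     Ok(to_rows(checkpoint, metadata))
    /// }
    /// ```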
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}

/// The processor task is responsible for taking checkpoint data and breaking it down into rows
/// ready to commit. It spins up a supervisor that waits on the `rx` channel for checkpoints and
/// distributes them among `P::FANOUT` workers.
///
/// Each worker processes a checkpoint into rows and sends them on to the committer using the `tx`
/// channel.
///
/// The task will shut down if the `cancel` token is cancelled.
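///
/// For illustration, roughly how the task is driven in isolation. `MyPipeline`, the channel
/// sizes, and the manual send/recv are placeholders; in the framework the upstream checkpoint
/// source feeds `rx` and the committer consumes `tx`:
///
/// ```ignore
/// let (checkpoint_tx, checkpoint_rx) = mpsc::channel(100);
/// let (values_tx, mut values_rx) = mpsc::channel(100);
/// let cancel = CancellationToken::new();
///
/// let handle = processor(Arc::new(MyPipeline), checkpoint_rx, values_tx, metrics, cancel.clone());
///
/// checkpoint_tx.send(checkpoint).await.unwrap(); // upstream checkpoint source
/// let indexed = values_rx.recv().await;          // committer side
///
/// cancel.cancel(); // shut the task down
/// let _ = handle.await;
/// ```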
pub(super) fn processor<P: Processor>(
    processor: Arc<P>,
    rx: mpsc::Receiver<Arc<Checkpoint>>,
    tx: mpsc::Sender<IndexedCheckpoint<P>>,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        info!(pipeline = P::NAME, "Starting processor");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
            &metrics.processed_checkpoint_timestamp_lag,
            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
            &metrics.latest_processed_checkpoint,
        );

        match ReceiverStream::new(rx)
            .try_for_each_spawned(P::FANOUT, |checkpoint| {
                let tx = tx.clone();
                let metrics = metrics.clone();
                let cancel = cancel.clone();
                let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                let processor = processor.clone();

                async move {
                    if cancel.is_cancelled() {
                        return Err(Break::Cancel);
                    }

                    metrics
                        .total_handler_checkpoints_received
                        .with_label_values(&[P::NAME])
                        .inc();

                    let guard = metrics
                        .handler_checkpoint_latency
                        .with_label_values(&[P::NAME])
                        .start_timer();

                    // Retry processing with exponential backoff - treat all errors as transient
                    let backoff = ExponentialBackoff {
                        initial_interval: INITIAL_RETRY_INTERVAL,
                        current_interval: INITIAL_RETRY_INTERVAL,
                        max_interval: MAX_RETRY_INTERVAL,
                        max_elapsed_time: None, // Retry indefinitely
                        ..Default::default()
                    };

                    let values = backoff::future::retry(backoff, || async {
                        processor
                            .process(&checkpoint)
                            .await
                            .map_err(backoff::Error::transient)
                    })
                    .await?;

                    let elapsed = guard.stop_and_record();

                    let epoch = checkpoint.summary.epoch;
                    let cp_sequence_number = checkpoint.summary.sequence_number;
                    let tx_hi = checkpoint.summary.network_total_transactions;
                    let timestamp_ms = checkpoint.summary.timestamp_ms;

                    debug!(
                        pipeline = P::NAME,
                        checkpoint = cp_sequence_number,
                        elapsed_ms = elapsed * 1000.0,
                        "Processed checkpoint",
                    );

                    checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);

                    metrics
                        .total_handler_checkpoints_processed
                        .with_label_values(&[P::NAME])
                        .inc();

                    metrics
                        .total_handler_rows_created
                        .with_label_values(&[P::NAME])
                        .inc_by(values.len() as u64);

                    tx.send(IndexedCheckpoint::new(
                        epoch,
                        cp_sequence_number,
                        tx_hi,
                        timestamp_ms,
                        values,
                    ))
                    .await
                    .map_err(|_| Break::Cancel)?;

                    Ok(())
                }
            })
            .await
        {
            Ok(()) => {
                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
            }

            Err(Break::Cancel) => {
                info!(pipeline = P::NAME, "Shutdown received, stopping processor");
            }

            Err(Break::Err(e)) => {
                error!(pipeline = P::NAME, "Error from handler: {e}");
                cancel.cancel();
            }
        };
    })
}

#[cfg(test)]
mod tests {
    use crate::metrics::IndexerMetrics;
    use anyhow::ensure;
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::Duration,
    };
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use super::*;

    pub struct StoredData {
        pub value: u64,
    }

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 1,
                },
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 2,
                },
            ])
        }
    }

    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        // Build two checkpoints using the test builder
        let checkpoint1: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(1)
                .with_epoch(2)
                .with_network_total_transactions(5)
                .with_timestamp_ms(1000000001)
                .build_checkpoint(),
        );
        let checkpoint2: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(2)
                .with_epoch(2)
                .with_network_total_transactions(10)
                .with_timestamp_ms(1000000002)
                .build_checkpoint(),
        );

        // Set up the processor, channels, and metrics
        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        // Spawn the processor task
        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        // Send both checkpoints
        data_tx.send(checkpoint1.clone()).await.unwrap();
        data_tx.send(checkpoint2.clone()).await.unwrap();

        // Receive and verify first checkpoint
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed1.watermark.tx_hi, 5);
        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
        assert_eq!(indexed1.values.len(), 2);
        assert_eq!(indexed1.values[0].value, 11); // 1 * 10 + 1
        assert_eq!(indexed1.values[1].value, 12); // 1 * 10 + 2

        // Receive and verify second checkpoint
        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.tx_hi, 10);
        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
        assert_eq!(indexed2.values.len(), 2);
        assert_eq!(indexed2.values[0].value, 21); // 2 * 10 + 1
        assert_eq!(indexed2.values[1].value, 22); // 2 * 10 + 2

        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
        assert!(
            timeout_result.is_err(),
            "Should timeout waiting for more checkpoints"
        );

        // Clean up
        drop(data_tx);
        let _ = handle.await;
    }

    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        // Build two checkpoints using the test builder
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        // Set up the processor, channels, and metrics
        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        // Spawn the processor task
        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        // Send first checkpoint.
        data_tx.send(checkpoint1.clone()).await.unwrap();

        // Receive and verify first checkpoint
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        // Cancel the processor
        cancel.cancel();

        // Send second checkpoint after cancellation
        data_tx.send(checkpoint2.clone()).await.unwrap();

        // The indexed channel is closed after cancellation, so indexed_rx receives None.
        let next_result = indexed_rx.recv().await;
        assert!(
            next_result.is_none(),
            "Channel should be closed after cancellation"
        );

        // Clean up
        let _ = handle.await;
    }

    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        struct RetryTestPipeline {
            attempt_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;
            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                if checkpoint.summary.sequence_number == 1 {
                    Ok(vec![])
                } else {
                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                    Ok(vec![])
                }
            }
        }

        // Set up test data
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let attempt_count = Arc::new(AtomicU32::new(0));
        let processor = Arc::new(RetryTestPipeline {
            attempt_count: attempt_count.clone(),
        });

        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);

        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        // Spawn the processor task
        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        // Send and verify first checkpoint (should succeed immediately)
        data_tx.send(checkpoint1.clone()).await.unwrap();
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        // Send second checkpoint (should fail twice, then succeed on 3rd attempt)
        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);

        // Verify that exactly 3 attempts were made (2 failures + 1 success)
        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);

        // Clean up
        drop(data_tx);
        let _ = handle.await;
    }

    // By default, `#[tokio::test]` runs on the single-threaded (current-thread) runtime.
    // We need multi_thread here because this test uses std::thread::sleep, which blocks the worker thread.
    // The multi-threaded runtime allows other worker threads to continue processing while one is blocked.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_processor_concurrency() {
        // Create a processor that simulates work by sleeping
        struct SlowProcessor;
        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            const FANOUT: usize = 3; // Small fanout for testing
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // Simulate work by sleeping
                std::thread::sleep(std::time::Duration::from_millis(500));
                Ok(vec![StoredData {
                    value: checkpoint.summary.sequence_number,
                }])
            }
        }

        // Set up test data
        let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
            .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
            .collect();

        // Set up channels and metrics
        let processor = Arc::new(SlowProcessor);
        let (data_tx, data_rx) = mpsc::channel(10);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        // Spawn processor task
        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        // Send all checkpoints and measure time
        let start = std::time::Instant::now();
        for checkpoint in checkpoints {
            data_tx.send(checkpoint).await.unwrap();
        }
        drop(data_tx);

        // Receive all results
        let mut received = Vec::new();
        while let Some(indexed) = indexed_rx.recv().await {
            received.push(indexed);
        }

        // Verify concurrency: total time should be less than sequential processing.
        // With FANOUT=3, 5 checkpoints should take ~1000ms (two batches of 500ms) instead of
        // 2500ms (5 * 500ms). Allow an extra 200ms for processing overhead.
        let elapsed = start.elapsed();
        assert!(elapsed < std::time::Duration::from_millis(1200));

        // Verify results
        assert_eq!(received.len(), 5);

        // Clean up
        let _ = handle.await;
    }
}