// sui_indexer_alt_framework/pipeline/processor.rs
use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use sui_types::full_checkpoint_content::Checkpoint;
9use tokio::{sync::mpsc, task::JoinHandle};
10use tokio_stream::wrappers::ReceiverStream;
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, error, info};
13
14use crate::{
15 metrics::{CheckpointLagMetricReporter, IndexerMetrics},
16 pipeline::Break,
17 task::TrySpawnStreamExt,
18};
19
20use super::IndexedCheckpoint;
21use async_trait::async_trait;
22
/// Processors try to batch rows, but the retry of a failed `process` call starts after this
/// interval, growing exponentially from here.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Cap on the exponential backoff between retries of a failed `process` call.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
29#[async_trait]
33pub trait Processor: Send + Sync + 'static {
34 const NAME: &'static str;
36
37 const FANOUT: usize = 10;
39
40 type Value: Send + Sync + 'static;
42
43 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
56}
57
58pub(super) fn processor<P: Processor>(
67 processor: Arc<P>,
68 rx: mpsc::Receiver<Arc<Checkpoint>>,
69 tx: mpsc::Sender<IndexedCheckpoint<P>>,
70 metrics: Arc<IndexerMetrics>,
71 cancel: CancellationToken,
72) -> JoinHandle<()> {
73 tokio::spawn(async move {
74 info!(pipeline = P::NAME, "Starting processor");
75 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
76 &metrics.processed_checkpoint_timestamp_lag,
77 &metrics.latest_processed_checkpoint_timestamp_lag_ms,
78 &metrics.latest_processed_checkpoint,
79 );
80
81 match ReceiverStream::new(rx)
82 .try_for_each_spawned(P::FANOUT, |checkpoint| {
83 let tx = tx.clone();
84 let metrics = metrics.clone();
85 let cancel = cancel.clone();
86 let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
87 let processor = processor.clone();
88
89 async move {
90 if cancel.is_cancelled() {
91 return Err(Break::Cancel);
92 }
93
94 metrics
95 .total_handler_checkpoints_received
96 .with_label_values(&[P::NAME])
97 .inc();
98
99 let guard = metrics
100 .handler_checkpoint_latency
101 .with_label_values(&[P::NAME])
102 .start_timer();
103
104 let backoff = ExponentialBackoff {
106 initial_interval: INITIAL_RETRY_INTERVAL,
107 current_interval: INITIAL_RETRY_INTERVAL,
108 max_interval: MAX_RETRY_INTERVAL,
109 max_elapsed_time: None, ..Default::default()
111 };
112
113 let values = backoff::future::retry(backoff, || async {
114 processor
115 .process(&checkpoint)
116 .await
117 .map_err(backoff::Error::transient)
118 })
119 .await?;
120
121 let elapsed = guard.stop_and_record();
122
123 let epoch = checkpoint.summary.epoch;
124 let cp_sequence_number = checkpoint.summary.sequence_number;
125 let tx_hi = checkpoint.summary.network_total_transactions;
126 let timestamp_ms = checkpoint.summary.timestamp_ms;
127
128 debug!(
129 pipeline = P::NAME,
130 checkpoint = cp_sequence_number,
131 elapsed_ms = elapsed * 1000.0,
132 "Processed checkpoint",
133 );
134
135 checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
136
137 metrics
138 .total_handler_checkpoints_processed
139 .with_label_values(&[P::NAME])
140 .inc();
141
142 metrics
143 .total_handler_rows_created
144 .with_label_values(&[P::NAME])
145 .inc_by(values.len() as u64);
146
147 tx.send(IndexedCheckpoint::new(
148 epoch,
149 cp_sequence_number,
150 tx_hi,
151 timestamp_ms,
152 values,
153 ))
154 .await
155 .map_err(|_| Break::Cancel)?;
156
157 Ok(())
158 }
159 })
160 .await
161 {
162 Ok(()) => {
163 info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
164 }
165
166 Err(Break::Cancel) => {
167 info!(pipeline = P::NAME, "Shutdown received, stopping processor");
168 }
169
170 Err(Break::Err(e)) => {
171 error!(pipeline = P::NAME, "Error from handler: {e}");
172 cancel.cancel();
173 }
174 };
175 })
176}
177
#[cfg(test)]
mod tests {
    use crate::metrics::IndexerMetrics;
    use anyhow::ensure;
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::Duration,
    };
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::{sync::mpsc, time::timeout};
    use tokio_util::sync::CancellationToken;

    use super::*;

    /// Row type produced by the test pipelines below.
    pub struct StoredData {
        pub value: u64,
    }

    /// A processor that deterministically emits two rows per checkpoint.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            // Encode the sequence number into the row values so assertions can tell
            // which checkpoint each row came from.
            Ok(vec![
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 1,
                },
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 2,
                },
            ])
        }
    }

    /// Checkpoints flow through the processor and come out as `IndexedCheckpoint`s with the
    /// correct watermark fields and rows.
    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        let checkpoint1: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(1)
                .with_epoch(2)
                .with_network_total_transactions(5)
                .with_timestamp_ms(1000000001)
                .build_checkpoint(),
        );
        let checkpoint2: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(2)
                .with_epoch(2)
                .with_network_total_transactions(10)
                .with_timestamp_ms(1000000002)
                .build_checkpoint(),
        );

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        data_tx.send(checkpoint1.clone()).await.unwrap();
        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed1.watermark.tx_hi, 5);
        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
        assert_eq!(indexed1.values.len(), 2);
        assert_eq!(indexed1.values[0].value, 11);
        assert_eq!(indexed1.values[1].value, 12);

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.tx_hi, 10);
        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
        assert_eq!(indexed2.values.len(), 2);
        assert_eq!(indexed2.values[0].value, 21);
        assert_eq!(indexed2.values[1].value, 22);

        // Only two checkpoints were sent, so no further output should arrive.
        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
        assert!(
            timeout_result.is_err(),
            "Should timeout waiting for more checkpoints"
        );

        drop(data_tx);
        let _ = handle.await;
    }

    /// After cancellation, a checkpoint received by the processor is dropped rather than
    /// processed, and the output channel closes.
    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        data_tx.send(checkpoint1.clone()).await.unwrap();

        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        cancel.cancel();

        // The processor only observes cancellation when it picks up the next checkpoint, so
        // this send still succeeds -- the checkpoint is then discarded.
        data_tx.send(checkpoint2.clone()).await.unwrap();

        let next_result = indexed_rx.recv().await;
        assert!(
            next_result.is_none(),
            "Channel should be closed after cancellation"
        );

        let _ = handle.await;
    }

    /// Transient processing errors are retried until `process` succeeds.
    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        struct RetryTestPipeline {
            // Counts calls for checkpoint 2, which fails on the first two attempts.
            attempt_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;
            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                if checkpoint.summary.sequence_number == 1 {
                    Ok(vec![])
                } else {
                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                    Ok(vec![])
                }
            }
        }

        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let attempt_count = Arc::new(AtomicU32::new(0));
        let processor = Arc::new(RetryTestPipeline {
            attempt_count: attempt_count.clone(),
        });

        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);

        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        data_tx.send(checkpoint1.clone()).await.unwrap();
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);

        // Two failed attempts plus the final successful one.
        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);

        drop(data_tx);
        let _ = handle.await;
    }

    /// Checkpoints are processed concurrently up to `FANOUT`: five 500ms checkpoints at
    /// FANOUT = 3 must finish in about two batches (~1s), well under the serial 2.5s.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_processor_concurrency() {
        struct SlowProcessor;

        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            const FANOUT: usize = 3;
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // A blocking sleep deliberately ties up a worker thread, so the timing
                // assertion below only passes if checkpoints really run in parallel on the
                // multi-threaded runtime.
                std::thread::sleep(std::time::Duration::from_millis(500));
                Ok(vec![StoredData {
                    value: checkpoint.summary.sequence_number,
                }])
            }
        }

        let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
            .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
            .collect();

        let processor = Arc::new(SlowProcessor);
        let (data_tx, data_rx) = mpsc::channel(10);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());
        let cancel = CancellationToken::new();

        let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());

        let start = std::time::Instant::now();
        for checkpoint in checkpoints {
            data_tx.send(checkpoint).await.unwrap();
        }
        drop(data_tx);

        // Drain all output; the channel closes once the processor finishes.
        let mut received = Vec::new();
        while let Some(indexed) = indexed_rx.recv().await {
            received.push(indexed);
        }

        let elapsed = start.elapsed();
        assert!(elapsed < std::time::Duration::from_millis(1200));

        assert_eq!(received.len(), 5);

        let _ = handle.await;
    }
}