// sui_indexer_alt_framework/pipeline/processor.rs

use std::sync::Arc;
use std::time::Duration;

use backoff::ExponentialBackoff;
use sui_futures::service::Service;
use sui_futures::stream::Break;
use sui_futures::stream::TrySpawnStreamExt;
use sui_types::full_checkpoint_content::Checkpoint;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use tracing::error;
use tracing::info;

use async_trait::async_trait;

use crate::config::ConcurrencyConfig;
use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::IndexedCheckpoint;
/// Delay before the first retry of a failed [`Processor::process`] call; subsequent
/// retries back off exponentially from here.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound on the exponential back-off delay between retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
/// Implemented by pipelines to describe how a checkpoint is turned into the values
/// that flow to the rest of the pipeline (see [`processor`]).
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Identifies this pipeline in logs and as the label on its metrics.
    const NAME: &'static str;

    /// The type of value this processor extracts from a checkpoint.
    type Value: Send + Sync + 'static;

    /// Transform `checkpoint` into zero or more values. Errors returned from here are
    /// treated as transient by the driver and retried with exponential back-off.
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}
55
/// Spawn a service that drives a [`Processor`]: checkpoints are read from `rx`, handed to
/// `processor.process` with up to `concurrency` checkpoints in flight at once, and each
/// successful result is wrapped in an [`IndexedCheckpoint`] (carrying the checkpoint's
/// watermark fields) and sent down `tx`.
///
/// Processing errors are retried with exponential back-off, and because
/// `max_elapsed_time` is `None` the retries are unbounded — a persistently failing
/// checkpoint blocks its slot rather than being dropped. The service winds down cleanly
/// when `rx` is exhausted, or when the stream reports `Break::Break` (logged as the
/// channel closing).
pub(super) fn processor<P: Processor>(
    processor: Arc<P>,
    rx: mpsc::Receiver<Arc<Checkpoint>>,
    tx: mpsc::Sender<IndexedCheckpoint<P>>,
    metrics: Arc<IndexerMetrics>,
    concurrency: ConcurrencyConfig,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = P::NAME, "Starting processor");
        // Reports how far the most recently processed checkpoint trails its on-chain
        // timestamp, labelled for this pipeline.
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
            &metrics.processed_checkpoint_timestamp_lag,
            &metrics.latest_processed_checkpoint_timestamp_lag_ms,
            &metrics.latest_processed_checkpoint,
        );

        // Separate handle for the stats callback below; the per-checkpoint closure
        // clones `metrics` for itself on each invocation.
        let report_metrics = metrics.clone();
        match ReceiverStream::new(rx)
            .try_for_each_send_spawned(
                concurrency.into(),
                |checkpoint| {
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                    let processor = processor.clone();

                    async move {
                        metrics
                            .total_handler_checkpoints_received
                            .with_label_values(&[P::NAME])
                            .inc();

                        // Times the full attempt, including any retries below.
                        let guard = metrics
                            .handler_checkpoint_latency
                            .with_label_values(&[P::NAME])
                            .start_timer();

                        // `max_elapsed_time: None` -> retry forever, with delays growing
                        // from INITIAL_RETRY_INTERVAL up to MAX_RETRY_INTERVAL.
                        let backoff = ExponentialBackoff {
                            initial_interval: INITIAL_RETRY_INTERVAL,
                            current_interval: INITIAL_RETRY_INTERVAL,
                            max_interval: MAX_RETRY_INTERVAL,
                            max_elapsed_time: None,
                            ..Default::default()
                        };

                        // Every error from `process` is mapped to `transient`, so all
                        // failures are retried rather than propagated immediately.
                        let values = backoff::future::retry(backoff, || async {
                            processor
                                .process(&checkpoint)
                                .await
                                .map_err(backoff::Error::transient)
                        })
                        .await?;

                        let elapsed = guard.stop_and_record();

                        // Snapshot the watermark fields before `checkpoint` is dropped.
                        let epoch = checkpoint.summary.epoch;
                        let cp_sequence_number = checkpoint.summary.sequence_number;
                        let tx_hi = checkpoint.summary.network_total_transactions;
                        let timestamp_ms = checkpoint.summary.timestamp_ms;

                        debug!(
                            pipeline = P::NAME,
                            checkpoint = cp_sequence_number,
                            // `stop_and_record` yields seconds; log in milliseconds.
                            elapsed_ms = elapsed * 1000.0,
                            "Processed checkpoint",
                        );

                        checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);

                        metrics
                            .total_handler_checkpoints_processed
                            .with_label_values(&[P::NAME])
                            .inc();

                        metrics
                            .total_handler_rows_created
                            .with_label_values(&[P::NAME])
                            .inc_by(values.len() as u64);

                        Ok(IndexedCheckpoint::new(
                            epoch,
                            cp_sequence_number,
                            tx_hi,
                            timestamp_ms,
                            values,
                        ))
                    }
                },
                tx,
                // Periodic stats hook: expose the limiter's current concurrency limit
                // and the number of in-flight checkpoints as gauges.
                move |stats| {
                    report_metrics
                        .processor_concurrency_limit
                        .with_label_values(&[P::NAME])
                        .set(stats.limit as i64);
                    report_metrics
                        .processor_concurrency_inflight
                        .with_label_values(&[P::NAME])
                        .set(stats.inflight as i64);
                },
            )
            .await
        {
            // Input stream ran dry — normal completion.
            Ok(()) => {
                info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
            }

            // NOTE(review): `Break::Break` is treated as a clean stop (the log implies
            // the output channel closed) — confirm against `try_for_each_send_spawned`.
            Err(Break::Break) => {
                info!(pipeline = P::NAME, "Channel closed, stopping processor");
            }

            // Unrecoverable handler error: log and fail the service with context.
            Err(Break::Err(e)) => {
                error!(pipeline = P::NAME, "Error from handler: {e}");
                return Err(e.context(format!("Error from processor {}", P::NAME)));
            }
        };

        Ok(())
    })
}
180
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use anyhow::ensure;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::metrics::IndexerMetrics;

    use super::*;

    // Minimal row type produced by the test pipelines below.
    pub struct StoredData {
        pub value: u64,
    }

    // Pipeline that deterministically emits two rows per checkpoint:
    // sequence_number * 10 + 1 and sequence_number * 10 + 2.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 1,
                },
                StoredData {
                    value: checkpoint.summary.sequence_number * 10 + 2,
                },
            ])
        }
    }

    /// Happy path: two checkpoints in, two `IndexedCheckpoint`s out, with watermark
    /// fields copied from each checkpoint's summary and values from `DataPipeline`.
    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        // Distinct watermark fields so we can verify each one is propagated.
        let checkpoint1: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(1)
                .with_epoch(2)
                .with_network_total_transactions(5)
                .with_timestamp_ms(1000000001)
                .build_checkpoint(),
        );
        let checkpoint2: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(2)
                .with_epoch(2)
                .with_network_total_transactions(10)
                .with_timestamp_ms(1000000002)
                .build_checkpoint(),
        );

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        // Keep the handle alive for the duration of the test so the service runs.
        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
        );

        data_tx.send(checkpoint1.clone()).await.unwrap();
        data_tx.send(checkpoint2.clone()).await.unwrap();

        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
        assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed1.watermark.tx_hi, 5);
        assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
        assert_eq!(indexed1.values.len(), 2);
        // DataPipeline: sequence 1 -> values 11 and 12.
        assert_eq!(indexed1.values[0].value, 11);
        assert_eq!(indexed1.values[1].value, 12);

        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
        assert_eq!(indexed2.watermark.tx_hi, 10);
        assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
        assert_eq!(indexed2.values.len(), 2);
        // DataPipeline: sequence 2 -> values 21 and 22.
        assert_eq!(indexed2.values[0].value, 21);
        assert_eq!(indexed2.values[1].value, 22);

        // Nothing else should arrive: two checkpoints produce exactly two outputs.
        let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
        assert!(
            timeout_result.is_err(),
            "Should timeout waiting for more checkpoints"
        );
    }

    /// After `shutdown`, the service stops consuming input (sends fail) and its
    /// output channel is closed.
    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let processor = Arc::new(DataPipeline);
        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
        );

        data_tx.send(checkpoint1.clone()).await.unwrap();

        // Wait for the first checkpoint to be fully processed before shutting down,
        // so the shutdown point is deterministic.
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        svc.shutdown().await.unwrap();

        // The service dropped its receiver, so further sends fail.
        data_tx.send(checkpoint2.clone()).await.unwrap_err();

        // And the output side is closed too.
        let next_result = indexed_rx.recv().await;
        assert!(
            next_result.is_none(),
            "Channel should be closed after shutdown"
        );
    }

    /// Transient errors from `process` are retried until they succeed; the failing
    /// checkpoint is eventually emitted and the attempt count reflects the retries.
    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        struct RetryTestPipeline {
            // Shared with the test body so it can assert the total attempt count.
            attempt_count: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;
            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                if checkpoint.summary.sequence_number == 1 {
                    // Checkpoint 1 always succeeds immediately.
                    Ok(vec![])
                } else {
                    // Checkpoint 2 fails on the first two attempts, succeeds on the third.
                    let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                    Ok(vec![])
                }
            }
        }

        let checkpoint1: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let checkpoint2: Arc<Checkpoint> =
            Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());

        let attempt_count = Arc::new(AtomicU32::new(0));
        let processor = Arc::new(RetryTestPipeline {
            attempt_count: attempt_count.clone(),
        });

        let (data_tx, data_rx) = mpsc::channel(2);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(2);

        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 10 },
        );

        data_tx.send(checkpoint1.clone()).await.unwrap();
        let indexed1 = indexed_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);

        data_tx.send(checkpoint2.clone()).await.unwrap();

        // This recv only completes once the retries have succeeded.
        let indexed2 = indexed_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);

        // Two failed attempts plus the successful third.
        assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
    }

    /// With a concurrency limit of 3 and five 500ms checkpoints, processing should
    /// take ~2 waves (~1000ms) rather than 5 sequential sleeps (~2500ms).
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_processor_concurrency() {
        struct SlowProcessor;
        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // NOTE(review): this is a *blocking* sleep inside an async fn, which
                // pins a runtime worker thread per in-flight checkpoint — presumably
                // deliberate here to prove the tasks run on separate worker threads
                // (the test requests worker_threads = 8). Confirm; if thread
                // occupancy isn't the point, tokio::time::sleep is the idiomatic form.
                std::thread::sleep(std::time::Duration::from_millis(500));
                Ok(vec![StoredData {
                    value: checkpoint.summary.sequence_number,
                }])
            }
        }

        let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
            .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
            .collect();

        let processor = Arc::new(SlowProcessor);
        let (data_tx, data_rx) = mpsc::channel(10);
        let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(
            processor,
            data_rx,
            indexed_tx,
            metrics,
            ConcurrencyConfig::Fixed { value: 3 },
        );

        let start = std::time::Instant::now();
        for checkpoint in checkpoints {
            data_tx.send(checkpoint).await.unwrap();
        }
        // Closing the input lets the processor finish and close its output,
        // terminating the recv loop below.
        drop(data_tx);

        let mut received = Vec::new();
        while let Some(indexed) = indexed_rx.recv().await {
            received.push(indexed);
        }

        // Two waves of <=3 concurrent 500ms tasks ≈ 1000ms; 1200ms allows slack
        // while still ruling out sequential execution (2500ms).
        let elapsed = start.elapsed();
        assert!(elapsed < std::time::Duration::from_millis(1200));

        assert_eq!(received.len(), 5);
    }
}