// sui_indexer_alt_framework/pipeline/processor.rs

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use backoff::ExponentialBackoff;
use sui_futures::service::Service;
use sui_futures::stream::Break;
use sui_futures::stream::TrySpawnStreamExt;
use sui_types::full_checkpoint_content::Checkpoint;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::IndexedCheckpoint;
23
/// Delay before the first retry of a checkpoint whose processing failed. It seeds both the
/// initial and the current interval of the processor's exponential backoff.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Upper bound on the delay between two consecutive retries of the same checkpoint.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
29
30#[async_trait]
34pub trait Processor: Send + Sync + 'static {
35 const NAME: &'static str;
37
38 const FANOUT: usize = 10;
40
41 type Value: Send + Sync + 'static;
43
44 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
57}
58
59pub(super) fn processor<P: Processor>(
66 processor: Arc<P>,
67 rx: mpsc::Receiver<Arc<Checkpoint>>,
68 tx: mpsc::Sender<IndexedCheckpoint<P>>,
69 metrics: Arc<IndexerMetrics>,
70) -> Service {
71 Service::new().spawn_aborting(async move {
72 info!(pipeline = P::NAME, "Starting processor");
73 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
74 &metrics.processed_checkpoint_timestamp_lag,
75 &metrics.latest_processed_checkpoint_timestamp_lag_ms,
76 &metrics.latest_processed_checkpoint,
77 );
78
79 match ReceiverStream::new(rx)
80 .try_for_each_spawned(P::FANOUT, |checkpoint| {
81 let tx = tx.clone();
82 let metrics = metrics.clone();
83 let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
84 let processor = processor.clone();
85
86 async move {
87 metrics
88 .total_handler_checkpoints_received
89 .with_label_values(&[P::NAME])
90 .inc();
91
92 let guard = metrics
93 .handler_checkpoint_latency
94 .with_label_values(&[P::NAME])
95 .start_timer();
96
97 let backoff = ExponentialBackoff {
99 initial_interval: INITIAL_RETRY_INTERVAL,
100 current_interval: INITIAL_RETRY_INTERVAL,
101 max_interval: MAX_RETRY_INTERVAL,
102 max_elapsed_time: None, ..Default::default()
104 };
105
106 let values = backoff::future::retry(backoff, || async {
107 processor
108 .process(&checkpoint)
109 .await
110 .map_err(backoff::Error::transient)
111 })
112 .await?;
113
114 let elapsed = guard.stop_and_record();
115
116 let epoch = checkpoint.summary.epoch;
117 let cp_sequence_number = checkpoint.summary.sequence_number;
118 let tx_hi = checkpoint.summary.network_total_transactions;
119 let timestamp_ms = checkpoint.summary.timestamp_ms;
120
121 debug!(
122 pipeline = P::NAME,
123 checkpoint = cp_sequence_number,
124 elapsed_ms = elapsed * 1000.0,
125 "Processed checkpoint",
126 );
127
128 checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
129
130 metrics
131 .total_handler_checkpoints_processed
132 .with_label_values(&[P::NAME])
133 .inc();
134
135 metrics
136 .total_handler_rows_created
137 .with_label_values(&[P::NAME])
138 .inc_by(values.len() as u64);
139
140 tx.send(IndexedCheckpoint::new(
141 epoch,
142 cp_sequence_number,
143 tx_hi,
144 timestamp_ms,
145 values,
146 ))
147 .await
148 .map_err(|_| Break::Break)?;
149
150 Ok(())
151 }
152 })
153 .await
154 {
155 Ok(()) => {
156 info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
157 }
158
159 Err(Break::Break) => {
160 info!(pipeline = P::NAME, "Channel closed, stopping processor");
161 }
162
163 Err(Break::Err(e)) => {
164 error!(pipeline = P::NAME, "Error from handler: {e}");
165 return Err(e.context(format!("Error from processor {}", P::NAME)));
166 }
167 };
168
169 Ok(())
170 })
171}
172
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    use anyhow::ensure;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    use crate::metrics::IndexerMetrics;

    use super::*;

    /// Row type produced by the test pipelines.
    pub struct StoredData {
        pub value: u64,
    }

    /// Pipeline that emits two rows per checkpoint, derived from its sequence number.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            // Rows are `seq * 10 + 1` and `seq * 10 + 2`, in that order.
            let base = checkpoint.summary.sequence_number * 10;
            Ok((1..=2)
                .map(|offset| StoredData {
                    value: base + offset,
                })
                .collect())
        }
    }

    /// Both checkpoints come out with the right watermark and rows, and nothing follows them.
    #[tokio::test]
    async fn test_processor_process_checkpoints() {
        let first: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(1)
                .with_epoch(2)
                .with_network_total_transactions(5)
                .with_timestamp_ms(1000000001)
                .build_checkpoint(),
        );
        let second: Arc<Checkpoint> = Arc::new(
            TestCheckpointBuilder::new(2)
                .with_epoch(2)
                .with_network_total_transactions(10)
                .with_timestamp_ms(1000000002)
                .build_checkpoint(),
        );

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(2);
        let (rows_tx, mut rows_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(Arc::new(DataPipeline), checkpoint_rx, rows_tx, metrics);

        for checkpoint in [first, second] {
            checkpoint_tx.send(checkpoint).await.unwrap();
        }

        // (message if missing, sequence number, tx_hi, timestamp, row values)
        let expectations = [
            (
                "Should receive first IndexedCheckpoint",
                1,
                5,
                1000000001,
                [11, 12],
            ),
            (
                "Should receive second IndexedCheckpoint",
                2,
                10,
                1000000002,
                [21, 22],
            ),
        ];

        for (missing_msg, sequence_number, tx_hi, timestamp_ms, row_values) in expectations {
            let indexed = rows_rx.recv().await.expect(missing_msg);
            assert_eq!(indexed.watermark.checkpoint_hi_inclusive, sequence_number);
            assert_eq!(indexed.watermark.epoch_hi_inclusive, 2);
            assert_eq!(indexed.watermark.tx_hi, tx_hi);
            assert_eq!(indexed.watermark.timestamp_ms_hi_inclusive, timestamp_ms);
            assert_eq!(indexed.values.len(), 2);
            assert_eq!(indexed.values[0].value, row_values[0]);
            assert_eq!(indexed.values[1].value, row_values[1]);
        }

        // The sender is still open, so the only way `recv` finishes is by timing out.
        let extra = timeout(Duration::from_secs(1), rows_rx.recv()).await;
        assert!(extra.is_err(), "Should timeout waiting for more checkpoints");
    }

    /// After the service is shut down, both channels are closed and nothing more is processed.
    #[tokio::test]
    async fn test_processor_does_not_process_checkpoint_after_cancellation() {
        let [before_shutdown, after_shutdown]: [Arc<Checkpoint>; 2] =
            [1, 2].map(|seq| Arc::new(TestCheckpointBuilder::new(seq).build_checkpoint()));

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(2);
        let (rows_tx, mut rows_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let svc = super::processor(Arc::new(DataPipeline), checkpoint_rx, rows_tx, metrics);

        checkpoint_tx.send(before_shutdown).await.unwrap();
        let indexed = rows_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 1);

        svc.shutdown().await.unwrap();

        // The task owned the receiving end, so sending must now fail.
        checkpoint_tx.send(after_shutdown).await.unwrap_err();

        assert!(
            rows_rx.recv().await.is_none(),
            "Channel should be closed after shutdown"
        );
    }

    /// A checkpoint whose processing fails twice is retried until it succeeds.
    #[tokio::test]
    async fn test_processor_error_retry_behavior() {
        struct RetryTestPipeline {
            attempts: Arc<AtomicU32>,
        }

        #[async_trait]
        impl Processor for RetryTestPipeline {
            const NAME: &'static str = "retry_test";
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // Checkpoint 1 always succeeds; any other checkpoint fails its first two
                // attempts.
                if checkpoint.summary.sequence_number != 1 {
                    let attempt = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
                    ensure!(attempt > 2, "Transient error - attempt {attempt}");
                }
                Ok(vec![])
            }
        }

        let [reliable, flaky]: [Arc<Checkpoint>; 2] =
            [1, 2].map(|seq| Arc::new(TestCheckpointBuilder::new(seq).build_checkpoint()));

        let attempts = Arc::new(AtomicU32::new(0));
        let pipeline = Arc::new(RetryTestPipeline {
            attempts: attempts.clone(),
        });

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(2);
        let (rows_tx, mut rows_rx) = mpsc::channel(2);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(pipeline, checkpoint_rx, rows_tx, metrics);

        checkpoint_tx.send(reliable).await.unwrap();
        let indexed = rows_rx
            .recv()
            .await
            .expect("Should receive first IndexedCheckpoint");
        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 1);

        checkpoint_tx.send(flaky).await.unwrap();
        let indexed = rows_rx
            .recv()
            .await
            .expect("Should receive second IndexedCheckpoint after retries");
        assert_eq!(indexed.watermark.checkpoint_hi_inclusive, 2);

        // Two failed attempts followed by the successful one.
        assert_eq!(attempts.load(Ordering::Relaxed), 3);
    }

    /// Checkpoints are processed `FANOUT` at a time rather than one after another.
    // Multi-threaded runtime: `process` blocks its thread with `std::thread::sleep`, so other
    // worker threads are needed for the remaining checkpoints to make progress meanwhile.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_processor_concurrency() {
        struct SlowProcessor;

        #[async_trait]
        impl Processor for SlowProcessor {
            const NAME: &'static str = "slow";
            const FANOUT: usize = 3;
            type Value = StoredData;

            async fn process(
                &self,
                checkpoint: &Arc<Checkpoint>,
            ) -> anyhow::Result<Vec<Self::Value>> {
                // Blocking sleep stands in for the work done on a checkpoint.
                std::thread::sleep(Duration::from_millis(500));
                let value = checkpoint.summary.sequence_number;
                Ok(vec![StoredData { value }])
            }
        }

        // Build the inputs up front so that only processing time is measured.
        let mut checkpoints: Vec<Arc<Checkpoint>> = Vec::new();
        for seq in 0..5 {
            checkpoints.push(Arc::new(TestCheckpointBuilder::new(seq).build_checkpoint()));
        }

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        let (rows_tx, mut rows_rx) = mpsc::channel(10);
        let metrics = IndexerMetrics::new(None, &Default::default());

        let _svc = super::processor(Arc::new(SlowProcessor), checkpoint_rx, rows_tx, metrics);

        let started_at = std::time::Instant::now();
        for checkpoint in checkpoints {
            checkpoint_tx.send(checkpoint).await.unwrap();
        }
        // Closing the input lets the processor finish, which in turn closes `rows_rx`.
        drop(checkpoint_tx);

        let mut received = Vec::new();
        loop {
            match rows_rx.recv().await {
                Some(indexed) => received.push(indexed),
                None => break,
            }
        }

        // Five 500ms checkpoints, three at a time, need two rounds (~1000ms); one at a time they
        // would need ~2500ms.
        let elapsed = started_at.elapsed();
        assert!(elapsed < Duration::from_millis(1200));

        assert_eq!(received.len(), 5);
    }
}