sui_indexer_alt_framework/pipeline/
processor.rs1use std::{sync::Arc, time::Duration};
5
6use backoff::ExponentialBackoff;
7use sui_types::full_checkpoint_content::CheckpointData;
8use tokio::{sync::mpsc, task::JoinHandle};
9use tokio_stream::wrappers::ReceiverStream;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info};
12
13use crate::{
14 metrics::{CheckpointLagMetricReporter, IndexerMetrics},
15 pipeline::Break,
16 task::TrySpawnStreamExt,
17};
18
19use super::IndexedCheckpoint;
20use async_trait::async_trait;
21
/// Delay before the first retry of a failed `Processor::process` call. The processor task uses
/// it as both the initial and the current interval of its exponential backoff.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Largest delay the processor task's exponential backoff may wait between two consecutive
/// retries of a failed `Processor::process` call.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
27
28#[async_trait]
32pub trait Processor: Send + Sync + 'static {
33 const NAME: &'static str;
35
36 const FANOUT: usize = 10;
38
39 type Value: Send + Sync + 'static;
41
42 async fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
55}
56
57pub(super) fn processor<P: Processor>(
66 processor: Arc<P>,
67 rx: mpsc::Receiver<Arc<CheckpointData>>,
68 tx: mpsc::Sender<IndexedCheckpoint<P>>,
69 metrics: Arc<IndexerMetrics>,
70 cancel: CancellationToken,
71) -> JoinHandle<()> {
72 tokio::spawn(async move {
73 info!(pipeline = P::NAME, "Starting processor");
74 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
75 &metrics.processed_checkpoint_timestamp_lag,
76 &metrics.latest_processed_checkpoint_timestamp_lag_ms,
77 &metrics.latest_processed_checkpoint,
78 );
79
80 match ReceiverStream::new(rx)
81 .try_for_each_spawned(P::FANOUT, |checkpoint| {
82 let tx = tx.clone();
83 let metrics = metrics.clone();
84 let cancel = cancel.clone();
85 let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
86 let processor = processor.clone();
87
88 async move {
89 if cancel.is_cancelled() {
90 return Err(Break::Cancel);
91 }
92
93 metrics
94 .total_handler_checkpoints_received
95 .with_label_values(&[P::NAME])
96 .inc();
97
98 let guard = metrics
99 .handler_checkpoint_latency
100 .with_label_values(&[P::NAME])
101 .start_timer();
102
103 let backoff = ExponentialBackoff {
105 initial_interval: INITIAL_RETRY_INTERVAL,
106 current_interval: INITIAL_RETRY_INTERVAL,
107 max_interval: MAX_RETRY_INTERVAL,
108 max_elapsed_time: None, ..Default::default()
110 };
111
112 let values = backoff::future::retry(backoff, || async {
113 processor
114 .process(&checkpoint)
115 .await
116 .map_err(backoff::Error::transient)
117 })
118 .await?;
119
120 let elapsed = guard.stop_and_record();
121
122 let epoch = checkpoint.checkpoint_summary.epoch;
123 let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
124 let tx_hi = checkpoint.checkpoint_summary.network_total_transactions;
125 let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
126
127 debug!(
128 pipeline = P::NAME,
129 checkpoint = cp_sequence_number,
130 elapsed_ms = elapsed * 1000.0,
131 "Processed checkpoint",
132 );
133
134 checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
135
136 metrics
137 .total_handler_checkpoints_processed
138 .with_label_values(&[P::NAME])
139 .inc();
140
141 metrics
142 .total_handler_rows_created
143 .with_label_values(&[P::NAME])
144 .inc_by(values.len() as u64);
145
146 tx.send(IndexedCheckpoint::new(
147 epoch,
148 cp_sequence_number,
149 tx_hi,
150 timestamp_ms,
151 values,
152 ))
153 .await
154 .map_err(|_| Break::Cancel)?;
155
156 Ok(())
157 }
158 })
159 .await
160 {
161 Ok(()) => {
162 info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
163 }
164
165 Err(Break::Cancel) => {
166 info!(pipeline = P::NAME, "Shutdown received, stopping processor");
167 }
168
169 Err(Break::Err(e)) => {
170 error!(pipeline = P::NAME, "Error from handler: {e}");
171 cancel.cancel();
172 }
173 };
174 })
175}
176
177#[cfg(test)]
178mod tests {
179 use crate::metrics::IndexerMetrics;
180 use anyhow::ensure;
181 use std::{
182 sync::{
183 Arc,
184 atomic::{AtomicU32, Ordering},
185 },
186 time::Duration,
187 };
188 use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;
189 use tokio::{sync::mpsc, time::timeout};
190 use tokio_util::sync::CancellationToken;
191
192 use super::*;
193
194 pub struct StoredData {
195 pub value: u64,
196 }
197
198 pub struct DataPipeline;
199
200 #[async_trait]
201 impl Processor for DataPipeline {
202 const NAME: &'static str = "data";
203
204 type Value = StoredData;
205
206 async fn process(
207 &self,
208 checkpoint: &Arc<CheckpointData>,
209 ) -> anyhow::Result<Vec<Self::Value>> {
210 Ok(vec![
211 StoredData {
212 value: checkpoint.checkpoint_summary.sequence_number * 10 + 1,
213 },
214 StoredData {
215 value: checkpoint.checkpoint_summary.sequence_number * 10 + 2,
216 },
217 ])
218 }
219 }
220
221 #[tokio::test]
222 async fn test_processor_process_checkpoints() {
223 let checkpoint1 = Arc::new(
225 TestCheckpointDataBuilder::new(1)
226 .with_epoch(2)
227 .with_network_total_transactions(5)
228 .with_timestamp_ms(1000000001)
229 .build_checkpoint(),
230 );
231 let checkpoint2 = Arc::new(
232 TestCheckpointDataBuilder::new(2)
233 .with_epoch(2)
234 .with_network_total_transactions(10)
235 .with_timestamp_ms(1000000002)
236 .build_checkpoint(),
237 );
238
239 let processor = Arc::new(DataPipeline);
241 let (data_tx, data_rx) = mpsc::channel(2);
242 let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
243 let metrics = IndexerMetrics::new(None, &Default::default());
244 let cancel = CancellationToken::new();
245
246 let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
248
249 data_tx.send(checkpoint1.clone()).await.unwrap();
251 data_tx.send(checkpoint2.clone()).await.unwrap();
252
253 let indexed1 = indexed_rx
255 .recv()
256 .await
257 .expect("Should receive first IndexedCheckpoint");
258 assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
259 assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
260 assert_eq!(indexed1.watermark.tx_hi, 5);
261 assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
262 assert_eq!(indexed1.values.len(), 2);
263 assert_eq!(indexed1.values[0].value, 11); assert_eq!(indexed1.values[1].value, 12); let indexed2 = indexed_rx
268 .recv()
269 .await
270 .expect("Should receive second IndexedCheckpoint");
271 assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
272 assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
273 assert_eq!(indexed2.watermark.tx_hi, 10);
274 assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
275 assert_eq!(indexed2.values.len(), 2);
276 assert_eq!(indexed2.values[0].value, 21); assert_eq!(indexed2.values[1].value, 22); let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
280 assert!(
281 timeout_result.is_err(),
282 "Should timeout waiting for more checkpoints"
283 );
284
285 drop(data_tx);
287 let _ = handle.await;
288 }
289
290 #[tokio::test]
291 async fn test_processor_does_not_process_checkpoint_after_cancellation() {
292 let checkpoint1 = Arc::new(TestCheckpointDataBuilder::new(1).build_checkpoint());
294 let checkpoint2 = Arc::new(TestCheckpointDataBuilder::new(2).build_checkpoint());
295
296 let processor = Arc::new(DataPipeline);
298 let (data_tx, data_rx) = mpsc::channel(2);
299 let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
300 let metrics = IndexerMetrics::new(None, &Default::default());
301 let cancel = CancellationToken::new();
302
303 let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
305
306 data_tx.send(checkpoint1.clone()).await.unwrap();
308
309 let indexed1 = indexed_rx
311 .recv()
312 .await
313 .expect("Should receive first IndexedCheckpoint");
314 assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
315
316 cancel.cancel();
318
319 data_tx.send(checkpoint2.clone()).await.unwrap();
321
322 let next_result = indexed_rx.recv().await;
324 assert!(
325 next_result.is_none(),
326 "Channel should be closed after cancellation"
327 );
328
329 let _ = handle.await;
331 }
332
333 #[tokio::test]
334 async fn test_processor_error_retry_behavior() {
335 struct RetryTestPipeline {
336 attempt_count: Arc<AtomicU32>,
337 }
338
339 #[async_trait]
340 impl Processor for RetryTestPipeline {
341 const NAME: &'static str = "retry_test";
342 type Value = StoredData;
343 async fn process(
344 &self,
345 checkpoint: &Arc<CheckpointData>,
346 ) -> anyhow::Result<Vec<Self::Value>> {
347 if checkpoint.checkpoint_summary.sequence_number == 1 {
348 Ok(vec![])
349 } else {
350 let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
351 ensure!(attempt > 2, "Transient error - attempt {attempt}");
352 Ok(vec![])
353 }
354 }
355 }
356
357 let checkpoint1 = Arc::new(TestCheckpointDataBuilder::new(1).build_checkpoint());
359 let checkpoint2 = Arc::new(TestCheckpointDataBuilder::new(2).build_checkpoint());
360
361 let attempt_count = Arc::new(AtomicU32::new(0));
362 let processor = Arc::new(RetryTestPipeline {
363 attempt_count: attempt_count.clone(),
364 });
365
366 let (data_tx, data_rx) = mpsc::channel(2);
367 let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
368
369 let metrics = IndexerMetrics::new(None, &Default::default());
370 let cancel = CancellationToken::new();
371
372 let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
374
375 data_tx.send(checkpoint1.clone()).await.unwrap();
377 let indexed1 = indexed_rx
378 .recv()
379 .await
380 .expect("Should receive first IndexedCheckpoint");
381 assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
382
383 data_tx.send(checkpoint2.clone()).await.unwrap();
385
386 let indexed2 = indexed_rx
387 .recv()
388 .await
389 .expect("Should receive second IndexedCheckpoint after retries");
390 assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
391
392 assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
394
395 drop(data_tx);
397 let _ = handle.await;
398 }
399
400 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
404 async fn test_processor_concurrency() {
405 struct SlowProcessor;
407 #[async_trait]
408 impl Processor for SlowProcessor {
409 const NAME: &'static str = "slow";
410 const FANOUT: usize = 3; type Value = StoredData;
412
413 async fn process(
414 &self,
415 checkpoint: &Arc<CheckpointData>,
416 ) -> anyhow::Result<Vec<Self::Value>> {
417 std::thread::sleep(std::time::Duration::from_millis(500));
419 Ok(vec![StoredData {
420 value: checkpoint.checkpoint_summary.sequence_number,
421 }])
422 }
423 }
424
425 let checkpoints: Vec<_> = (0..5)
427 .map(|i| Arc::new(TestCheckpointDataBuilder::new(i).build_checkpoint()))
428 .collect();
429
430 let processor = Arc::new(SlowProcessor);
432 let (data_tx, data_rx) = mpsc::channel(10);
433 let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
434 let metrics = IndexerMetrics::new(None, &Default::default());
435 let cancel = CancellationToken::new();
436
437 let handle = super::processor(processor, data_rx, indexed_tx, metrics, cancel.clone());
439
440 let start = std::time::Instant::now();
442 for checkpoint in checkpoints {
443 data_tx.send(checkpoint).await.unwrap();
444 }
445 drop(data_tx);
446
447 let mut received = Vec::new();
449 while let Some(indexed) = indexed_rx.recv().await {
450 received.push(indexed);
451 }
452
453 let elapsed = start.elapsed();
457 assert!(elapsed < std::time::Duration::from_millis(1200));
458
459 assert_eq!(received.len(), 5);
461
462 let _ = handle.await;
464 }
465}