sui_indexer_alt_framework/pipeline/
processor.rs1use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use sui_futures::{
9 service::Service,
10 stream::{Break, TrySpawnStreamExt},
11};
12use sui_types::full_checkpoint_content::Checkpoint;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tracing::{debug, error, info};
16
17use crate::metrics::{CheckpointLagMetricReporter, IndexerMetrics};
18
19use super::IndexedCheckpoint;
20use async_trait::async_trait;
21
/// Delay before the first retry of a failed [`Processor::process`] call. It seeds both the
/// `initial_interval` and the `current_interval` of the exponential backoff used by the
/// processor task.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Value given to the backoff's `max_interval` when retrying a failed [`Processor::process`]
/// call.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(5);
27
/// Implemented by pipelines to turn a checkpoint into the values that pipeline indexes.
///
/// The processor task started by `processor` in this module calls [`Processor::process`] for
/// every checkpoint it receives and forwards the returned values downstream, wrapped in an
/// `IndexedCheckpoint` together with the checkpoint's epoch, sequence number, network total
/// transactions, and timestamp.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
    /// Name of the pipeline. The processor task uses it as the label value for its metrics and
    /// as the `pipeline` field of its log events.
    const NAME: &'static str;

    /// Passed as the first argument to `try_for_each_spawned` when the processor task consumes
    /// its checkpoint stream. Defaults to 10.
    // NOTE(review): presumably the maximum number of checkpoints processed concurrently --
    // confirm against `sui_futures::stream::TrySpawnStreamExt`.
    const FANOUT: usize = 10;

    /// The type of value this processor produces from a checkpoint.
    type Value: Send + Sync + 'static;

    /// Produces the values to index for `checkpoint`.
    ///
    /// # Errors
    ///
    /// The processor task treats every error returned from this function as transient: the
    /// call is retried with exponential backoff and no limit on total elapsed time, so an
    /// implementation that keeps failing for a checkpoint is retried indefinitely.
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}
56
57pub(super) fn processor<P: Processor>(
64 processor: Arc<P>,
65 rx: mpsc::Receiver<Arc<Checkpoint>>,
66 tx: mpsc::Sender<IndexedCheckpoint<P>>,
67 metrics: Arc<IndexerMetrics>,
68) -> Service {
69 Service::new().spawn_aborting(async move {
70 info!(pipeline = P::NAME, "Starting processor");
71 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<P>(
72 &metrics.processed_checkpoint_timestamp_lag,
73 &metrics.latest_processed_checkpoint_timestamp_lag_ms,
74 &metrics.latest_processed_checkpoint,
75 );
76
77 match ReceiverStream::new(rx)
78 .try_for_each_spawned(P::FANOUT, |checkpoint| {
79 let tx = tx.clone();
80 let metrics = metrics.clone();
81 let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
82 let processor = processor.clone();
83
84 async move {
85 metrics
86 .total_handler_checkpoints_received
87 .with_label_values(&[P::NAME])
88 .inc();
89
90 let guard = metrics
91 .handler_checkpoint_latency
92 .with_label_values(&[P::NAME])
93 .start_timer();
94
95 let backoff = ExponentialBackoff {
97 initial_interval: INITIAL_RETRY_INTERVAL,
98 current_interval: INITIAL_RETRY_INTERVAL,
99 max_interval: MAX_RETRY_INTERVAL,
100 max_elapsed_time: None, ..Default::default()
102 };
103
104 let values = backoff::future::retry(backoff, || async {
105 processor
106 .process(&checkpoint)
107 .await
108 .map_err(backoff::Error::transient)
109 })
110 .await?;
111
112 let elapsed = guard.stop_and_record();
113
114 let epoch = checkpoint.summary.epoch;
115 let cp_sequence_number = checkpoint.summary.sequence_number;
116 let tx_hi = checkpoint.summary.network_total_transactions;
117 let timestamp_ms = checkpoint.summary.timestamp_ms;
118
119 debug!(
120 pipeline = P::NAME,
121 checkpoint = cp_sequence_number,
122 elapsed_ms = elapsed * 1000.0,
123 "Processed checkpoint",
124 );
125
126 checkpoint_lag_reporter.report_lag(cp_sequence_number, timestamp_ms);
127
128 metrics
129 .total_handler_checkpoints_processed
130 .with_label_values(&[P::NAME])
131 .inc();
132
133 metrics
134 .total_handler_rows_created
135 .with_label_values(&[P::NAME])
136 .inc_by(values.len() as u64);
137
138 tx.send(IndexedCheckpoint::new(
139 epoch,
140 cp_sequence_number,
141 tx_hi,
142 timestamp_ms,
143 values,
144 ))
145 .await
146 .map_err(|_| Break::Break)?;
147
148 Ok(())
149 }
150 })
151 .await
152 {
153 Ok(()) => {
154 info!(pipeline = P::NAME, "Checkpoints done, stopping processor");
155 }
156
157 Err(Break::Break) => {
158 info!(pipeline = P::NAME, "Channel closed, stopping processor");
159 }
160
161 Err(Break::Err(e)) => {
162 error!(pipeline = P::NAME, "Error from handler: {e}");
163 return Err(e.context(format!("Error from processor {}", P::NAME)));
164 }
165 };
166
167 Ok(())
168 })
169}
170
171#[cfg(test)]
172mod tests {
173 use crate::metrics::IndexerMetrics;
174 use anyhow::ensure;
175 use std::{
176 sync::{
177 Arc,
178 atomic::{AtomicU32, Ordering},
179 },
180 time::Duration,
181 };
182 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
183 use tokio::{sync::mpsc, time::timeout};
184
185 use super::*;
186
    /// Value type produced by the test processors in this module.
    pub struct StoredData {
        /// Number derived from the checkpoint's sequence number by whichever test processor
        /// created this value.
        pub value: u64,
    }
190
191 pub struct DataPipeline;
192
193 #[async_trait]
194 impl Processor for DataPipeline {
195 const NAME: &'static str = "data";
196
197 type Value = StoredData;
198
199 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
200 Ok(vec![
201 StoredData {
202 value: checkpoint.summary.sequence_number * 10 + 1,
203 },
204 StoredData {
205 value: checkpoint.summary.sequence_number * 10 + 2,
206 },
207 ])
208 }
209 }
210
211 #[tokio::test]
212 async fn test_processor_process_checkpoints() {
213 let checkpoint1: Arc<Checkpoint> = Arc::new(
215 TestCheckpointBuilder::new(1)
216 .with_epoch(2)
217 .with_network_total_transactions(5)
218 .with_timestamp_ms(1000000001)
219 .build_checkpoint(),
220 );
221 let checkpoint2: Arc<Checkpoint> = Arc::new(
222 TestCheckpointBuilder::new(2)
223 .with_epoch(2)
224 .with_network_total_transactions(10)
225 .with_timestamp_ms(1000000002)
226 .build_checkpoint(),
227 );
228
229 let processor = Arc::new(DataPipeline);
231 let (data_tx, data_rx) = mpsc::channel(2);
232 let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
233 let metrics = IndexerMetrics::new(None, &Default::default());
234
235 let _svc = super::processor(processor, data_rx, indexed_tx, metrics);
237
238 data_tx.send(checkpoint1.clone()).await.unwrap();
240 data_tx.send(checkpoint2.clone()).await.unwrap();
241
242 let indexed1 = indexed_rx
244 .recv()
245 .await
246 .expect("Should receive first IndexedCheckpoint");
247 assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
248 assert_eq!(indexed1.watermark.epoch_hi_inclusive, 2);
249 assert_eq!(indexed1.watermark.tx_hi, 5);
250 assert_eq!(indexed1.watermark.timestamp_ms_hi_inclusive, 1000000001);
251 assert_eq!(indexed1.values.len(), 2);
252 assert_eq!(indexed1.values[0].value, 11); assert_eq!(indexed1.values[1].value, 12); let indexed2 = indexed_rx
257 .recv()
258 .await
259 .expect("Should receive second IndexedCheckpoint");
260 assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
261 assert_eq!(indexed2.watermark.epoch_hi_inclusive, 2);
262 assert_eq!(indexed2.watermark.tx_hi, 10);
263 assert_eq!(indexed2.watermark.timestamp_ms_hi_inclusive, 1000000002);
264 assert_eq!(indexed2.values.len(), 2);
265 assert_eq!(indexed2.values[0].value, 21); assert_eq!(indexed2.values[1].value, 22); let timeout_result = timeout(Duration::from_secs(1), indexed_rx.recv()).await;
269 assert!(
270 timeout_result.is_err(),
271 "Should timeout waiting for more checkpoints"
272 );
273 }
274
275 #[tokio::test]
276 async fn test_processor_does_not_process_checkpoint_after_cancellation() {
277 let checkpoint1: Arc<Checkpoint> =
279 Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
280 let checkpoint2: Arc<Checkpoint> =
281 Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
282
283 let processor = Arc::new(DataPipeline);
285 let (data_tx, data_rx) = mpsc::channel(2);
286 let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
287 let metrics = IndexerMetrics::new(None, &Default::default());
288
289 let svc = super::processor(processor, data_rx, indexed_tx, metrics);
291
292 data_tx.send(checkpoint1.clone()).await.unwrap();
294
295 let indexed1 = indexed_rx
297 .recv()
298 .await
299 .expect("Should receive first IndexedCheckpoint");
300 assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
301
302 svc.shutdown().await.unwrap();
304
305 data_tx.send(checkpoint2.clone()).await.unwrap_err();
308
309 let next_result = indexed_rx.recv().await;
311 assert!(
312 next_result.is_none(),
313 "Channel should be closed after shutdown"
314 );
315 }
316
317 #[tokio::test]
318 async fn test_processor_error_retry_behavior() {
319 struct RetryTestPipeline {
320 attempt_count: Arc<AtomicU32>,
321 }
322
323 #[async_trait]
324 impl Processor for RetryTestPipeline {
325 const NAME: &'static str = "retry_test";
326 type Value = StoredData;
327 async fn process(
328 &self,
329 checkpoint: &Arc<Checkpoint>,
330 ) -> anyhow::Result<Vec<Self::Value>> {
331 if checkpoint.summary.sequence_number == 1 {
332 Ok(vec![])
333 } else {
334 let attempt = self.attempt_count.fetch_add(1, Ordering::Relaxed) + 1;
335 ensure!(attempt > 2, "Transient error - attempt {attempt}");
336 Ok(vec![])
337 }
338 }
339 }
340
341 let checkpoint1: Arc<Checkpoint> =
343 Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
344 let checkpoint2: Arc<Checkpoint> =
345 Arc::new(TestCheckpointBuilder::new(2).build_checkpoint());
346
347 let attempt_count = Arc::new(AtomicU32::new(0));
348 let processor = Arc::new(RetryTestPipeline {
349 attempt_count: attempt_count.clone(),
350 });
351
352 let (data_tx, data_rx) = mpsc::channel(2);
353 let (indexed_tx, mut indexed_rx) = mpsc::channel(2);
354
355 let metrics = IndexerMetrics::new(None, &Default::default());
356
357 let _svc = super::processor(processor, data_rx, indexed_tx, metrics);
359
360 data_tx.send(checkpoint1.clone()).await.unwrap();
362 let indexed1 = indexed_rx
363 .recv()
364 .await
365 .expect("Should receive first IndexedCheckpoint");
366 assert_eq!(indexed1.watermark.checkpoint_hi_inclusive, 1);
367
368 data_tx.send(checkpoint2.clone()).await.unwrap();
370
371 let indexed2 = indexed_rx
372 .recv()
373 .await
374 .expect("Should receive second IndexedCheckpoint after retries");
375 assert_eq!(indexed2.watermark.checkpoint_hi_inclusive, 2);
376
377 assert_eq!(attempt_count.load(Ordering::Relaxed), 3);
379 }
380
381 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
385 async fn test_processor_concurrency() {
386 struct SlowProcessor;
388 #[async_trait]
389 impl Processor for SlowProcessor {
390 const NAME: &'static str = "slow";
391 const FANOUT: usize = 3; type Value = StoredData;
393
394 async fn process(
395 &self,
396 checkpoint: &Arc<Checkpoint>,
397 ) -> anyhow::Result<Vec<Self::Value>> {
398 std::thread::sleep(std::time::Duration::from_millis(500));
400 Ok(vec![StoredData {
401 value: checkpoint.summary.sequence_number,
402 }])
403 }
404 }
405
406 let checkpoints: Vec<Arc<Checkpoint>> = (0..5)
408 .map(|i| Arc::new(TestCheckpointBuilder::new(i).build_checkpoint()))
409 .collect();
410
411 let processor = Arc::new(SlowProcessor);
413 let (data_tx, data_rx) = mpsc::channel(10);
414 let (indexed_tx, mut indexed_rx) = mpsc::channel(10);
415 let metrics = IndexerMetrics::new(None, &Default::default());
416
417 let _svc = super::processor(processor, data_rx, indexed_tx, metrics);
419
420 let start = std::time::Instant::now();
422 for checkpoint in checkpoints {
423 data_tx.send(checkpoint).await.unwrap();
424 }
425 drop(data_tx);
426
427 let mut received = Vec::new();
429 while let Some(indexed) = indexed_rx.recv().await {
430 received.push(indexed);
431 }
432
433 let elapsed = start.elapsed();
437 assert!(elapsed < std::time::Duration::from_millis(1200));
438
439 assert_eq!(received.len(), 5);
441 }
442}