sui_indexer_alt_framework/ingestion/
ingestion_client.rs1use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use backoff::Error as BE;
10use backoff::ExponentialBackoff;
11use backoff::backoff::Constant;
12use bytes::Bytes;
13use sui_futures::future::with_slow_future_monitor;
14use sui_rpc::Client;
15use sui_rpc::client::HeadersInterceptor;
16use sui_storage::blob::Blob;
17use tracing::debug;
18use tracing::error;
19use tracing::warn;
20use url::Url;
21
22use crate::ingestion::Error as IngestionError;
23use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
24use crate::ingestion::Result as IngestionResult;
25use crate::ingestion::local_client::LocalIngestionClient;
26use crate::ingestion::remote_client::RemoteIngestionClient;
27use crate::metrics::CheckpointLagMetricReporter;
28use crate::metrics::IngestionMetrics;
29use crate::types::full_checkpoint_content::Checkpoint;
30use crate::types::full_checkpoint_content::CheckpointData;
31
/// Cap on the delay between consecutive retries of a transient fetch failure (one minute).
/// Used as the `max_interval` of the exponential backoff in `IngestionClient::fetch`.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_millis(60_000);

/// How long a single fetch attempt may stay pending before a slow-operation warning is logged
/// (one minute).
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_millis(60_000);
41
/// A checkpoint source (remote store, local path, RPC, or a test mock) that can fetch a single
/// checkpoint by sequence number.
///
/// Backends classify failures with `FetchError`; `IngestionClient` layers retries, logging and
/// metrics on top, so an implementation presumably only needs to make a single attempt per call
/// (NOTE(review): confirm no backend retries internally as well).
#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    /// Fetches checkpoint number `checkpoint`, returning either its raw bytes or a decoded
    /// checkpoint, or a `FetchError` describing how the attempt failed.
    async fn fetch(&self, checkpoint: u64) -> FetchResult;
}
46
47#[derive(clap::Args, Clone, Debug, Default)]
48#[group(required = true)]
49pub struct IngestionClientArgs {
50 #[clap(long, group = "source")]
52 pub remote_store_url: Option<Url>,
53
54 #[clap(long, group = "source")]
57 pub local_ingestion_path: Option<PathBuf>,
58
59 #[clap(long, env, group = "source")]
62 pub rpc_api_url: Option<Url>,
63
64 #[clap(long, env)]
66 pub rpc_username: Option<String>,
67
68 #[clap(long, env)]
70 pub rpc_password: Option<String>,
71}
72
/// How a single backend fetch attempt failed.
///
/// `IngestionClient::fetch` retries `Transient` errors with exponential backoff and returns
/// `NotFound` and `Permanent` errors to its caller without retrying them.
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
    /// The backend does not (yet) have the requested checkpoint; `IngestionClient::wait_for`
    /// keeps polling on this error.
    #[error("Checkpoint not found")]
    NotFound,
    /// A failure that may succeed if retried.
    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
    Transient {
        /// Short static description, used as the label on the transient-retry metric.
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
    /// A failure that retrying will not fix; it is logged and counted, then returned.
    #[error("Permanent error in {reason}: {error}")]
    Permanent {
        /// Short static description, used as the label on `total_ingested_permanent_errors`.
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
}
90
91pub type FetchResult = Result<FetchData, FetchError>;
92
/// Payload returned by a successful backend fetch.
#[derive(Clone)]
// NOTE(review): `Checkpoint` is presumably much larger than `Bytes`; boxing it would cost an
// extra allocation per fetch, which is likely why this lint is silenced — confirm.
#[allow(clippy::large_enum_variant)]
pub enum FetchData {
    /// Serialized checkpoint bytes; `IngestionClient::fetch` decodes them into `CheckpointData`
    /// and counts them in `total_ingested_bytes`.
    Raw(Bytes),
    /// An already-decoded checkpoint, used as-is.
    Checkpoint(Checkpoint),
}
99
/// Checkpoint fetcher that wraps an `IngestionClientTrait` backend with retries, logging and
/// metrics.
///
/// Every field is an `Arc`, so cloning shares the backend and metrics instead of copying them.
#[derive(Clone)]
pub struct IngestionClient {
    /// Backend that performs each individual fetch attempt.
    client: Arc<dyn IngestionClientTrait>,
    /// Counters and histograms updated by `fetch` and `wait_for`.
    metrics: Arc<IngestionMetrics>,
    /// Records checkpoint timestamp lag for each successfully fetched checkpoint.
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}
107
108impl IngestionClient {
109 pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
111 let client = if let Some(url) = args.remote_store_url.as_ref() {
113 IngestionClient::new_remote(url.clone(), metrics.clone())?
114 } else if let Some(path) = args.local_ingestion_path.as_ref() {
115 IngestionClient::new_local(path.clone(), metrics.clone())
116 } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
117 IngestionClient::new_rpc(
118 rpc_api_url.clone(),
119 args.rpc_username,
120 args.rpc_password,
121 metrics.clone(),
122 )?
123 } else {
124 panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
125 };
126
127 Ok(client)
128 }
129
130 pub fn new_remote(url: Url, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
133 let client = Arc::new(RemoteIngestionClient::new(url)?);
134 Ok(Self::new_impl(client, metrics))
135 }
136
137 pub fn new_remote_with_timeout(
140 url: Url,
141 timeout: std::time::Duration,
142 metrics: Arc<IngestionMetrics>,
143 ) -> IngestionResult<Self> {
144 let client = Arc::new(RemoteIngestionClient::new_with_timeout(url, timeout)?);
145 Ok(Self::new_impl(client, metrics))
146 }
147
148 pub fn new_local(path: PathBuf, metrics: Arc<IngestionMetrics>) -> Self {
150 let client = Arc::new(LocalIngestionClient::new(path));
151 Self::new_impl(client, metrics)
152 }
153
154 pub fn new_rpc(
156 url: Url,
157 username: Option<String>,
158 password: Option<String>,
159 metrics: Arc<IngestionMetrics>,
160 ) -> IngestionResult<Self> {
161 let client = if let Some(username) = username {
162 let mut headers = HeadersInterceptor::new();
163 headers.basic_auth(username, password);
164 Client::new(url.to_string())?
165 .with_headers(headers)
166 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
167 } else {
168 Client::new(url.to_string())?
169 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
170 };
171 Ok(Self::new_impl(Arc::new(client), metrics))
172 }
173
174 pub(crate) fn new_impl(
175 client: Arc<dyn IngestionClientTrait>,
176 metrics: Arc<IngestionMetrics>,
177 ) -> Self {
178 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
179 metrics.ingested_checkpoint_timestamp_lag.clone(),
180 metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
181 metrics.latest_ingested_checkpoint.clone(),
182 );
183 IngestionClient {
184 client,
185 metrics,
186 checkpoint_lag_reporter,
187 }
188 }
189
190 pub async fn wait_for(
196 &self,
197 checkpoint: u64,
198 retry_interval: Duration,
199 ) -> IngestionResult<Arc<Checkpoint>> {
200 let backoff = Constant::new(retry_interval);
201 let fetch = || async move {
202 use backoff::Error as BE;
203 self.fetch(checkpoint).await.map_err(|e| match e {
204 IngestionError::NotFound(checkpoint) => {
205 debug!(checkpoint, "Checkpoint not found, retrying...");
206 self.metrics.total_ingested_not_found_retries.inc();
207 BE::transient(e)
208 }
209 e => BE::permanent(e),
210 })
211 };
212
213 backoff::future::retry(backoff, fetch).await
214 }
215
216 pub async fn fetch(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
225 let client = self.client.clone();
226 let request = move || {
227 let client = client.clone();
228 async move {
229 let fetch_data = with_slow_future_monitor(
230 client.fetch(checkpoint),
231 SLOW_OPERATION_WARNING_THRESHOLD,
232 || {
234 warn!(
235 checkpoint,
236 threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
237 "Slow checkpoint fetch operation detected"
238 );
239 },
240 )
241 .await
242 .map_err(|err| match err {
243 FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
244 FetchError::Transient { reason, error } => self.metrics.inc_retry(
245 checkpoint,
246 reason,
247 IngestionError::FetchError(checkpoint, error),
248 ),
249 FetchError::Permanent { reason, error } => {
250 error!(checkpoint, reason, "Permanent fetch error: {error}");
251 self.metrics
252 .total_ingested_permanent_errors
253 .with_label_values(&[reason])
254 .inc();
255 BE::permanent(IngestionError::FetchError(checkpoint, error))
256 }
257 })?;
258
259 Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
260 FetchData::Raw(bytes) => {
261 self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
262 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
263 self.metrics.inc_retry(
264 checkpoint,
265 "deserialization",
266 IngestionError::DeserializationError(checkpoint, e),
267 )
268 })?;
269 checkpoint.into()
270 }
271 FetchData::Checkpoint(data) => {
272 data
275 }
276 })
277 }
278 };
279
280 let backoff = ExponentialBackoff {
282 max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
283 max_elapsed_time: None,
284 ..Default::default()
285 };
286
287 let guard = self.metrics.ingested_checkpoint_latency.start_timer();
288 let data = backoff::future::retry(backoff, request).await?;
289 let elapsed = guard.stop_and_record();
290
291 debug!(
292 checkpoint,
293 elapsed_ms = elapsed * 1000.0,
294 "Fetched checkpoint"
295 );
296
297 self.checkpoint_lag_reporter
298 .report_lag(checkpoint, data.summary.timestamp_ms);
299
300 self.metrics.total_ingested_checkpoints.inc();
301
302 self.metrics
303 .total_ingested_transactions
304 .inc_by(data.transactions.len() as u64);
305
306 self.metrics.total_ingested_events.inc_by(
307 data.transactions
308 .iter()
309 .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
310 .sum(),
311 );
312
313 self.metrics
314 .total_ingested_objects
315 .inc_by(data.object_set.len() as u64);
316
317 Ok(Arc::new(data))
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use dashmap::DashMap;
324 use prometheus::Registry;
325 use std::sync::Arc;
326 use std::time::Duration;
327 use tokio::time::timeout;
328
329 use crate::ingestion::test_utils::test_checkpoint_data;
330
331 use super::*;
332
333 #[derive(Default)]
335 struct MockIngestionClient {
336 checkpoints: DashMap<u64, FetchData>,
337 transient_failures: DashMap<u64, usize>,
338 not_found_failures: DashMap<u64, usize>,
339 permanent_failures: DashMap<u64, usize>,
340 }
341
342 #[async_trait]
343 impl IngestionClientTrait for MockIngestionClient {
344 async fn fetch(&self, checkpoint: u64) -> FetchResult {
345 if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
347 && *remaining > 0
348 {
349 *remaining -= 1;
350 return Err(FetchError::NotFound);
351 }
352
353 if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
355 && *remaining > 0
356 {
357 *remaining -= 1;
358 return Err(FetchError::Permanent {
359 reason: "mock_permanent_error",
360 error: anyhow::anyhow!("Mock permanent error"),
361 });
362 }
363
364 if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
366 && *remaining > 0
367 {
368 *remaining -= 1;
369 return Err(FetchError::Transient {
370 reason: "mock_transient_error",
371 error: anyhow::anyhow!("Mock transient error"),
372 });
373 }
374
375 self.checkpoints
377 .get(&checkpoint)
378 .as_deref()
379 .cloned()
380 .ok_or(FetchError::NotFound)
381 }
382 }
383
384 fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
385 let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
386 let metrics = IngestionMetrics::new(None, ®istry);
387 let mock_client = Arc::new(MockIngestionClient::default());
388 let client = IngestionClient::new_impl(mock_client.clone(), metrics);
389 (client, mock_client)
390 }
391
392 #[tokio::test]
393 async fn test_fetch_raw_bytes_success() {
394 let (client, mock) = setup_test();
395
396 let bytes = Bytes::from(test_checkpoint_data(1));
398 mock.checkpoints.insert(1, FetchData::Raw(bytes.clone()));
399
400 let result = client.fetch(1).await.unwrap();
402 assert_eq!(result.summary.sequence_number(), &1);
403 }
404
405 #[tokio::test]
406 async fn test_fetch_checkpoint_success() {
407 let (client, mock) = setup_test();
408
409 let bytes = test_checkpoint_data(1);
411 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
412 mock.checkpoints
413 .insert(1, FetchData::Checkpoint(checkpoint.into()));
414
415 let result = client.fetch(1).await.unwrap();
417 assert_eq!(result.summary.sequence_number(), &1);
418 }
419
420 #[tokio::test]
421 async fn test_fetch_not_found() {
422 let (client, _) = setup_test();
423
424 let result = client.fetch(1).await;
426 assert!(matches!(result, Err(IngestionError::NotFound(1))));
427 }
428
429 #[tokio::test]
430 async fn test_fetch_transient_error_with_retry() {
431 let (client, mock) = setup_test();
432
433 let bytes = test_checkpoint_data(1);
435 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
436
437 mock.checkpoints
439 .insert(1, FetchData::Checkpoint(checkpoint.clone().into()));
440 mock.transient_failures.insert(1, 2);
441
442 let result = client.fetch(1).await.unwrap();
444 assert_eq!(*result.summary.sequence_number(), 1);
445
446 let retries = client
448 .metrics
449 .total_ingested_transient_retries
450 .with_label_values(&["mock_transient_error"])
451 .get();
452 assert_eq!(retries, 2);
453 }
454
455 #[tokio::test]
456 async fn test_wait_for_checkpoint_with_retry() {
457 let (client, mock) = setup_test();
458
459 let bytes = test_checkpoint_data(1);
461 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
462
463 mock.checkpoints
465 .insert(1, FetchData::Checkpoint(checkpoint.into()));
466 mock.not_found_failures.insert(1, 1);
467
468 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
470 assert_eq!(result.summary.sequence_number(), &1);
471
472 let retries = client.metrics.total_ingested_not_found_retries.get();
474 assert_eq!(retries, 1);
475 }
476
477 #[tokio::test]
478 async fn test_wait_for_checkpoint_instant() {
479 let (client, mock) = setup_test();
480
481 let bytes = test_checkpoint_data(1);
483 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
484
485 mock.checkpoints
487 .insert(1, FetchData::Checkpoint(checkpoint.into()));
488
489 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
491 assert_eq!(result.summary.sequence_number(), &1);
492 }
493
494 #[tokio::test]
495 async fn test_wait_for_permanent_deserialization_error() {
496 let (client, mock) = setup_test();
497
498 mock.checkpoints
500 .insert(1, FetchData::Raw(Bytes::from("invalid data")));
501
502 timeout(
504 Duration::from_secs(1),
505 client.wait_for(1, Duration::from_millis(50)),
506 )
507 .await
508 .unwrap_err();
509 }
510
511 #[tokio::test]
512 async fn test_fetch_non_retryable_error() {
513 let (client, mock) = setup_test();
514
515 mock.permanent_failures.insert(1, 1);
516
517 let result = client.fetch(1).await;
518 assert!(matches!(result, Err(IngestionError::FetchError(1, _))));
519
520 let errors = client
522 .metrics
523 .total_ingested_permanent_errors
524 .with_label_values(&["mock_permanent_error"])
525 .get();
526 assert_eq!(errors, 1);
527 }
528}