// sui_indexer_alt_framework/ingestion/ingestion_client.rs
use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use backoff::Error as BE;
10use backoff::ExponentialBackoff;
11use backoff::backoff::Constant;
12use bytes::Bytes;
13use sui_futures::future::with_slow_future_monitor;
14use sui_rpc::Client;
15use sui_rpc::client::HeadersInterceptor;
16use sui_storage::blob::Blob;
17use tracing::{debug, error, warn};
18use url::Url;
19
20use crate::ingestion::Error as IngestionError;
21use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
22use crate::ingestion::Result as IngestionResult;
23use crate::ingestion::local_client::LocalIngestionClient;
24use crate::ingestion::remote_client::RemoteIngestionClient;
25use crate::metrics::CheckpointLagMetricReporter;
26use crate::metrics::IngestionMetrics;
27use crate::types::full_checkpoint_content::{Checkpoint, CheckpointData};
28
/// Cap on the delay between retries of a transiently failing fetch. It is used as the
/// `max_interval` of the exponential backoff built in `IngestionClient::fetch`.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold handed to `with_slow_future_monitor` for each fetch attempt in
/// `IngestionClient::fetch`; the accompanying callback logs a "slow checkpoint fetch" warning.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
38
/// A source of checkpoints. [`IngestionClient`] wraps an implementation of this trait (behind
/// an `Arc<dyn IngestionClientTrait>`) and layers retries and metrics on top of it.
#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    /// Make one attempt at fetching `checkpoint`.
    ///
    /// Returns the fetched data as [`FetchData`], or a [`FetchError`] that tells the caller
    /// whether the checkpoint is missing, or whether the failure is transient or permanent.
    async fn fetch(&self, checkpoint: u64) -> FetchResult;
}
43
// Command-line arguments selecting where checkpoints are ingested from.
//
// `IngestionClient::new` consults the three "source" fields in declaration order and uses the
// first one that is set. The struct-level `#[group(required = true)]` asks clap to reject a
// command line that sets none of the arguments.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive turns `///` doc
// comments into help text, so adding them would change the generated `--help` output.
#[derive(clap::Args, Clone, Debug, Default)]
#[group(required = true)]
pub struct IngestionClientArgs {
    // Source option: URL handed to `IngestionClient::new_remote`.
    #[clap(long, group = "source")]
    pub remote_store_url: Option<Url>,

    // Source option: filesystem path handed to `IngestionClient::new_local`.
    #[clap(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    // Source option: URL handed to `IngestionClient::new_rpc`. The `env` attribute lets clap
    // read it from the environment as well as from the command line.
    #[clap(long, env, group = "source")]
    pub rpc_api_url: Option<Url>,

    // Optional username for the RPC source. When set, `IngestionClient::new_rpc` attaches
    // basic-auth headers built from this and `rpc_password`.
    #[clap(long, env)]
    pub rpc_username: Option<String>,

    // Optional password for the RPC source. It is only used when `rpc_username` is also set.
    #[clap(long, env)]
    pub rpc_password: Option<String>,
}
69
70#[derive(thiserror::Error, Debug)]
71pub enum FetchError {
72 #[error("Checkpoint not found")]
73 NotFound,
74 #[error("Failed to fetch checkpoint due to {reason}: {error}")]
75 Transient {
76 reason: &'static str,
77 #[source]
78 error: anyhow::Error,
79 },
80 #[error("Permenent error in {reason}: {error}")]
81 Permanent {
82 reason: &'static str,
83 #[source]
84 error: anyhow::Error,
85 },
86}
87
/// Outcome of a single [`IngestionClientTrait::fetch`] attempt.
pub type FetchResult = Result<FetchData, FetchError>;
89
/// Payload returned by an [`IngestionClientTrait`] implementation.
#[derive(Clone)]
// NOTE(review): presumably `Checkpoint` is much larger than `Bytes` and boxing it was judged
// not worthwhile -- confirm before removing this allow.
#[allow(clippy::large_enum_variant)]
pub enum FetchData {
    /// Serialized checkpoint bytes. `IngestionClient::fetch` counts their length, decodes them
    /// with `Blob::from_bytes` into `CheckpointData`, and converts that into a `Checkpoint`.
    Raw(Bytes),
    /// An already-decoded checkpoint, used by `IngestionClient::fetch` as is.
    Checkpoint(Checkpoint),
}
96
/// Checkpoint fetcher that adds retries and metrics on top of an [`IngestionClientTrait`]
/// implementation. Every field is an `Arc`, so cloning shares the same underlying state.
#[derive(Clone)]
pub struct IngestionClient {
    /// The underlying source that performs individual fetch attempts.
    client: Arc<dyn IngestionClientTrait>,
    /// Counters and histograms updated by `fetch` and `wait_for`.
    metrics: Arc<IngestionMetrics>,
    /// Receives the sequence number and timestamp of each successfully fetched checkpoint.
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}
104
105impl IngestionClient {
106 pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
108 let client = if let Some(url) = args.remote_store_url.as_ref() {
110 IngestionClient::new_remote(url.clone(), metrics.clone())?
111 } else if let Some(path) = args.local_ingestion_path.as_ref() {
112 IngestionClient::new_local(path.clone(), metrics.clone())
113 } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
114 IngestionClient::new_rpc(
115 rpc_api_url.clone(),
116 args.rpc_username,
117 args.rpc_password,
118 metrics.clone(),
119 )?
120 } else {
121 panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
122 };
123
124 Ok(client)
125 }
126
127 pub fn new_remote(url: Url, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
130 let client = Arc::new(RemoteIngestionClient::new(url)?);
131 Ok(Self::new_impl(client, metrics))
132 }
133
134 pub fn new_remote_with_timeout(
137 url: Url,
138 timeout: std::time::Duration,
139 metrics: Arc<IngestionMetrics>,
140 ) -> IngestionResult<Self> {
141 let client = Arc::new(RemoteIngestionClient::new_with_timeout(url, timeout)?);
142 Ok(Self::new_impl(client, metrics))
143 }
144
145 pub fn new_local(path: PathBuf, metrics: Arc<IngestionMetrics>) -> Self {
147 let client = Arc::new(LocalIngestionClient::new(path));
148 Self::new_impl(client, metrics)
149 }
150
151 pub fn new_rpc(
153 url: Url,
154 username: Option<String>,
155 password: Option<String>,
156 metrics: Arc<IngestionMetrics>,
157 ) -> IngestionResult<Self> {
158 let client = if let Some(username) = username {
159 let mut headers = HeadersInterceptor::new();
160 headers.basic_auth(username, password);
161 Client::new(url.to_string())?
162 .with_headers(headers)
163 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
164 } else {
165 Client::new(url.to_string())?
166 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
167 };
168 Ok(Self::new_impl(Arc::new(client), metrics))
169 }
170
171 pub(crate) fn new_impl(
172 client: Arc<dyn IngestionClientTrait>,
173 metrics: Arc<IngestionMetrics>,
174 ) -> Self {
175 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
176 metrics.ingested_checkpoint_timestamp_lag.clone(),
177 metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
178 metrics.latest_ingested_checkpoint.clone(),
179 );
180 IngestionClient {
181 client,
182 metrics,
183 checkpoint_lag_reporter,
184 }
185 }
186
187 pub async fn wait_for(
193 &self,
194 checkpoint: u64,
195 retry_interval: Duration,
196 ) -> IngestionResult<Arc<Checkpoint>> {
197 let backoff = Constant::new(retry_interval);
198 let fetch = || async move {
199 use backoff::Error as BE;
200 self.fetch(checkpoint).await.map_err(|e| match e {
201 IngestionError::NotFound(checkpoint) => {
202 debug!(checkpoint, "Checkpoint not found, retrying...");
203 self.metrics.total_ingested_not_found_retries.inc();
204 BE::transient(e)
205 }
206 e => BE::permanent(e),
207 })
208 };
209
210 backoff::future::retry(backoff, fetch).await
211 }
212
213 pub async fn fetch(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
222 let client = self.client.clone();
223 let request = move || {
224 let client = client.clone();
225 async move {
226 let fetch_data = with_slow_future_monitor(
227 client.fetch(checkpoint),
228 SLOW_OPERATION_WARNING_THRESHOLD,
229 || {
231 warn!(
232 checkpoint,
233 threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
234 "Slow checkpoint fetch operation detected"
235 );
236 },
237 )
238 .await
239 .map_err(|err| match err {
240 FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
241 FetchError::Transient { reason, error } => self.metrics.inc_retry(
242 checkpoint,
243 reason,
244 IngestionError::FetchError(checkpoint, error),
245 ),
246 FetchError::Permanent { reason, error } => {
247 error!(checkpoint, reason, "Permanent fetch error: {error}");
248 self.metrics
249 .total_ingested_permanent_errors
250 .with_label_values(&[reason])
251 .inc();
252 BE::permanent(IngestionError::FetchError(checkpoint, error))
253 }
254 })?;
255
256 Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
257 FetchData::Raw(bytes) => {
258 self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
259 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
260 self.metrics.inc_retry(
261 checkpoint,
262 "deserialization",
263 IngestionError::DeserializationError(checkpoint, e),
264 )
265 })?;
266 checkpoint.into()
267 }
268 FetchData::Checkpoint(data) => {
269 data
272 }
273 })
274 }
275 };
276
277 let backoff = ExponentialBackoff {
279 max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
280 max_elapsed_time: None,
281 ..Default::default()
282 };
283
284 let guard = self.metrics.ingested_checkpoint_latency.start_timer();
285 let data = backoff::future::retry(backoff, request).await?;
286 let elapsed = guard.stop_and_record();
287
288 debug!(
289 checkpoint,
290 elapsed_ms = elapsed * 1000.0,
291 "Fetched checkpoint"
292 );
293
294 self.checkpoint_lag_reporter
295 .report_lag(checkpoint, data.summary.timestamp_ms);
296
297 self.metrics.total_ingested_checkpoints.inc();
298
299 self.metrics
300 .total_ingested_transactions
301 .inc_by(data.transactions.len() as u64);
302
303 self.metrics.total_ingested_events.inc_by(
304 data.transactions
305 .iter()
306 .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
307 .sum(),
308 );
309
310 self.metrics
311 .total_ingested_objects
312 .inc_by(data.object_set.len() as u64);
313
314 Ok(Arc::new(data))
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use dashmap::DashMap;
321 use prometheus::Registry;
322 use std::sync::Arc;
323 use std::time::Duration;
324 use tokio::time::timeout;
325
326 use crate::ingestion::test_utils::test_checkpoint_data;
327
328 use super::*;
329
330 #[derive(Default)]
332 struct MockIngestionClient {
333 checkpoints: DashMap<u64, FetchData>,
334 transient_failures: DashMap<u64, usize>,
335 not_found_failures: DashMap<u64, usize>,
336 permanent_failures: DashMap<u64, usize>,
337 }
338
339 #[async_trait]
340 impl IngestionClientTrait for MockIngestionClient {
341 async fn fetch(&self, checkpoint: u64) -> FetchResult {
342 if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
344 && *remaining > 0
345 {
346 *remaining -= 1;
347 return Err(FetchError::NotFound);
348 }
349
350 if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
352 && *remaining > 0
353 {
354 *remaining -= 1;
355 return Err(FetchError::Permanent {
356 reason: "mock_permanent_error",
357 error: anyhow::anyhow!("Mock permanent error"),
358 });
359 }
360
361 if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
363 && *remaining > 0
364 {
365 *remaining -= 1;
366 return Err(FetchError::Transient {
367 reason: "mock_transient_error",
368 error: anyhow::anyhow!("Mock transient error"),
369 });
370 }
371
372 self.checkpoints
374 .get(&checkpoint)
375 .as_deref()
376 .cloned()
377 .ok_or(FetchError::NotFound)
378 }
379 }
380
381 fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
382 let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
383 let metrics = IngestionMetrics::new(None, ®istry);
384 let mock_client = Arc::new(MockIngestionClient::default());
385 let client = IngestionClient::new_impl(mock_client.clone(), metrics);
386 (client, mock_client)
387 }
388
389 #[tokio::test]
390 async fn test_fetch_raw_bytes_success() {
391 let (client, mock) = setup_test();
392
393 let bytes = Bytes::from(test_checkpoint_data(1));
395 mock.checkpoints.insert(1, FetchData::Raw(bytes.clone()));
396
397 let result = client.fetch(1).await.unwrap();
399 assert_eq!(result.summary.sequence_number(), &1);
400 }
401
402 #[tokio::test]
403 async fn test_fetch_checkpoint_success() {
404 let (client, mock) = setup_test();
405
406 let bytes = test_checkpoint_data(1);
408 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
409 mock.checkpoints
410 .insert(1, FetchData::Checkpoint(checkpoint.into()));
411
412 let result = client.fetch(1).await.unwrap();
414 assert_eq!(result.summary.sequence_number(), &1);
415 }
416
417 #[tokio::test]
418 async fn test_fetch_not_found() {
419 let (client, _) = setup_test();
420
421 let result = client.fetch(1).await;
423 assert!(matches!(result, Err(IngestionError::NotFound(1))));
424 }
425
426 #[tokio::test]
427 async fn test_fetch_transient_error_with_retry() {
428 let (client, mock) = setup_test();
429
430 let bytes = test_checkpoint_data(1);
432 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
433
434 mock.checkpoints
436 .insert(1, FetchData::Checkpoint(checkpoint.clone().into()));
437 mock.transient_failures.insert(1, 2);
438
439 let result = client.fetch(1).await.unwrap();
441 assert_eq!(*result.summary.sequence_number(), 1);
442
443 let retries = client
445 .metrics
446 .total_ingested_transient_retries
447 .with_label_values(&["mock_transient_error"])
448 .get();
449 assert_eq!(retries, 2);
450 }
451
452 #[tokio::test]
453 async fn test_wait_for_checkpoint_with_retry() {
454 let (client, mock) = setup_test();
455
456 let bytes = test_checkpoint_data(1);
458 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
459
460 mock.checkpoints
462 .insert(1, FetchData::Checkpoint(checkpoint.into()));
463 mock.not_found_failures.insert(1, 1);
464
465 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
467 assert_eq!(result.summary.sequence_number(), &1);
468
469 let retries = client.metrics.total_ingested_not_found_retries.get();
471 assert_eq!(retries, 1);
472 }
473
474 #[tokio::test]
475 async fn test_wait_for_checkpoint_instant() {
476 let (client, mock) = setup_test();
477
478 let bytes = test_checkpoint_data(1);
480 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
481
482 mock.checkpoints
484 .insert(1, FetchData::Checkpoint(checkpoint.into()));
485
486 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
488 assert_eq!(result.summary.sequence_number(), &1);
489 }
490
491 #[tokio::test]
492 async fn test_wait_for_permanent_deserialization_error() {
493 let (client, mock) = setup_test();
494
495 mock.checkpoints
497 .insert(1, FetchData::Raw(Bytes::from("invalid data")));
498
499 timeout(
501 Duration::from_secs(1),
502 client.wait_for(1, Duration::from_millis(50)),
503 )
504 .await
505 .unwrap_err();
506 }
507
508 #[tokio::test]
509 async fn test_fetch_non_retryable_error() {
510 let (client, mock) = setup_test();
511
512 mock.permanent_failures.insert(1, 1);
513
514 let result = client.fetch(1).await;
515 assert!(matches!(result, Err(IngestionError::FetchError(1, _))));
516
517 let errors = client
519 .metrics
520 .total_ingested_permanent_errors
521 .with_label_values(&["mock_permanent_error"])
522 .get();
523 assert_eq!(errors, 1);
524 }
525}