sui_indexer_alt_framework/ingestion/
ingestion_client.rs1use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use backoff::Error as BE;
9use backoff::ExponentialBackoff;
10use backoff::backoff::Constant;
11use sui_rpc::Client;
12use sui_rpc::client::HeadersInterceptor;
13use sui_storage::blob::Blob;
14use tokio_util::bytes::Bytes;
15use tracing::{debug, error, warn};
16use url::Url;
17
18use crate::ingestion::Error as IngestionError;
19use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
20use crate::ingestion::Result as IngestionResult;
21use crate::ingestion::local_client::LocalIngestionClient;
22use crate::ingestion::remote_client::RemoteIngestionClient;
23use crate::metrics::CheckpointLagMetricReporter;
24use crate::metrics::IngestionMetrics;
25use crate::task::with_slow_future_monitor;
26use crate::types::full_checkpoint_content::{Checkpoint, CheckpointData};
27use async_trait::async_trait;
28
/// Cap on the delay between retries of a transiently failing fetch. It is installed as the
/// `max_interval` of the exponential backoff built in `IngestionClient::fetch`.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold handed to `with_slow_future_monitor` for every fetch attempt; it is also the value
/// reported (in milliseconds) by the "slow fetch" warning.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
38
39#[async_trait]
40pub(crate) trait IngestionClientTrait: Send + Sync {
41 async fn fetch(&self, checkpoint: u64) -> FetchResult;
42}
43
44#[derive(clap::Args, Clone, Debug, Default)]
45#[group(required = true)]
46pub struct IngestionClientArgs {
47 #[clap(long, group = "source")]
49 pub remote_store_url: Option<Url>,
50
51 #[clap(long, group = "source")]
54 pub local_ingestion_path: Option<PathBuf>,
55
56 #[clap(long, env, group = "source")]
59 pub rpc_api_url: Option<Url>,
60
61 #[clap(long, env)]
63 pub rpc_username: Option<String>,
64
65 #[clap(long, env)]
67 pub rpc_password: Option<String>,
68}
69
70#[derive(thiserror::Error, Debug)]
71pub enum FetchError {
72 #[error("Checkpoint not found")]
73 NotFound,
74 #[error("Failed to fetch checkpoint due to {reason}: {error}")]
75 Transient {
76 reason: &'static str,
77 #[source]
78 error: anyhow::Error,
79 },
80 #[error("Permenent error in {reason}: {error}")]
81 Permanent {
82 reason: &'static str,
83 #[source]
84 error: anyhow::Error,
85 },
86}
87
88pub type FetchResult = Result<FetchData, FetchError>;
89
90#[derive(Clone)]
91#[allow(clippy::large_enum_variant)]
92pub enum FetchData {
93 Raw(Bytes),
94 Checkpoint(Checkpoint),
95}
96
97#[derive(Clone)]
98pub struct IngestionClient {
99 client: Arc<dyn IngestionClientTrait>,
100 metrics: Arc<IngestionMetrics>,
102 checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
103}
104
105impl IngestionClient {
106 pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
108 let client = if let Some(url) = args.remote_store_url.as_ref() {
110 IngestionClient::new_remote(url.clone(), metrics.clone())?
111 } else if let Some(path) = args.local_ingestion_path.as_ref() {
112 IngestionClient::new_local(path.clone(), metrics.clone())
113 } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
114 IngestionClient::new_rpc(
115 rpc_api_url.clone(),
116 args.rpc_username,
117 args.rpc_password,
118 metrics.clone(),
119 )?
120 } else {
121 panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
122 };
123
124 Ok(client)
125 }
126
127 pub fn new_remote(url: Url, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
130 let client = Arc::new(RemoteIngestionClient::new(url)?);
131 Ok(Self::new_impl(client, metrics))
132 }
133
134 pub fn new_remote_with_timeout(
137 url: Url,
138 timeout: std::time::Duration,
139 metrics: Arc<IngestionMetrics>,
140 ) -> IngestionResult<Self> {
141 let client = Arc::new(RemoteIngestionClient::new_with_timeout(url, timeout)?);
142 Ok(Self::new_impl(client, metrics))
143 }
144
145 pub fn new_local(path: PathBuf, metrics: Arc<IngestionMetrics>) -> Self {
147 let client = Arc::new(LocalIngestionClient::new(path));
148 Self::new_impl(client, metrics)
149 }
150
151 pub fn new_rpc(
153 url: Url,
154 username: Option<String>,
155 password: Option<String>,
156 metrics: Arc<IngestionMetrics>,
157 ) -> IngestionResult<Self> {
158 let client = if let Some(username) = username {
159 let mut headers = HeadersInterceptor::new();
160 headers.basic_auth(username, password);
161 Client::new(url.to_string())?
162 .with_headers(headers)
163 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
164 } else {
165 Client::new(url.to_string())?
166 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
167 };
168 Ok(Self::new_impl(Arc::new(client), metrics))
169 }
170
171 pub(crate) fn new_impl(
172 client: Arc<dyn IngestionClientTrait>,
173 metrics: Arc<IngestionMetrics>,
174 ) -> Self {
175 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
176 metrics.ingested_checkpoint_timestamp_lag.clone(),
177 metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
178 metrics.latest_ingested_checkpoint.clone(),
179 );
180 IngestionClient {
181 client,
182 metrics,
183 checkpoint_lag_reporter,
184 }
185 }
186
187 pub async fn wait_for(
193 &self,
194 checkpoint: u64,
195 retry_interval: Duration,
196 ) -> IngestionResult<Arc<Checkpoint>> {
197 let backoff = Constant::new(retry_interval);
198 let fetch = || async move {
199 use backoff::Error as BE;
200 self.fetch(checkpoint).await.map_err(|e| match e {
201 IngestionError::NotFound(checkpoint) => {
202 debug!(checkpoint, "Checkpoint not found, retrying...");
203 self.metrics.total_ingested_not_found_retries.inc();
204 BE::transient(e)
205 }
206 e => BE::permanent(e),
207 })
208 };
209
210 backoff::future::retry(backoff, fetch).await
211 }
212
213 pub async fn fetch(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
222 let client = self.client.clone();
223 let request = move || {
224 let client = client.clone();
225 async move {
226 let fetch_data = with_slow_future_monitor(
227 client.fetch(checkpoint),
228 SLOW_OPERATION_WARNING_THRESHOLD,
229 || {
231 warn!(
232 checkpoint,
233 threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
234 "Slow checkpoint fetch operation detected"
235 );
236 },
237 )
238 .await
239 .map_err(|err| match err {
240 FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
241 FetchError::Transient { reason, error } => self.metrics.inc_retry(
242 checkpoint,
243 reason,
244 IngestionError::FetchError(checkpoint, error),
245 ),
246 FetchError::Permanent { reason, error } => {
247 error!(checkpoint, reason, "Permanent fetch error: {error}");
248 self.metrics
249 .total_ingested_permanent_errors
250 .with_label_values(&[reason])
251 .inc();
252 BE::permanent(IngestionError::FetchError(checkpoint, error))
253 }
254 })?;
255
256 Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
257 FetchData::Raw(bytes) => {
258 self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
259 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
260 self.metrics.inc_retry(
261 checkpoint,
262 "deserialization",
263 IngestionError::DeserializationError(checkpoint, e),
264 )
265 })?;
266 checkpoint.into()
267 }
268 FetchData::Checkpoint(data) => {
269 data
272 }
273 })
274 }
275 };
276
277 let backoff = ExponentialBackoff {
279 max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
280 max_elapsed_time: None,
281 ..Default::default()
282 };
283
284 let guard = self.metrics.ingested_checkpoint_latency.start_timer();
285 let data = backoff::future::retry(backoff, request).await?;
286 let elapsed = guard.stop_and_record();
287
288 debug!(
289 checkpoint,
290 elapsed_ms = elapsed * 1000.0,
291 "Fetched checkpoint"
292 );
293
294 self.checkpoint_lag_reporter
295 .report_lag(checkpoint, data.summary.timestamp_ms);
296
297 self.metrics.total_ingested_checkpoints.inc();
298
299 self.metrics
300 .total_ingested_transactions
301 .inc_by(data.transactions.len() as u64);
302
303 self.metrics.total_ingested_events.inc_by(
304 data.transactions
305 .iter()
306 .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
307 .sum(),
308 );
309
310 self.metrics
311 .total_ingested_objects
312 .inc_by(data.object_set.len() as u64);
313
314 Ok(Arc::new(data))
315 }
316}
317
318#[cfg(test)]
319mod tests {
320 use dashmap::DashMap;
321 use prometheus::Registry;
322 use std::sync::Arc;
323 use std::time::Duration;
324 use tokio::time::timeout;
325 use tokio_util::bytes::Bytes;
326
327 use crate::ingestion::test_utils::test_checkpoint_data;
328
329 use super::*;
330
331 #[derive(Default)]
333 struct MockIngestionClient {
334 checkpoints: DashMap<u64, FetchData>,
335 transient_failures: DashMap<u64, usize>,
336 not_found_failures: DashMap<u64, usize>,
337 permanent_failures: DashMap<u64, usize>,
338 }
339
340 #[async_trait]
341 impl IngestionClientTrait for MockIngestionClient {
342 async fn fetch(&self, checkpoint: u64) -> FetchResult {
343 if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
345 && *remaining > 0
346 {
347 *remaining -= 1;
348 return Err(FetchError::NotFound);
349 }
350
351 if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
353 && *remaining > 0
354 {
355 *remaining -= 1;
356 return Err(FetchError::Permanent {
357 reason: "mock_permanent_error",
358 error: anyhow::anyhow!("Mock permanent error"),
359 });
360 }
361
362 if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
364 && *remaining > 0
365 {
366 *remaining -= 1;
367 return Err(FetchError::Transient {
368 reason: "mock_transient_error",
369 error: anyhow::anyhow!("Mock transient error"),
370 });
371 }
372
373 self.checkpoints
375 .get(&checkpoint)
376 .as_deref()
377 .cloned()
378 .ok_or(FetchError::NotFound)
379 }
380 }
381
382 fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
383 let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
384 let metrics = IngestionMetrics::new(None, ®istry);
385 let mock_client = Arc::new(MockIngestionClient::default());
386 let client = IngestionClient::new_impl(mock_client.clone(), metrics);
387 (client, mock_client)
388 }
389
390 #[tokio::test]
391 async fn test_fetch_raw_bytes_success() {
392 let (client, mock) = setup_test();
393
394 let bytes = Bytes::from(test_checkpoint_data(1));
396 mock.checkpoints.insert(1, FetchData::Raw(bytes.clone()));
397
398 let result = client.fetch(1).await.unwrap();
400 assert_eq!(result.summary.sequence_number(), &1);
401 }
402
403 #[tokio::test]
404 async fn test_fetch_checkpoint_success() {
405 let (client, mock) = setup_test();
406
407 let bytes = test_checkpoint_data(1);
409 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
410 mock.checkpoints
411 .insert(1, FetchData::Checkpoint(checkpoint.into()));
412
413 let result = client.fetch(1).await.unwrap();
415 assert_eq!(result.summary.sequence_number(), &1);
416 }
417
418 #[tokio::test]
419 async fn test_fetch_not_found() {
420 let (client, _) = setup_test();
421
422 let result = client.fetch(1).await;
424 assert!(matches!(result, Err(IngestionError::NotFound(1))));
425 }
426
427 #[tokio::test]
428 async fn test_fetch_transient_error_with_retry() {
429 let (client, mock) = setup_test();
430
431 let bytes = test_checkpoint_data(1);
433 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
434
435 mock.checkpoints
437 .insert(1, FetchData::Checkpoint(checkpoint.clone().into()));
438 mock.transient_failures.insert(1, 2);
439
440 let result = client.fetch(1).await.unwrap();
442 assert_eq!(*result.summary.sequence_number(), 1);
443
444 let retries = client
446 .metrics
447 .total_ingested_transient_retries
448 .with_label_values(&["mock_transient_error"])
449 .get();
450 assert_eq!(retries, 2);
451 }
452
453 #[tokio::test]
454 async fn test_wait_for_checkpoint_with_retry() {
455 let (client, mock) = setup_test();
456
457 let bytes = test_checkpoint_data(1);
459 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
460
461 mock.checkpoints
463 .insert(1, FetchData::Checkpoint(checkpoint.into()));
464 mock.not_found_failures.insert(1, 1);
465
466 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
468 assert_eq!(result.summary.sequence_number(), &1);
469
470 let retries = client.metrics.total_ingested_not_found_retries.get();
472 assert_eq!(retries, 1);
473 }
474
475 #[tokio::test]
476 async fn test_wait_for_checkpoint_instant() {
477 let (client, mock) = setup_test();
478
479 let bytes = test_checkpoint_data(1);
481 let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
482
483 mock.checkpoints
485 .insert(1, FetchData::Checkpoint(checkpoint.into()));
486
487 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
489 assert_eq!(result.summary.sequence_number(), &1);
490 }
491
492 #[tokio::test]
493 async fn test_wait_for_permanent_deserialization_error() {
494 let (client, mock) = setup_test();
495
496 mock.checkpoints
498 .insert(1, FetchData::Raw(Bytes::from("invalid data")));
499
500 timeout(
502 Duration::from_secs(1),
503 client.wait_for(1, Duration::from_millis(50)),
504 )
505 .await
506 .unwrap_err();
507 }
508
509 #[tokio::test]
510 async fn test_fetch_non_retryable_error() {
511 let (client, mock) = setup_test();
512
513 mock.permanent_failures.insert(1, 1);
514
515 let result = client.fetch(1).await;
516 assert!(matches!(result, Err(IngestionError::FetchError(1, _))));
517
518 let errors = client
520 .metrics
521 .total_ingested_permanent_errors
522 .with_label_values(&["mock_permanent_error"])
523 .get();
524 assert_eq!(errors, 1);
525 }
526}