1use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use backoff::Error as BE;
10use backoff::ExponentialBackoff;
11use backoff::backoff::Constant;
12use bytes::Bytes;
13use clap::ArgGroup;
14use object_store::ClientOptions;
15use object_store::ObjectStore;
16use object_store::aws::AmazonS3Builder;
17use object_store::azure::MicrosoftAzureBuilder;
18use object_store::gcp::GoogleCloudStorageBuilder;
19use object_store::http::HttpBuilder;
20use object_store::local::LocalFileSystem;
21use sui_futures::future::with_slow_future_monitor;
22use sui_rpc::Client;
23use sui_rpc::client::HeadersInterceptor;
24use tracing::debug;
25use tracing::error;
26use tracing::warn;
27use url::Url;
28
29use crate::ingestion::Error as IngestionError;
30use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
31use crate::ingestion::Result as IngestionResult;
32use crate::ingestion::decode;
33use crate::ingestion::store_client::StoreIngestionClient;
34use crate::metrics::CheckpointLagMetricReporter;
35use crate::metrics::IngestionMetrics;
36use crate::types::full_checkpoint_content::Checkpoint;
37
38const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);
40
41const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
47
48#[async_trait]
49pub(crate) trait IngestionClientTrait: Send + Sync {
50 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult;
51}
52
53#[derive(clap::Args, Clone, Debug)]
54#[command(group(ArgGroup::new("source").required(true).multiple(false)))]
55pub struct IngestionClientArgs {
56 #[arg(long, group = "source")]
58 pub remote_store_url: Option<Url>,
59
60 #[arg(long, group = "source")]
63 pub remote_store_s3: Option<String>,
64
65 #[arg(long, group = "source")]
68 pub remote_store_gcs: Option<String>,
69
70 #[arg(long, group = "source")]
73 pub remote_store_azure: Option<String>,
74
75 #[arg(long, group = "source")]
77 pub local_ingestion_path: Option<PathBuf>,
78
79 #[arg(long, group = "source")]
81 pub rpc_api_url: Option<Url>,
82
83 #[arg(long, env, requires = "rpc_api_url")]
85 pub rpc_username: Option<String>,
86
87 #[arg(long, env, requires = "rpc_api_url")]
89 pub rpc_password: Option<String>,
90
91 #[arg(long, default_value_t = Self::default().checkpoint_timeout_ms)]
94 pub checkpoint_timeout_ms: u64,
95
96 #[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
99 pub checkpoint_connection_timeout_ms: u64,
100}
101
102impl Default for IngestionClientArgs {
103 fn default() -> Self {
104 Self {
105 remote_store_url: None,
106 remote_store_s3: None,
107 remote_store_gcs: None,
108 remote_store_azure: None,
109 local_ingestion_path: None,
110 rpc_api_url: None,
111 rpc_username: None,
112 rpc_password: None,
113 checkpoint_timeout_ms: 120_000,
114 checkpoint_connection_timeout_ms: 120_000,
115 }
116 }
117}
118
119impl IngestionClientArgs {
120 fn client_options(&self) -> ClientOptions {
121 let mut options = ClientOptions::default();
122 options = if self.checkpoint_timeout_ms == 0 {
123 options.with_timeout_disabled()
124 } else {
125 let timeout = Duration::from_millis(self.checkpoint_timeout_ms);
126 options.with_timeout(timeout)
127 };
128 options = if self.checkpoint_connection_timeout_ms == 0 {
129 options.with_connect_timeout_disabled()
130 } else {
131 let timeout = Duration::from_millis(self.checkpoint_connection_timeout_ms);
132 options.with_connect_timeout(timeout)
133 };
134 options
135 }
136}
137
138#[derive(thiserror::Error, Debug)]
139pub enum CheckpointError {
140 #[error("Checkpoint not found")]
141 NotFound,
142 #[error("Failed to fetch checkpoint due to {reason}: {error}")]
143 Transient {
144 reason: &'static str,
145 #[source]
146 error: anyhow::Error,
147 },
148 #[error("Permanent error in {reason}: {error}")]
149 Permanent {
150 reason: &'static str,
151 #[source]
152 error: anyhow::Error,
153 },
154}
155
156pub type CheckpointResult = Result<CheckpointData, CheckpointError>;
157
158#[derive(Clone)]
159#[allow(clippy::large_enum_variant)]
160pub enum CheckpointData {
161 Raw(Bytes),
162 Checkpoint(Checkpoint),
163}
164
165#[derive(Clone)]
166pub struct IngestionClient {
167 client: Arc<dyn IngestionClientTrait>,
168 metrics: Arc<IngestionMetrics>,
170 checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
171}
172
173impl IngestionClient {
174 pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
176 let retry = super::store_client::retry_config();
178 let client = if let Some(url) = args.remote_store_url.as_ref() {
179 let store = HttpBuilder::new()
180 .with_url(url.to_string())
181 .with_client_options(args.client_options().with_allow_http(true))
182 .with_retry(retry)
183 .build()
184 .map(Arc::new)?;
185 IngestionClient::with_store(store, metrics.clone())?
186 } else if let Some(bucket) = args.remote_store_s3.as_ref() {
187 let store = AmazonS3Builder::from_env()
188 .with_client_options(args.client_options())
189 .with_retry(retry)
190 .with_imdsv1_fallback()
191 .with_bucket_name(bucket)
192 .build()
193 .map(Arc::new)?;
194 IngestionClient::with_store(store, metrics.clone())?
195 } else if let Some(bucket) = args.remote_store_gcs.as_ref() {
196 let store = GoogleCloudStorageBuilder::from_env()
197 .with_client_options(args.client_options())
198 .with_retry(retry)
199 .with_bucket_name(bucket)
200 .build()
201 .map(Arc::new)?;
202 IngestionClient::with_store(store, metrics.clone())?
203 } else if let Some(container) = args.remote_store_azure.as_ref() {
204 let store = MicrosoftAzureBuilder::from_env()
205 .with_client_options(args.client_options())
206 .with_retry(retry)
207 .with_container_name(container)
208 .build()
209 .map(Arc::new)?;
210 IngestionClient::with_store(store, metrics.clone())?
211 } else if let Some(path) = args.local_ingestion_path.as_ref() {
212 let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
213 IngestionClient::with_store(store, metrics.clone())?
214 } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
215 IngestionClient::with_grpc(
216 rpc_api_url.clone(),
217 args.rpc_username,
218 args.rpc_password,
219 metrics.clone(),
220 )?
221 } else {
222 panic!(
223 "One of remote_store_url, remote_store_s3, remote_store_gcs, remote_store_azure, \
224 local_ingestion_path or rpc_api_url must be provided"
225 );
226 };
227
228 Ok(client)
229 }
230
231 pub fn with_store(
233 store: Arc<dyn ObjectStore>,
234 metrics: Arc<IngestionMetrics>,
235 ) -> IngestionResult<Self> {
236 let client = Arc::new(StoreIngestionClient::new(store));
237 Ok(Self::new_impl(client, metrics))
238 }
239
240 pub fn with_grpc(
242 url: Url,
243 username: Option<String>,
244 password: Option<String>,
245 metrics: Arc<IngestionMetrics>,
246 ) -> IngestionResult<Self> {
247 let client = if let Some(username) = username {
248 let mut headers = HeadersInterceptor::new();
249 headers.basic_auth(username, password);
250 Client::new(url.to_string())?
251 .with_headers(headers)
252 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
253 } else {
254 Client::new(url.to_string())?
255 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
256 };
257 Ok(Self::new_impl(Arc::new(client), metrics))
258 }
259
260 pub(crate) fn new_impl(
261 client: Arc<dyn IngestionClientTrait>,
262 metrics: Arc<IngestionMetrics>,
263 ) -> Self {
264 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
265 metrics.ingested_checkpoint_timestamp_lag.clone(),
266 metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
267 metrics.latest_ingested_checkpoint.clone(),
268 );
269 IngestionClient {
270 client,
271 metrics,
272 checkpoint_lag_reporter,
273 }
274 }
275
276 pub async fn wait_for(
282 &self,
283 checkpoint: u64,
284 retry_interval: Duration,
285 ) -> IngestionResult<Arc<Checkpoint>> {
286 let backoff = Constant::new(retry_interval);
287 let fetch = || async move {
288 use backoff::Error as BE;
289 self.checkpoint(checkpoint).await.map_err(|e| match e {
290 IngestionError::NotFound(checkpoint) => {
291 debug!(checkpoint, "Checkpoint not found, retrying...");
292 self.metrics.total_ingested_not_found_retries.inc();
293 BE::transient(e)
294 }
295 e => BE::permanent(e),
296 })
297 };
298
299 backoff::future::retry(backoff, fetch).await
300 }
301
302 pub async fn checkpoint(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
311 let client = self.client.clone();
312 let request = move || {
313 let client = client.clone();
314 async move {
315 let fetch_data = with_slow_future_monitor(
316 client.checkpoint(checkpoint),
317 SLOW_OPERATION_WARNING_THRESHOLD,
318 || {
320 warn!(
321 checkpoint,
322 threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
323 "Slow checkpoint fetch operation detected"
324 );
325 },
326 )
327 .await
328 .map_err(|err| match err {
329 CheckpointError::NotFound => {
330 BE::permanent(IngestionError::NotFound(checkpoint))
331 }
332 CheckpointError::Transient { reason, error } => self.metrics.inc_retry(
333 checkpoint,
334 reason,
335 IngestionError::FetchError(checkpoint, error),
336 ),
337 CheckpointError::Permanent { reason, error } => {
338 error!(checkpoint, reason, "Permanent fetch error: {error}");
339 self.metrics
340 .total_ingested_permanent_errors
341 .with_label_values(&[reason])
342 .inc();
343 BE::permanent(IngestionError::FetchError(checkpoint, error))
344 }
345 })?;
346
347 Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
348 CheckpointData::Raw(bytes) => {
349 self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
350
351 decode::checkpoint(&bytes).map_err(|e| {
352 self.metrics.inc_retry(
353 checkpoint,
354 e.reason(),
355 IngestionError::DeserializationError(checkpoint, e.into()),
356 )
357 })?
358 }
359 CheckpointData::Checkpoint(data) => {
360 data
363 }
364 })
365 }
366 };
367
368 let backoff = ExponentialBackoff {
370 max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
371 max_elapsed_time: None,
372 ..Default::default()
373 };
374
375 let guard = self.metrics.ingested_checkpoint_latency.start_timer();
376 let data = backoff::future::retry(backoff, request).await?;
377 let elapsed = guard.stop_and_record();
378
379 debug!(
380 checkpoint,
381 elapsed_ms = elapsed * 1000.0,
382 "Fetched checkpoint"
383 );
384
385 self.checkpoint_lag_reporter
386 .report_lag(checkpoint, data.summary.timestamp_ms);
387
388 self.metrics.total_ingested_checkpoints.inc();
389
390 self.metrics
391 .total_ingested_transactions
392 .inc_by(data.transactions.len() as u64);
393
394 self.metrics.total_ingested_events.inc_by(
395 data.transactions
396 .iter()
397 .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
398 .sum(),
399 );
400
401 self.metrics
402 .total_ingested_objects
403 .inc_by(data.object_set.len() as u64);
404
405 Ok(Arc::new(data))
406 }
407}
408
409#[cfg(test)]
410mod tests {
411 use clap::Parser;
412 use clap::error::ErrorKind;
413 use dashmap::DashMap;
414 use prometheus::Registry;
415 use std::sync::Arc;
416 use std::time::Duration;
417 use tokio::time::timeout;
418
419 use crate::ingestion::test_utils::test_checkpoint_data;
420
421 use super::*;
422
423 #[derive(Debug, Parser)]
424 struct TestArgs {
425 #[clap(flatten)]
426 ingestion: IngestionClientArgs,
427 }
428
429 #[derive(Default)]
431 struct MockIngestionClient {
432 checkpoints: DashMap<u64, CheckpointData>,
433 transient_failures: DashMap<u64, usize>,
434 not_found_failures: DashMap<u64, usize>,
435 permanent_failures: DashMap<u64, usize>,
436 }
437
438 #[async_trait]
439 impl IngestionClientTrait for MockIngestionClient {
440 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
441 if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
443 && *remaining > 0
444 {
445 *remaining -= 1;
446 return Err(CheckpointError::NotFound);
447 }
448
449 if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
451 && *remaining > 0
452 {
453 *remaining -= 1;
454 return Err(CheckpointError::Permanent {
455 reason: "mock_permanent_error",
456 error: anyhow::anyhow!("Mock permanent error"),
457 });
458 }
459
460 if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
462 && *remaining > 0
463 {
464 *remaining -= 1;
465 return Err(CheckpointError::Transient {
466 reason: "mock_transient_error",
467 error: anyhow::anyhow!("Mock transient error"),
468 });
469 }
470
471 self.checkpoints
473 .get(&checkpoint)
474 .as_deref()
475 .cloned()
476 .ok_or(CheckpointError::NotFound)
477 }
478 }
479
480 fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
481 let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
482 let metrics = IngestionMetrics::new(None, ®istry);
483 let mock_client = Arc::new(MockIngestionClient::default());
484 let client = IngestionClient::new_impl(mock_client.clone(), metrics);
485 (client, mock_client)
486 }
487
488 #[test]
489 fn test_args_multiple_ingestion_sources_are_rejected() {
490 let err = TestArgs::try_parse_from([
491 "cmd",
492 "--remote-store-url",
493 "https://example.com",
494 "--rpc-api-url",
495 "http://localhost:8080",
496 ])
497 .unwrap_err();
498
499 assert_eq!(err.kind(), ErrorKind::ArgumentConflict);
500 }
501
502 #[test]
503 fn test_args_optional_credentials() {
504 let args = TestArgs::try_parse_from([
505 "cmd",
506 "--rpc-api-url",
507 "http://localhost:8080",
508 "--rpc-username",
509 "alice",
510 "--rpc-password",
511 "secret",
512 ])
513 .unwrap();
514
515 assert_eq!(args.ingestion.rpc_username.as_deref(), Some("alice"));
516 assert_eq!(args.ingestion.rpc_password.as_deref(), Some("secret"));
517 assert_eq!(
518 args.ingestion.rpc_api_url,
519 Some(Url::parse("http://localhost:8080").unwrap())
520 );
521 }
522
523 #[test]
524 fn test_args_credentials_require_rpc_url() {
525 let err = TestArgs::try_parse_from([
526 "cmd",
527 "--rpc-username",
528 "alice",
529 "--rpc-password",
530 "secret",
531 ])
532 .unwrap_err();
533
534 assert_eq!(err.kind(), ErrorKind::MissingRequiredArgument);
535 }
536
537 #[tokio::test]
538 async fn test_fetch_raw_bytes_success() {
539 let (client, mock) = setup_test();
540
541 let bytes = Bytes::from(test_checkpoint_data(1));
543 mock.checkpoints
544 .insert(1, CheckpointData::Raw(bytes.clone()));
545
546 let result = client.checkpoint(1).await.unwrap();
548 assert_eq!(result.summary.sequence_number(), &1);
549 }
550
551 #[tokio::test]
552 async fn test_fetch_checkpoint_success() {
553 let (client, mock) = setup_test();
554
555 let bytes = Bytes::from(test_checkpoint_data(1));
557 mock.checkpoints.insert(1, CheckpointData::Raw(bytes));
558
559 let result = client.checkpoint(1).await.unwrap();
561 assert_eq!(result.summary.sequence_number(), &1);
562 }
563
564 #[tokio::test]
565 async fn test_fetch_not_found() {
566 let (client, _) = setup_test();
567
568 let result = client.checkpoint(1).await;
570 assert!(matches!(result, Err(IngestionError::NotFound(1))));
571 }
572
573 #[tokio::test]
574 async fn test_fetch_transient_error_with_retry() {
575 let (client, mock) = setup_test();
576
577 let bytes = Bytes::from(test_checkpoint_data(1));
579
580 mock.checkpoints.insert(1, CheckpointData::Raw(bytes));
582 mock.transient_failures.insert(1, 2);
583
584 let result = client.checkpoint(1).await.unwrap();
586 assert_eq!(*result.summary.sequence_number(), 1);
587
588 let retries = client
590 .metrics
591 .total_ingested_transient_retries
592 .with_label_values(&["mock_transient_error"])
593 .get();
594 assert_eq!(retries, 2);
595 }
596
597 #[tokio::test]
598 async fn test_wait_for_checkpoint_with_retry() {
599 let (client, mock) = setup_test();
600
601 let bytes = Bytes::from(test_checkpoint_data(1));
603
604 mock.checkpoints.insert(1, CheckpointData::Raw(bytes));
606 mock.not_found_failures.insert(1, 1);
607
608 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
610 assert_eq!(result.summary.sequence_number(), &1);
611
612 let retries = client.metrics.total_ingested_not_found_retries.get();
614 assert_eq!(retries, 1);
615 }
616
617 #[tokio::test]
618 async fn test_wait_for_checkpoint_instant() {
619 let (client, mock) = setup_test();
620
621 let bytes = Bytes::from(test_checkpoint_data(1));
623
624 mock.checkpoints.insert(1, CheckpointData::Raw(bytes));
626
627 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
629 assert_eq!(result.summary.sequence_number(), &1);
630 }
631
632 #[tokio::test]
633 async fn test_wait_for_permanent_deserialization_error() {
634 let (client, mock) = setup_test();
635
636 mock.checkpoints
638 .insert(1, CheckpointData::Raw(Bytes::from("invalid data")));
639
640 timeout(
642 Duration::from_secs(1),
643 client.wait_for(1, Duration::from_millis(50)),
644 )
645 .await
646 .unwrap_err();
647 }
648
649 #[tokio::test]
650 async fn test_fetch_non_retryable_error() {
651 let (client, mock) = setup_test();
652
653 mock.permanent_failures.insert(1, 1);
654
655 let result = client.checkpoint(1).await;
656 assert!(matches!(result, Err(IngestionError::FetchError(1, _))));
657
658 let errors = client
660 .metrics
661 .total_ingested_permanent_errors
662 .with_label_values(&["mock_permanent_error"])
663 .get();
664 assert_eq!(errors, 1);
665 }
666}