sui_indexer_alt_framework/ingestion/
ingestion_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use backoff::Error as BE;
10use backoff::ExponentialBackoff;
11use backoff::backoff::Constant;
12use bytes::Bytes;
13use clap::ArgGroup;
14use object_store::ClientOptions;
15use object_store::ObjectStore;
16use object_store::aws::AmazonS3Builder;
17use object_store::azure::MicrosoftAzureBuilder;
18use object_store::gcp::GoogleCloudStorageBuilder;
19use object_store::http::HttpBuilder;
20use object_store::local::LocalFileSystem;
21use sui_futures::future::with_slow_future_monitor;
22use sui_rpc::Client;
23use sui_rpc::client::HeadersInterceptor;
24use tracing::debug;
25use tracing::error;
26use tracing::warn;
27use url::Url;
28
29use crate::ingestion::Error as IngestionError;
30use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
31use crate::ingestion::Result as IngestionResult;
32use crate::ingestion::decode;
33use crate::ingestion::store_client::StoreIngestionClient;
34use crate::metrics::CheckpointLagMetricReporter;
35use crate::metrics::IngestionMetrics;
36use crate::types::full_checkpoint_content::Checkpoint;
37
38/// Wait at most this long between retries for transient errors.
39const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);
40
41/// Threshold for logging warnings about slow HTTP operations during checkpoint fetching.
42///
43/// Operations that take longer than this duration will trigger a warning log, but will
44/// continue executing without being canceled. This helps identify network issues or
45/// slow remote stores without interrupting the ingestion process.
46const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
47
/// Abstraction over the underlying checkpoint source (remote object store, local directory,
/// or fullnode gRPC). Implementations fetch the data for a single checkpoint.
#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    /// Fetch the data for the checkpoint with the given sequence number.
    ///
    /// Implementations signal "not (yet) available" with [CheckpointError::NotFound], and
    /// classify failures as [CheckpointError::Transient] (retried by the caller) or
    /// [CheckpointError::Permanent] (surfaced immediately).
    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult;
}
52
// Command-line arguments selecting where checkpoints are ingested from.
//
// The `source` argument group (required, non-multiple) means exactly one of the source
// arguments below must be supplied; `rpc_username`/`rpc_password` are only accepted
// alongside `rpc_api_url`.
//
// NOTE: The `///` comments on the fields double as `clap` help text, so they are
// user-facing -- edit them with care.
#[derive(clap::Args, Clone, Debug)]
#[command(group(ArgGroup::new("source").required(true).multiple(false)))]
pub struct IngestionClientArgs {
    /// Remote Store to fetch checkpoints from over HTTP.
    #[arg(long, group = "source")]
    pub remote_store_url: Option<Url>,

    /// Fetch checkpoints from AWS S3. Provide the bucket name or endpoint-and-bucket.
    /// (env: AWS_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
    #[arg(long, group = "source")]
    pub remote_store_s3: Option<String>,

    /// Fetch checkpoints from Google Cloud Storage. Provide the bucket name.
    /// (env: GOOGLE_SERVICE_ACCOUNT_PATH)
    #[arg(long, group = "source")]
    pub remote_store_gcs: Option<String>,

    /// Fetch checkpoints from Azure Blob Storage. Provide the container name.
    /// (env: AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCESS_KEY)
    #[arg(long, group = "source")]
    pub remote_store_azure: Option<String>,

    /// Path to the local ingestion directory.
    #[arg(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    /// Sui fullnode gRPC url to fetch checkpoints from.
    #[arg(long, group = "source")]
    pub rpc_api_url: Option<Url>,

    /// Optional username for the gRPC service.
    #[arg(long, env, requires = "rpc_api_url")]
    pub rpc_username: Option<String>,

    /// Optional password for the gRPC service.
    #[arg(long, env, requires = "rpc_api_url")]
    pub rpc_password: Option<String>,

    /// How long to wait for a checkpoint file to be downloaded (milliseconds). Set to 0 to disable
    /// the timeout.
    #[arg(long, default_value_t = Self::default().checkpoint_timeout_ms)]
    pub checkpoint_timeout_ms: u64,

    /// How long to wait while establishing a connection to the checkpoint store (milliseconds).
    /// Set to 0 to disable the timeout.
    #[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
    pub checkpoint_connection_timeout_ms: u64,
}
101
102impl Default for IngestionClientArgs {
103    fn default() -> Self {
104        Self {
105            remote_store_url: None,
106            remote_store_s3: None,
107            remote_store_gcs: None,
108            remote_store_azure: None,
109            local_ingestion_path: None,
110            rpc_api_url: None,
111            rpc_username: None,
112            rpc_password: None,
113            checkpoint_timeout_ms: 120_000,
114            checkpoint_connection_timeout_ms: 120_000,
115        }
116    }
117}
118
119impl IngestionClientArgs {
120    fn client_options(&self) -> ClientOptions {
121        let mut options = ClientOptions::default();
122        options = if self.checkpoint_timeout_ms == 0 {
123            options.with_timeout_disabled()
124        } else {
125            let timeout = Duration::from_millis(self.checkpoint_timeout_ms);
126            options.with_timeout(timeout)
127        };
128        options = if self.checkpoint_connection_timeout_ms == 0 {
129            options.with_connect_timeout_disabled()
130        } else {
131            let timeout = Duration::from_millis(self.checkpoint_connection_timeout_ms);
132            options.with_connect_timeout(timeout)
133        };
134        options
135    }
136}
137
/// Errors an [IngestionClientTrait] implementation can produce when fetching a checkpoint.
///
/// The variant determines how [IngestionClient::checkpoint] reacts: `NotFound` is returned
/// to the caller immediately, `Transient` errors are retried with exponential back-off, and
/// `Permanent` errors abort the fetch.
#[derive(thiserror::Error, Debug)]
pub enum CheckpointError {
    /// The checkpoint does not exist (yet) at the source.
    #[error("Checkpoint not found")]
    NotFound,
    /// A failure that may resolve itself on retry. `reason` is a static label used for
    /// metrics and logging.
    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
    Transient {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
    /// A failure that will not go away by retrying. `reason` is a static label used for
    /// metrics and logging.
    #[error("Permanent error in {reason}: {error}")]
    Permanent {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
}
155
/// Result type returned by [IngestionClientTrait::checkpoint].
pub type CheckpointResult = Result<CheckpointData, CheckpointError>;

/// The payload of a successfully fetched checkpoint. Object-store sources return the raw,
/// still-serialized bytes, while the gRPC source returns an already-decoded [Checkpoint].
#[derive(Clone)]
#[allow(clippy::large_enum_variant)]
pub enum CheckpointData {
    /// Serialized checkpoint bytes, decoded later via [decode::checkpoint].
    Raw(Bytes),
    /// An already-decoded checkpoint (as returned by the RPC client).
    Checkpoint(Checkpoint),
}
164
/// A handle for fetching checkpoints from some source (object store, local directory, or
/// fullnode gRPC), with retry logic and metrics reporting. Cheap to clone: all fields are
/// behind `Arc`s.
#[derive(Clone)]
pub struct IngestionClient {
    /// The underlying source implementation that actually fetches checkpoint data.
    client: Arc<dyn IngestionClientTrait>,
    /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
    metrics: Arc<IngestionMetrics>,
    /// Reports sequence-number and timestamp lag metrics for ingested checkpoints.
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}
172
173impl IngestionClient {
174    /// Construct a new ingestion client. Its source is determined by `args`.
175    pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
176        // TODO: Support stacking multiple ingestion clients for redundancy/failover.
177        let retry = super::store_client::retry_config();
178        let client = if let Some(url) = args.remote_store_url.as_ref() {
179            let store = HttpBuilder::new()
180                .with_url(url.to_string())
181                .with_client_options(args.client_options().with_allow_http(true))
182                .with_retry(retry)
183                .build()
184                .map(Arc::new)?;
185            IngestionClient::with_store(store, metrics.clone())?
186        } else if let Some(bucket) = args.remote_store_s3.as_ref() {
187            let store = AmazonS3Builder::from_env()
188                .with_client_options(args.client_options())
189                .with_retry(retry)
190                .with_imdsv1_fallback()
191                .with_bucket_name(bucket)
192                .build()
193                .map(Arc::new)?;
194            IngestionClient::with_store(store, metrics.clone())?
195        } else if let Some(bucket) = args.remote_store_gcs.as_ref() {
196            let store = GoogleCloudStorageBuilder::from_env()
197                .with_client_options(args.client_options())
198                .with_retry(retry)
199                .with_bucket_name(bucket)
200                .build()
201                .map(Arc::new)?;
202            IngestionClient::with_store(store, metrics.clone())?
203        } else if let Some(container) = args.remote_store_azure.as_ref() {
204            let store = MicrosoftAzureBuilder::from_env()
205                .with_client_options(args.client_options())
206                .with_retry(retry)
207                .with_container_name(container)
208                .build()
209                .map(Arc::new)?;
210            IngestionClient::with_store(store, metrics.clone())?
211        } else if let Some(path) = args.local_ingestion_path.as_ref() {
212            let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
213            IngestionClient::with_store(store, metrics.clone())?
214        } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
215            IngestionClient::with_grpc(
216                rpc_api_url.clone(),
217                args.rpc_username,
218                args.rpc_password,
219                metrics.clone(),
220            )?
221        } else {
222            panic!(
223                "One of remote_store_url, remote_store_s3, remote_store_gcs, remote_store_azure, \
224                local_ingestion_path or rpc_api_url must be provided"
225            );
226        };
227
228        Ok(client)
229    }
230
231    /// An ingestion client that fetches checkpoints from a remote object store.
232    pub fn with_store(
233        store: Arc<dyn ObjectStore>,
234        metrics: Arc<IngestionMetrics>,
235    ) -> IngestionResult<Self> {
236        let client = Arc::new(StoreIngestionClient::new(store));
237        Ok(Self::new_impl(client, metrics))
238    }
239
240    /// An ingestion client that fetches checkpoints from a fullnode, over gRPC.
241    pub fn with_grpc(
242        url: Url,
243        username: Option<String>,
244        password: Option<String>,
245        metrics: Arc<IngestionMetrics>,
246    ) -> IngestionResult<Self> {
247        let client = if let Some(username) = username {
248            let mut headers = HeadersInterceptor::new();
249            headers.basic_auth(username, password);
250            Client::new(url.to_string())?
251                .with_headers(headers)
252                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
253        } else {
254            Client::new(url.to_string())?
255                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
256        };
257        Ok(Self::new_impl(Arc::new(client), metrics))
258    }
259
260    pub(crate) fn new_impl(
261        client: Arc<dyn IngestionClientTrait>,
262        metrics: Arc<IngestionMetrics>,
263    ) -> Self {
264        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
265            metrics.ingested_checkpoint_timestamp_lag.clone(),
266            metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
267            metrics.latest_ingested_checkpoint.clone(),
268        );
269        IngestionClient {
270            client,
271            metrics,
272            checkpoint_lag_reporter,
273        }
274    }
275
276    /// Fetch checkpoint data by sequence number.
277    ///
278    /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
279    /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled
280    /// by the `retry_interval` parameter.
281    pub async fn wait_for(
282        &self,
283        checkpoint: u64,
284        retry_interval: Duration,
285    ) -> IngestionResult<Arc<Checkpoint>> {
286        let backoff = Constant::new(retry_interval);
287        let fetch = || async move {
288            use backoff::Error as BE;
289            self.checkpoint(checkpoint).await.map_err(|e| match e {
290                IngestionError::NotFound(checkpoint) => {
291                    debug!(checkpoint, "Checkpoint not found, retrying...");
292                    self.metrics.total_ingested_not_found_retries.inc();
293                    BE::transient(e)
294                }
295                e => BE::permanent(e),
296            })
297        };
298
299        backoff::future::retry(backoff, fetch).await
300    }
301
302    /// Fetch checkpoint data by sequence number.
303    ///
304    /// Repeatedly retries transient errors with an exponential backoff (up to
305    /// `MAX_TRANSIENT_RETRY_INTERVAL`). Transient errors are either defined by the client
306    /// implementation that returns a [CheckpointError::Transient] error variant, or within
307    /// this function if we fail to deserialize the result as [Checkpoint].
308    ///
309    /// The function will immediately return if the checkpoint is not found.
310    pub async fn checkpoint(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
311        let client = self.client.clone();
312        let request = move || {
313            let client = client.clone();
314            async move {
315                let fetch_data = with_slow_future_monitor(
316                    client.checkpoint(checkpoint),
317                    SLOW_OPERATION_WARNING_THRESHOLD,
318                    /* on_threshold_exceeded =*/
319                    || {
320                        warn!(
321                            checkpoint,
322                            threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
323                            "Slow checkpoint fetch operation detected"
324                        );
325                    },
326                )
327                .await
328                .map_err(|err| match err {
329                    CheckpointError::NotFound => {
330                        BE::permanent(IngestionError::NotFound(checkpoint))
331                    }
332                    CheckpointError::Transient { reason, error } => self.metrics.inc_retry(
333                        checkpoint,
334                        reason,
335                        IngestionError::FetchError(checkpoint, error),
336                    ),
337                    CheckpointError::Permanent { reason, error } => {
338                        error!(checkpoint, reason, "Permanent fetch error: {error}");
339                        self.metrics
340                            .total_ingested_permanent_errors
341                            .with_label_values(&[reason])
342                            .inc();
343                        BE::permanent(IngestionError::FetchError(checkpoint, error))
344                    }
345                })?;
346
347                Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
348                    CheckpointData::Raw(bytes) => {
349                        self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
350
351                        decode::checkpoint(&bytes).map_err(|e| {
352                            self.metrics.inc_retry(
353                                checkpoint,
354                                e.reason(),
355                                IngestionError::DeserializationError(checkpoint, e.into()),
356                            )
357                        })?
358                    }
359                    CheckpointData::Checkpoint(data) => {
360                        // We are not recording size metric for Checkpoint data (from RPC client).
361                        // TODO: Record the metric when we have a good way to get the size information
362                        data
363                    }
364                })
365            }
366        };
367
368        // Keep backing off until we are waiting for the max interval, but don't give up.
369        let backoff = ExponentialBackoff {
370            max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
371            max_elapsed_time: None,
372            ..Default::default()
373        };
374
375        let guard = self.metrics.ingested_checkpoint_latency.start_timer();
376        let data = backoff::future::retry(backoff, request).await?;
377        let elapsed = guard.stop_and_record();
378
379        debug!(
380            checkpoint,
381            elapsed_ms = elapsed * 1000.0,
382            "Fetched checkpoint"
383        );
384
385        self.checkpoint_lag_reporter
386            .report_lag(checkpoint, data.summary.timestamp_ms);
387
388        self.metrics.total_ingested_checkpoints.inc();
389
390        self.metrics
391            .total_ingested_transactions
392            .inc_by(data.transactions.len() as u64);
393
394        self.metrics.total_ingested_events.inc_by(
395            data.transactions
396                .iter()
397                .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
398                .sum(),
399        );
400
401        self.metrics
402            .total_ingested_objects
403            .inc_by(data.object_set.len() as u64);
404
405        Ok(Arc::new(data))
406    }
407}
408
#[cfg(test)]
mod tests {
    use clap::Parser;
    use clap::error::ErrorKind;
    use dashmap::DashMap;
    use prometheus::Registry;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::time::timeout;

    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Wrapper so `IngestionClientArgs` can be parsed stand-alone in argument tests.
    #[derive(Debug, Parser)]
    struct TestArgs {
        #[clap(flatten)]
        ingestion: IngestionClientArgs,
    }

    /// Mock implementation of IngestionClientTrait for testing
    #[derive(Default)]
    struct MockIngestionClient {
        /// Checkpoint data served, keyed by sequence number.
        checkpoints: DashMap<u64, CheckpointData>,
        /// Remaining number of transient errors to inject, per checkpoint.
        transient_failures: DashMap<u64, usize>,
        /// Remaining number of "not found" responses to inject, per checkpoint.
        not_found_failures: DashMap<u64, usize>,
        /// Remaining number of permanent errors to inject, per checkpoint.
        permanent_failures: DashMap<u64, usize>,
    }

    #[async_trait]
    impl IngestionClientTrait for MockIngestionClient {
        /// Injects configured failures (not-found, then permanent, then transient, each
        /// decrementing its remaining count) before serving the stored checkpoint data.
        async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
            // Check for not found failures
            if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(CheckpointError::NotFound);
            }

            // Check for non-retryable failures
            if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(CheckpointError::Permanent {
                    reason: "mock_permanent_error",
                    error: anyhow::anyhow!("Mock permanent error"),
                });
            }

            // Check for transient failures
            if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(CheckpointError::Transient {
                    reason: "mock_transient_error",
                    error: anyhow::anyhow!("Mock transient error"),
                });
            }

            // Return the checkpoint data if it exists
            self.checkpoints
                .get(&checkpoint)
                .as_deref()
                .cloned()
                .ok_or(CheckpointError::NotFound)
        }
    }

    /// Build an `IngestionClient` backed by a fresh mock source and an isolated metrics
    /// registry, returning both so tests can seed the mock and inspect metrics.
    fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IngestionMetrics::new(None, &registry);
        let mock_client = Arc::new(MockIngestionClient::default());
        let client = IngestionClient::new_impl(mock_client.clone(), metrics);
        (client, mock_client)
    }

    #[test]
    fn test_args_multiple_ingestion_sources_are_rejected() {
        let err = TestArgs::try_parse_from([
            "cmd",
            "--remote-store-url",
            "https://example.com",
            "--rpc-api-url",
            "http://localhost:8080",
        ])
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::ArgumentConflict);
    }

    #[test]
    fn test_args_optional_credentials() {
        let args = TestArgs::try_parse_from([
            "cmd",
            "--rpc-api-url",
            "http://localhost:8080",
            "--rpc-username",
            "alice",
            "--rpc-password",
            "secret",
        ])
        .unwrap();

        assert_eq!(args.ingestion.rpc_username.as_deref(), Some("alice"));
        assert_eq!(args.ingestion.rpc_password.as_deref(), Some("secret"));
        assert_eq!(
            args.ingestion.rpc_api_url,
            Some(Url::parse("http://localhost:8080").unwrap())
        );
    }

    #[test]
    fn test_args_credentials_require_rpc_url() {
        let err = TestArgs::try_parse_from([
            "cmd",
            "--rpc-username",
            "alice",
            "--rpc-password",
            "secret",
        ])
        .unwrap_err();

        assert_eq!(err.kind(), ErrorKind::MissingRequiredArgument);
    }

    #[tokio::test]
    async fn test_fetch_raw_bytes_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = Bytes::from(test_checkpoint_data(1));
        mock.checkpoints
            .insert(1, CheckpointData::Raw(bytes.clone()));

        // Fetch and verify
        let result = client.checkpoint(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_checkpoint_success() {
        let (client, mock) = setup_test();

        // Create test data - now returns zstd-compressed protobuf
        let bytes = Bytes::from(test_checkpoint_data(1));
        mock.checkpoints.insert(1, CheckpointData::Raw(bytes));

        // Fetch and verify
        let result = client.checkpoint(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_not_found() {
        let (client, _) = setup_test();

        // Try to fetch non-existent checkpoint
        let result = client.checkpoint(1).await;
        assert!(matches!(result, Err(IngestionError::NotFound(1))));
    }

    #[tokio::test]
    async fn test_fetch_transient_error_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = Bytes::from(test_checkpoint_data(1));

        // Add checkpoint to mock with 2 transient failures
        mock.checkpoints.insert(1, CheckpointData::Raw(bytes));
        mock.transient_failures.insert(1, 2);

        // Fetch and verify it succeeds after retries
        let result = client.checkpoint(1).await.unwrap();
        assert_eq!(*result.summary.sequence_number(), 1);

        // Verify that exactly 2 retries were recorded
        let retries = client
            .metrics
            .total_ingested_transient_retries
            .with_label_values(&["mock_transient_error"])
            .get();
        assert_eq!(retries, 2);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_with_retry() {
        let (client, mock) = setup_test();

        // Create test data - now returns zstd-compressed protobuf
        let bytes = Bytes::from(test_checkpoint_data(1));

        // Add checkpoint to mock with 1 not_found failures
        mock.checkpoints.insert(1, CheckpointData::Raw(bytes));
        mock.not_found_failures.insert(1, 1);

        // Wait for checkpoint with short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);

        // Verify that exactly 1 retry was recorded
        let retries = client.metrics.total_ingested_not_found_retries.get();
        assert_eq!(retries, 1);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_instant() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = Bytes::from(test_checkpoint_data(1));

        // Add checkpoint to mock with no failures - data should be available immediately
        mock.checkpoints.insert(1, CheckpointData::Raw(bytes));

        // Wait for checkpoint with short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_wait_for_permanent_deserialization_error() {
        let (client, mock) = setup_test();

        // Add invalid data that will cause a deserialization error
        mock.checkpoints
            .insert(1, CheckpointData::Raw(Bytes::from("invalid data")));

        // wait_for should keep retrying on deserialization errors and timeout
        timeout(
            Duration::from_secs(1),
            client.wait_for(1, Duration::from_millis(50)),
        )
        .await
        .unwrap_err();
    }

    #[tokio::test]
    async fn test_fetch_non_retryable_error() {
        let (client, mock) = setup_test();

        mock.permanent_failures.insert(1, 1);

        let result = client.checkpoint(1).await;
        assert!(matches!(result, Err(IngestionError::FetchError(1, _))));

        // Verify that the non-retryable error metric was incremented
        let errors = client
            .metrics
            .total_ingested_permanent_errors
            .with_label_values(&["mock_permanent_error"])
            .get();
        assert_eq!(errors, 1);
    }
}