sui_indexer_alt_framework/ingestion/ingestion_client.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use backoff::Error as BE;
use backoff::ExponentialBackoff;
use backoff::backoff::Constant;
use bytes::Bytes;
use sui_futures::future::with_slow_future_monitor;
use sui_rpc::Client;
use sui_rpc::client::HeadersInterceptor;
use sui_storage::blob::Blob;
use tracing::debug;
use tracing::error;
use tracing::warn;
use url::Url;

use crate::ingestion::Error as IngestionError;
use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
use crate::ingestion::Result as IngestionResult;
use crate::ingestion::local_client::LocalIngestionClient;
use crate::ingestion::remote_client::RemoteIngestionClient;
use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IngestionMetrics;
use crate::types::full_checkpoint_content::Checkpoint;
use crate::types::full_checkpoint_content::CheckpointData;

/// Wait at most this long between retries for transient errors.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold for logging warnings about slow HTTP operations during checkpoint fetching.
///
/// Operations that take longer than this duration will trigger a warning log, but will
/// continue executing without being canceled. This helps identify network issues or
/// slow remote stores without interrupting the ingestion process.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);

#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    async fn fetch(&self, checkpoint: u64) -> FetchResult;
}

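/// Illustrative invocation of a binary that embeds these arguments (flag names follow clap's
/// kebab-casing of the fields below; the URLs and paths are placeholders, not real endpoints):
///
/// ```text
/// --remote-store-url https://example.com/checkpoints
/// --local-ingestion-path ./checkpoints
/// --rpc-api-url https://fullnode.example.com:443
/// ```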
#[derive(clap::Args, Clone, Debug, Default)]
#[group(required = true)]
pub struct IngestionClientArgs {
    /// Remote store to fetch checkpoints from.
    #[clap(long, group = "source")]
    pub remote_store_url: Option<Url>,

    /// Path to the local ingestion directory.
    /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
    #[clap(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    /// Sui fullnode gRPC URL to fetch checkpoints from.
    /// If remote_store_url, local_ingestion_path, and rpc_api_url are all provided, remote_store_url will be used.
    #[clap(long, env, group = "source")]
    pub rpc_api_url: Option<Url>,

    /// Optional username for the gRPC service.
    #[clap(long, env)]
    pub rpc_username: Option<String>,

    /// Optional password for the gRPC service.
    #[clap(long, env)]
    pub rpc_password: Option<String>,
}

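/// Errors reported by a checkpoint source. `Transient` errors are retried by
/// [`IngestionClient::fetch`] with exponential backoff, while `NotFound` and `Permanent` errors
/// are returned to the caller without further retries (though [`IngestionClient::wait_for`]
/// will poll again on `NotFound`).
///
/// Illustrative sketch of how a source implementation might classify a failure (the reason
/// string here is a placeholder):
///
/// ```ignore
/// Err(FetchError::Transient {
///     reason: "request_timeout",
///     error: anyhow::anyhow!("request timed out"),
/// })
/// ```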
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
    #[error("Checkpoint not found")]
    NotFound,
    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
    Transient {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
    #[error("Permanent error in {reason}: {error}")]
    Permanent {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
}

pub type FetchResult = Result<FetchData, FetchError>;

#[derive(Clone)]
#[allow(clippy::large_enum_variant)]
pub enum FetchData {
    Raw(Bytes),
    Checkpoint(Checkpoint),
}

#[derive(Clone)]
pub struct IngestionClient {
    client: Arc<dyn IngestionClientTrait>,
    /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
    metrics: Arc<IngestionMetrics>,
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}

impl IngestionClient {
    /// Construct a new ingestion client. Its source is determined by `args`.
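    ///
    /// Illustrative usage sketch (assumes `metrics` is an `Arc<IngestionMetrics>` that has
    /// already been registered elsewhere; the URL is a placeholder):
    ///
    /// ```ignore
    /// let args = IngestionClientArgs {
    ///     remote_store_url: Some(Url::parse("https://example.com/checkpoints")?),
    ///     ..Default::default()
    /// };
    /// let client = IngestionClient::new(args, metrics.clone())?;
    /// ```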
    pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
        // TODO: Support stacking multiple ingestion clients for redundancy/failover.
        let client = if let Some(url) = args.remote_store_url.as_ref() {
            IngestionClient::new_remote(url.clone(), metrics.clone())?
        } else if let Some(path) = args.local_ingestion_path.as_ref() {
            IngestionClient::new_local(path.clone(), metrics.clone())
        } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
            IngestionClient::new_rpc(
                rpc_api_url.clone(),
                args.rpc_username,
                args.rpc_password,
                metrics.clone(),
            )?
        } else {
            panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
        };

        Ok(client)
    }

    /// An ingestion client that fetches checkpoints from a remote store (an object store, over
    /// HTTP).
    pub fn new_remote(url: Url, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
        let client = Arc::new(RemoteIngestionClient::new(url)?);
        Ok(Self::new_impl(client, metrics))
    }

    /// An ingestion client that fetches checkpoints from a remote store (an object store, over
    /// HTTP), with a configured request timeout.
    pub fn new_remote_with_timeout(
        url: Url,
        timeout: std::time::Duration,
        metrics: Arc<IngestionMetrics>,
    ) -> IngestionResult<Self> {
        let client = Arc::new(RemoteIngestionClient::new_with_timeout(url, timeout)?);
        Ok(Self::new_impl(client, metrics))
    }

    /// An ingestion client that fetches checkpoints from a local directory.
    pub fn new_local(path: PathBuf, metrics: Arc<IngestionMetrics>) -> Self {
        let client = Arc::new(LocalIngestionClient::new(path));
        Self::new_impl(client, metrics)
    }

    /// An ingestion client that fetches checkpoints from a fullnode, over gRPC.
    pub fn new_rpc(
        url: Url,
        username: Option<String>,
        password: Option<String>,
        metrics: Arc<IngestionMetrics>,
    ) -> IngestionResult<Self> {
        let client = if let Some(username) = username {
            let mut headers = HeadersInterceptor::new();
            headers.basic_auth(username, password);
            Client::new(url.to_string())?
                .with_headers(headers)
                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
        } else {
            Client::new(url.to_string())?
                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
        };
        Ok(Self::new_impl(Arc::new(client), metrics))
    }

    pub(crate) fn new_impl(
        client: Arc<dyn IngestionClientTrait>,
        metrics: Arc<IngestionMetrics>,
    ) -> Self {
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
            metrics.ingested_checkpoint_timestamp_lag.clone(),
            metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
            metrics.latest_ingested_checkpoint.clone(),
        );
        IngestionClient {
            client,
            metrics,
            checkpoint_lag_reporter,
        }
    }

    /// Fetch checkpoint data by sequence number.
    ///
    /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
    /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled
    /// by the `retry_interval` parameter.
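    ///
    /// Illustrative usage sketch (assumes a constructed `client`):
    ///
    /// ```ignore
    /// // Poll every 200ms until checkpoint 42 becomes available at the source.
    /// let checkpoint = client.wait_for(42, Duration::from_millis(200)).await?;
    /// ```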
    pub async fn wait_for(
        &self,
        checkpoint: u64,
        retry_interval: Duration,
    ) -> IngestionResult<Arc<Checkpoint>> {
        let backoff = Constant::new(retry_interval);
        let fetch = || async move {
            use backoff::Error as BE;
            self.fetch(checkpoint).await.map_err(|e| match e {
                IngestionError::NotFound(checkpoint) => {
                    debug!(checkpoint, "Checkpoint not found, retrying...");
                    self.metrics.total_ingested_not_found_retries.inc();
                    BE::transient(e)
                }
                e => BE::permanent(e),
            })
        };

        backoff::future::retry(backoff, fetch).await
    }

    /// Fetch checkpoint data by sequence number.
    ///
    /// Repeatedly retries transient errors with an exponential backoff (capped at
    /// `MAX_TRANSIENT_RETRY_INTERVAL`). Transient errors are those reported by the client
    /// implementation as a [FetchError::Transient] variant, as well as failures within this
    /// function to deserialize the fetched bytes as a [Checkpoint].
    ///
    /// The function returns immediately if the checkpoint is not found.
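    ///
    /// Illustrative usage sketch (assumes a constructed `client`):
    ///
    /// ```ignore
    /// let checkpoint = client.fetch(42).await?;
    /// println!("ingested {} transactions", checkpoint.transactions.len());
    /// ```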
    pub async fn fetch(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
        let client = self.client.clone();
        let request = move || {
            let client = client.clone();
            async move {
                let fetch_data = with_slow_future_monitor(
                    client.fetch(checkpoint),
                    SLOW_OPERATION_WARNING_THRESHOLD,
                    /* on_threshold_exceeded = */
                    || {
                        warn!(
                            checkpoint,
                            threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
                            "Slow checkpoint fetch operation detected"
                        );
                    },
                )
                .await
                .map_err(|err| match err {
                    FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
                    FetchError::Transient { reason, error } => self.metrics.inc_retry(
                        checkpoint,
                        reason,
                        IngestionError::FetchError(checkpoint, error),
                    ),
                    FetchError::Permanent { reason, error } => {
                        error!(checkpoint, reason, "Permanent fetch error: {error}");
                        self.metrics
                            .total_ingested_permanent_errors
                            .with_label_values(&[reason])
                            .inc();
                        BE::permanent(IngestionError::FetchError(checkpoint, error))
                    }
                })?;

                Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
                    FetchData::Raw(bytes) => {
                        self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
                        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
                            self.metrics.inc_retry(
                                checkpoint,
                                "deserialization",
                                IngestionError::DeserializationError(checkpoint, e),
                            )
                        })?;
                        checkpoint.into()
                    }
                    FetchData::Checkpoint(data) => {
                        // We do not record a size metric for Checkpoint data (from the RPC client).
                        // TODO: Record the metric when we have a good way to get the size information.
                        data
                    }
                })
            }
        };

        // Keep backing off until we are waiting for the max interval, but don't give up.
        let backoff = ExponentialBackoff {
            max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
            max_elapsed_time: None,
            ..Default::default()
        };

        let guard = self.metrics.ingested_checkpoint_latency.start_timer();
        let data = backoff::future::retry(backoff, request).await?;
        let elapsed = guard.stop_and_record();

        debug!(
            checkpoint,
            elapsed_ms = elapsed * 1000.0,
            "Fetched checkpoint"
        );

        self.checkpoint_lag_reporter
            .report_lag(checkpoint, data.summary.timestamp_ms);

        self.metrics.total_ingested_checkpoints.inc();

        self.metrics
            .total_ingested_transactions
            .inc_by(data.transactions.len() as u64);

        self.metrics.total_ingested_events.inc_by(
            data.transactions
                .iter()
                .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
                .sum(),
        );

        self.metrics
            .total_ingested_objects
            .inc_by(data.object_set.len() as u64);

        Ok(Arc::new(data))
    }
}

#[cfg(test)]
mod tests {
    use dashmap::DashMap;
    use prometheus::Registry;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::time::timeout;

    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Mock implementation of IngestionClientTrait for testing
    #[derive(Default)]
    struct MockIngestionClient {
        checkpoints: DashMap<u64, FetchData>,
        transient_failures: DashMap<u64, usize>,
        not_found_failures: DashMap<u64, usize>,
        permanent_failures: DashMap<u64, usize>,
    }

    #[async_trait]
    impl IngestionClientTrait for MockIngestionClient {
        async fn fetch(&self, checkpoint: u64) -> FetchResult {
            // Check for not found failures
            if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::NotFound);
            }

            // Check for non-retryable failures
            if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::Permanent {
                    reason: "mock_permanent_error",
                    error: anyhow::anyhow!("Mock permanent error"),
                });
            }

            // Check for transient failures
            if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::Transient {
                    reason: "mock_transient_error",
                    error: anyhow::anyhow!("Mock transient error"),
                });
            }

            // Return the checkpoint data if it exists
            self.checkpoints
                .get(&checkpoint)
                .as_deref()
                .cloned()
                .ok_or(FetchError::NotFound)
        }
    }

    fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IngestionMetrics::new(None, &registry);
        let mock_client = Arc::new(MockIngestionClient::default());
        let client = IngestionClient::new_impl(mock_client.clone(), metrics);
        (client, mock_client)
    }

    #[tokio::test]
    async fn test_fetch_raw_bytes_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = Bytes::from(test_checkpoint_data(1));
        mock.checkpoints.insert(1, FetchData::Raw(bytes.clone()));

        // Fetch and verify
        let result = client.fetch(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_checkpoint_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));

        // Fetch and verify
        let result = client.fetch(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_not_found() {
        let (client, _) = setup_test();

        // Try to fetch non-existent checkpoint
        let result = client.fetch(1).await;
        assert!(matches!(result, Err(IngestionError::NotFound(1))));
    }

    #[tokio::test]
    async fn test_fetch_transient_error_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with 2 transient failures
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.clone().into()));
        mock.transient_failures.insert(1, 2);

        // Fetch and verify it succeeds after retries
        let result = client.fetch(1).await.unwrap();
        assert_eq!(*result.summary.sequence_number(), 1);

        // Verify that exactly 2 retries were recorded
        let retries = client
            .metrics
            .total_ingested_transient_retries
            .with_label_values(&["mock_transient_error"])
            .get();
        assert_eq!(retries, 2);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with 1 not_found failure
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));
        mock.not_found_failures.insert(1, 1);

        // Wait for checkpoint with short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);

        // Verify that exactly 1 retry was recorded
        let retries = client.metrics.total_ingested_not_found_retries.get();
        assert_eq!(retries, 1);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_instant() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with no failures - data should be available immediately
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));

        // Wait for checkpoint with short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_wait_for_permanent_deserialization_error() {
        let (client, mock) = setup_test();

        // Add invalid data that will cause a deserialization error
        mock.checkpoints
            .insert(1, FetchData::Raw(Bytes::from("invalid data")));

        // wait_for should keep retrying on deserialization errors and time out
        timeout(
            Duration::from_secs(1),
            client.wait_for(1, Duration::from_millis(50)),
        )
        .await
        .unwrap_err();
    }

    #[tokio::test]
    async fn test_fetch_non_retryable_error() {
        let (client, mock) = setup_test();

        mock.permanent_failures.insert(1, 1);

        let result = client.fetch(1).await;
        assert!(matches!(result, Err(IngestionError::FetchError(1, _))));

        // Verify that the non-retryable error metric was incremented
        let errors = client
            .metrics
            .total_ingested_permanent_errors
            .with_label_values(&["mock_permanent_error"])
            .get();
        assert_eq!(errors, 1);
    }
}