sui_indexer_alt_framework/ingestion/ingestion_client.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use backoff::Error as BE;
use backoff::ExponentialBackoff;
use backoff::backoff::Constant;
use bytes::Bytes;
use sui_futures::future::with_slow_future_monitor;
use sui_rpc::Client;
use sui_rpc::client::HeadersInterceptor;
use sui_storage::blob::Blob;
use tracing::{debug, error, warn};
use url::Url;

use crate::ingestion::Error as IngestionError;
use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
use crate::ingestion::Result as IngestionResult;
use crate::ingestion::local_client::LocalIngestionClient;
use crate::ingestion::remote_client::RemoteIngestionClient;
use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IngestionMetrics;
use crate::types::full_checkpoint_content::{Checkpoint, CheckpointData};

/// Wait at most this long between retries for transient errors.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold for logging warnings about slow HTTP operations during checkpoint fetching.
///
/// Operations that take longer than this duration will trigger a warning log, but will
/// continue executing without being canceled. This helps identify network issues or
/// slow remote stores without interrupting the ingestion process.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);

#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    async fn fetch(&self, checkpoint: u64) -> FetchResult;
}

#[derive(clap::Args, Clone, Debug, Default)]
#[group(required = true)]
pub struct IngestionClientArgs {
    /// Remote store to fetch checkpoints from.
    #[clap(long, group = "source")]
    pub remote_store_url: Option<Url>,

    /// Path to the local ingestion directory.
    /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
    #[clap(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    /// Sui fullnode gRPC URL to fetch checkpoints from.
    /// If remote_store_url, local_ingestion_path, and rpc_api_url are all provided, remote_store_url will be used.
    #[clap(long, env, group = "source")]
    pub rpc_api_url: Option<Url>,

    /// Optional username for the gRPC service.
    #[clap(long, env)]
    pub rpc_username: Option<String>,

    /// Optional password for the gRPC service.
    #[clap(long, env)]
    pub rpc_password: Option<String>,
}
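
// Illustrative construction sketch (not part of this module's API; the URL is a placeholder and
// the metrics setup mirrors the test module at the bottom of this file):
//
//     let args = IngestionClientArgs {
//         remote_store_url: Some(Url::parse("https://checkpoints.example.com").expect("valid URL")),
//         ..Default::default()
//     };
//     let metrics = IngestionMetrics::new(None, &prometheus::Registry::new());
//     let client = IngestionClient::new(args, metrics).expect("a valid ingestion source");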

#[derive(thiserror::Error, Debug)]
pub enum FetchError {
    #[error("Checkpoint not found")]
    NotFound,
    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
    Transient {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
    #[error("Permanent error in {reason}: {error}")]
    Permanent {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
}
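
// For illustration only: an HTTP-based client implementation might classify its failures along
// these lines (the `status` handling below is a hypothetical sketch, not this crate's code):
//
//     match status {
//         StatusCode::NOT_FOUND => Err(FetchError::NotFound),
//         status if status.is_server_error() => Err(FetchError::Transient {
//             reason: "http_5xx",
//             error: anyhow::anyhow!("server returned {status}"),
//         }),
//         status => Err(FetchError::Permanent {
//             reason: "http_status",
//             error: anyhow::anyhow!("unexpected status {status}"),
//         }),
//     }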

pub type FetchResult = Result<FetchData, FetchError>;

#[derive(Clone)]
#[allow(clippy::large_enum_variant)]
pub enum FetchData {
    Raw(Bytes),
    Checkpoint(Checkpoint),
}

#[derive(Clone)]
pub struct IngestionClient {
    client: Arc<dyn IngestionClientTrait>,
    /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
    metrics: Arc<IngestionMetrics>,
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}

impl IngestionClient {
    /// Construct a new ingestion client. Its source is determined by `args`.
    pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
        // TODO: Support stacking multiple ingestion clients for redundancy/failover.
        let client = if let Some(url) = args.remote_store_url.as_ref() {
            IngestionClient::new_remote(url.clone(), metrics.clone())?
        } else if let Some(path) = args.local_ingestion_path.as_ref() {
            IngestionClient::new_local(path.clone(), metrics.clone())
        } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
            IngestionClient::new_rpc(
                rpc_api_url.clone(),
                args.rpc_username,
                args.rpc_password,
                metrics.clone(),
            )?
        } else {
            panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
        };

        Ok(client)
    }

    /// An ingestion client that fetches checkpoints from a remote store (an object store, over
    /// HTTP).
    pub fn new_remote(url: Url, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
        let client = Arc::new(RemoteIngestionClient::new(url)?);
        Ok(Self::new_impl(client, metrics))
    }

    /// An ingestion client that fetches checkpoints from a remote store (an object store, over
    /// HTTP), with a configured request timeout.
    pub fn new_remote_with_timeout(
        url: Url,
        timeout: std::time::Duration,
        metrics: Arc<IngestionMetrics>,
    ) -> IngestionResult<Self> {
        let client = Arc::new(RemoteIngestionClient::new_with_timeout(url, timeout)?);
        Ok(Self::new_impl(client, metrics))
    }

    /// An ingestion client that fetches checkpoints from a local directory.
    pub fn new_local(path: PathBuf, metrics: Arc<IngestionMetrics>) -> Self {
        let client = Arc::new(LocalIngestionClient::new(path));
        Self::new_impl(client, metrics)
    }

    /// An ingestion client that fetches checkpoints from a fullnode, over gRPC.
    pub fn new_rpc(
        url: Url,
        username: Option<String>,
        password: Option<String>,
        metrics: Arc<IngestionMetrics>,
    ) -> IngestionResult<Self> {
        let client = if let Some(username) = username {
            let mut headers = HeadersInterceptor::new();
            headers.basic_auth(username, password);
            Client::new(url.to_string())?
                .with_headers(headers)
                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
        } else {
            Client::new(url.to_string())?
                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
        };
        Ok(Self::new_impl(Arc::new(client), metrics))
    }

    pub(crate) fn new_impl(
        client: Arc<dyn IngestionClientTrait>,
        metrics: Arc<IngestionMetrics>,
    ) -> Self {
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
            metrics.ingested_checkpoint_timestamp_lag.clone(),
            metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
            metrics.latest_ingested_checkpoint.clone(),
        );
        IngestionClient {
            client,
            metrics,
            checkpoint_lag_reporter,
        }
    }

    /// Fetch checkpoint data by sequence number.
    ///
    /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
    /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled
    /// by the `retry_interval` parameter.
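    ///
    /// # Example
    ///
    /// A minimal polling sketch (illustrative; assumes `client` is an `IngestionClient` and the
    /// call site is async and returns a compatible `Result`):
    ///
    /// ```ignore
    /// // Poll every 200ms until checkpoint 42 becomes available at the source.
    /// let checkpoint = client.wait_for(42, Duration::from_millis(200)).await?;
    /// assert_eq!(*checkpoint.summary.sequence_number(), 42);
    /// ```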
    pub async fn wait_for(
        &self,
        checkpoint: u64,
        retry_interval: Duration,
    ) -> IngestionResult<Arc<Checkpoint>> {
        let backoff = Constant::new(retry_interval);
        let fetch = || async move {
            use backoff::Error as BE;
            self.fetch(checkpoint).await.map_err(|e| match e {
                IngestionError::NotFound(checkpoint) => {
                    debug!(checkpoint, "Checkpoint not found, retrying...");
                    self.metrics.total_ingested_not_found_retries.inc();
                    BE::transient(e)
                }
                e => BE::permanent(e),
            })
        };

        backoff::future::retry(backoff, fetch).await
    }

    /// Fetch checkpoint data by sequence number.
    ///
    /// Repeatedly retries transient errors with an exponential backoff (up to
    /// `MAX_TRANSIENT_RETRY_INTERVAL`). Transient errors are either reported by the client
    /// implementation as a [FetchError::Transient] variant, or raised within this function if the
    /// result fails to deserialize as a [Checkpoint].
    ///
    /// The function returns immediately if the checkpoint is not found.
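    ///
    /// # Example
    ///
    /// A minimal sketch of handling the two observable outcomes (illustrative; assumes `client`
    /// is an `IngestionClient` and the call site is async and returns a compatible `Result`):
    ///
    /// ```ignore
    /// match client.fetch(42).await {
    ///     Ok(data) => println!("ingested {} transactions", data.transactions.len()),
    ///     Err(IngestionError::NotFound(cp)) => println!("checkpoint {cp} not available yet"),
    ///     Err(err) => return Err(err),
    /// }
    /// ```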
    pub async fn fetch(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
        let client = self.client.clone();
        let request = move || {
            let client = client.clone();
            async move {
                let fetch_data = with_slow_future_monitor(
                    client.fetch(checkpoint),
                    SLOW_OPERATION_WARNING_THRESHOLD,
                    /* on_threshold_exceeded = */
                    || {
                        warn!(
                            checkpoint,
                            threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
                            "Slow checkpoint fetch operation detected"
                        );
                    },
                )
                .await
                .map_err(|err| match err {
                    FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
                    FetchError::Transient { reason, error } => self.metrics.inc_retry(
                        checkpoint,
                        reason,
                        IngestionError::FetchError(checkpoint, error),
                    ),
                    FetchError::Permanent { reason, error } => {
                        error!(checkpoint, reason, "Permanent fetch error: {error}");
                        self.metrics
                            .total_ingested_permanent_errors
                            .with_label_values(&[reason])
                            .inc();
                        BE::permanent(IngestionError::FetchError(checkpoint, error))
                    }
                })?;

                Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
                    FetchData::Raw(bytes) => {
                        self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
                        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
                            self.metrics.inc_retry(
                                checkpoint,
                                "deserialization",
                                IngestionError::DeserializationError(checkpoint, e),
                            )
                        })?;
                        checkpoint.into()
                    }
                    FetchData::Checkpoint(data) => {
                        // We don't record a size metric for Checkpoint data (from the RPC client).
                        // TODO: Record the metric once we have a good way to get the size information.
                        data
                    }
                })
            }
        };

        // Keep backing off until we are waiting for the max interval, but don't give up.
        let backoff = ExponentialBackoff {
            max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
            max_elapsed_time: None,
            ..Default::default()
        };

        let guard = self.metrics.ingested_checkpoint_latency.start_timer();
        let data = backoff::future::retry(backoff, request).await?;
        let elapsed = guard.stop_and_record();

        debug!(
            checkpoint,
            elapsed_ms = elapsed * 1000.0,
            "Fetched checkpoint"
        );

        self.checkpoint_lag_reporter
            .report_lag(checkpoint, data.summary.timestamp_ms);

        self.metrics.total_ingested_checkpoints.inc();

        self.metrics
            .total_ingested_transactions
            .inc_by(data.transactions.len() as u64);

        self.metrics.total_ingested_events.inc_by(
            data.transactions
                .iter()
                .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
                .sum(),
        );

        self.metrics
            .total_ingested_objects
            .inc_by(data.object_set.len() as u64);

        Ok(Arc::new(data))
    }
}

#[cfg(test)]
mod tests {
    use dashmap::DashMap;
    use prometheus::Registry;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::time::timeout;

    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Mock implementation of IngestionClientTrait for testing.
    #[derive(Default)]
    struct MockIngestionClient {
        checkpoints: DashMap<u64, FetchData>,
        transient_failures: DashMap<u64, usize>,
        not_found_failures: DashMap<u64, usize>,
        permanent_failures: DashMap<u64, usize>,
    }

    #[async_trait]
    impl IngestionClientTrait for MockIngestionClient {
        async fn fetch(&self, checkpoint: u64) -> FetchResult {
            // Check for not found failures
            if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::NotFound);
            }

            // Check for non-retryable failures
            if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::Permanent {
                    reason: "mock_permanent_error",
                    error: anyhow::anyhow!("Mock permanent error"),
                });
            }

            // Check for transient failures
            if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::Transient {
                    reason: "mock_transient_error",
                    error: anyhow::anyhow!("Mock transient error"),
                });
            }

            // Return the checkpoint data if it exists
            self.checkpoints
                .get(&checkpoint)
                .as_deref()
                .cloned()
                .ok_or(FetchError::NotFound)
        }
    }

    fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IngestionMetrics::new(None, &registry);
        let mock_client = Arc::new(MockIngestionClient::default());
        let client = IngestionClient::new_impl(mock_client.clone(), metrics);
        (client, mock_client)
    }

    #[tokio::test]
    async fn test_fetch_raw_bytes_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = Bytes::from(test_checkpoint_data(1));
        mock.checkpoints.insert(1, FetchData::Raw(bytes.clone()));

        // Fetch and verify
        let result = client.fetch(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_checkpoint_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));

        // Fetch and verify
        let result = client.fetch(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_not_found() {
        let (client, _) = setup_test();

        // Try to fetch a non-existent checkpoint
        let result = client.fetch(1).await;
        assert!(matches!(result, Err(IngestionError::NotFound(1))));
    }

    #[tokio::test]
    async fn test_fetch_transient_error_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with 2 transient failures
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.clone().into()));
        mock.transient_failures.insert(1, 2);

        // Fetch and verify it succeeds after retries
        let result = client.fetch(1).await.unwrap();
        assert_eq!(*result.summary.sequence_number(), 1);

        // Verify that exactly 2 retries were recorded
        let retries = client
            .metrics
            .total_ingested_transient_retries
            .with_label_values(&["mock_transient_error"])
            .get();
        assert_eq!(retries, 2);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with 1 not_found failure
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));
        mock.not_found_failures.insert(1, 1);

        // Wait for checkpoint with a short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);

        // Verify that exactly 1 retry was recorded
        let retries = client.metrics.total_ingested_not_found_retries.get();
        assert_eq!(retries, 1);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_instant() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint_data
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with no failures - data should be available immediately
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));

        // Wait for checkpoint with a short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_wait_for_permanent_deserialization_error() {
        let (client, mock) = setup_test();

        // Add invalid data that will cause a deserialization error
        mock.checkpoints
            .insert(1, FetchData::Raw(Bytes::from("invalid data")));

        // wait_for should keep retrying on deserialization errors, so the outer timeout fires
        timeout(
            Duration::from_secs(1),
            client.wait_for(1, Duration::from_millis(50)),
        )
        .await
        .unwrap_err();
    }

    #[tokio::test]
    async fn test_fetch_non_retryable_error() {
        let (client, mock) = setup_test();

        mock.permanent_failures.insert(1, 1);

        let result = client.fetch(1).await;
        assert!(matches!(result, Err(IngestionError::FetchError(1, _))));

        // Verify that the non-retryable error metric was incremented
        let errors = client
            .metrics
            .total_ingested_permanent_errors
            .with_label_values(&["mock_permanent_error"])
            .get();
        assert_eq!(errors, 1);
    }
}