sui_indexer_alt_framework/ingestion/ingestion_client.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use backoff::Error as BE;
use backoff::ExponentialBackoff;
use backoff::backoff::Constant;
use sui_rpc::Client;
use sui_rpc::client::HeadersInterceptor;
use sui_storage::blob::Blob;
use tokio_util::bytes::Bytes;
use tracing::{debug, error, warn};
use url::Url;

use crate::ingestion::Error as IngestionError;
use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
use crate::ingestion::Result as IngestionResult;
use crate::ingestion::local_client::LocalIngestionClient;
use crate::ingestion::remote_client::RemoteIngestionClient;
use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IngestionMetrics;
use crate::task::with_slow_future_monitor;
use crate::types::full_checkpoint_content::{Checkpoint, CheckpointData};
use async_trait::async_trait;

/// Wait at most this long between retries for transient errors.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold for logging warnings about slow HTTP operations during checkpoint fetching.
///
/// Operations that take longer than this duration will trigger a warning log, but will
/// continue executing without being canceled. This helps identify network issues or
/// slow remote stores without interrupting the ingestion process.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);

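/// A source of checkpoint data, addressed by sequence number.
///
/// Implementations return either raw checkpoint bytes or an already-deserialized [Checkpoint],
/// and classify failures using [FetchError] so that the wrapping [IngestionClient] knows which
/// errors to retry.
///
/// A minimal sketch of an implementation (hypothetical, not part of the framework) that serves
/// the same raw bytes for every checkpoint:
///
/// ```ignore
/// struct StaticClient(Bytes);
///
/// #[async_trait]
/// impl IngestionClientTrait for StaticClient {
///     async fn fetch(&self, _checkpoint: u64) -> FetchResult {
///         Ok(FetchData::Raw(self.0.clone()))
///     }
/// }
/// ```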
#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    async fn fetch(&self, checkpoint: u64) -> FetchResult;
}

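/// Command-line arguments selecting where checkpoints are ingested from.
///
/// One of the sources must be provided. If more than one is given, `remote_store_url` takes
/// precedence over `local_ingestion_path`, which takes precedence over `rpc_api_url`.
///
/// A hypothetical invocation (the binary name, URLs and paths below are illustrative only)
/// might look like:
///
/// ```text
/// indexer --remote-store-url https://checkpoints.example.com
/// # or
/// indexer --local-ingestion-path /var/lib/sui/ingestion
/// # or
/// indexer --rpc-api-url https://fullnode.example.com:443 --rpc-username user --rpc-password pass
/// ```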
#[derive(clap::Args, Clone, Debug, Default)]
#[group(required = true)]
pub struct IngestionClientArgs {
    /// Remote Store to fetch checkpoints from.
    #[clap(long, group = "source")]
    pub remote_store_url: Option<Url>,

    /// Path to the local ingestion directory.
    /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used.
    #[clap(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    /// Sui fullnode gRPC url to fetch checkpoints from.
    /// If all remote_store_url, local_ingestion_path and rpc_api_url are provided, remote_store_url will be used.
    #[clap(long, env, group = "source")]
    pub rpc_api_url: Option<Url>,

    /// Optional username for the gRPC service.
    #[clap(long, env)]
    pub rpc_username: Option<String>,

    /// Optional password for the gRPC service.
    #[clap(long, env)]
    pub rpc_password: Option<String>,
}

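/// Errors that a checkpoint source can report from [IngestionClientTrait::fetch].
///
/// [FetchError::Transient] errors are retried by [IngestionClient::fetch] with exponential
/// backoff, [FetchError::NotFound] is surfaced as [IngestionError::NotFound], and
/// [FetchError::Permanent] errors are logged and returned to the caller without retrying.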
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
    #[error("Checkpoint not found")]
    NotFound,
    #[error("Failed to fetch checkpoint due to {reason}: {error}")]
    Transient {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
    #[error("Permanent error in {reason}: {error}")]
    Permanent {
        reason: &'static str,
        #[source]
        error: anyhow::Error,
    },
}

pub type FetchResult = Result<FetchData, FetchError>;

#[derive(Clone)]
#[allow(clippy::large_enum_variant)]
pub enum FetchData {
    Raw(Bytes),
    Checkpoint(Checkpoint),
}

#[derive(Clone)]
pub struct IngestionClient {
    client: Arc<dyn IngestionClientTrait>,
    /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
    metrics: Arc<IngestionMetrics>,
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}

impl IngestionClient {
    /// Construct a new ingestion client. Its source is determined by `args`.
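    ///
    /// A minimal sketch (not compiled as a doc-test) of constructing a client that reads
    /// checkpoints from a local directory; the path is illustrative and a `metrics` handle is
    /// assumed to already be available:
    ///
    /// ```ignore
    /// let args = IngestionClientArgs {
    ///     local_ingestion_path: Some("/path/to/checkpoints".into()),
    ///     ..Default::default()
    /// };
    /// let client = IngestionClient::new(args, metrics.clone())?;
    /// ```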
    pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
        // TODO: Support stacking multiple ingestion clients for redundancy/failover.
        let client = if let Some(url) = args.remote_store_url.as_ref() {
            IngestionClient::new_remote(url.clone(), metrics.clone())?
        } else if let Some(path) = args.local_ingestion_path.as_ref() {
            IngestionClient::new_local(path.clone(), metrics.clone())
        } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
            IngestionClient::new_rpc(
                rpc_api_url.clone(),
                args.rpc_username,
                args.rpc_password,
                metrics.clone(),
            )?
        } else {
            panic!("One of remote_store_url, local_ingestion_path or rpc_api_url must be provided");
        };

        Ok(client)
    }

    /// An ingestion client that fetches checkpoints from a remote store (an object store, over
    /// HTTP).
    pub fn new_remote(url: Url, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
        let client = Arc::new(RemoteIngestionClient::new(url)?);
        Ok(Self::new_impl(client, metrics))
    }

    /// An ingestion client that fetches checkpoints from a remote store (an object store, over
    /// HTTP), with a configured request timeout.
    pub fn new_remote_with_timeout(
        url: Url,
        timeout: std::time::Duration,
        metrics: Arc<IngestionMetrics>,
    ) -> IngestionResult<Self> {
        let client = Arc::new(RemoteIngestionClient::new_with_timeout(url, timeout)?);
        Ok(Self::new_impl(client, metrics))
    }

    /// An ingestion client that fetches checkpoints from a local directory.
    pub fn new_local(path: PathBuf, metrics: Arc<IngestionMetrics>) -> Self {
        let client = Arc::new(LocalIngestionClient::new(path));
        Self::new_impl(client, metrics)
    }

    /// An ingestion client that fetches checkpoints from a fullnode, over gRPC.
    pub fn new_rpc(
        url: Url,
        username: Option<String>,
        password: Option<String>,
        metrics: Arc<IngestionMetrics>,
    ) -> IngestionResult<Self> {
        let client = if let Some(username) = username {
            let mut headers = HeadersInterceptor::new();
            headers.basic_auth(username, password);
            Client::new(url.to_string())?
                .with_headers(headers)
                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
        } else {
            Client::new(url.to_string())?
                .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
        };
        Ok(Self::new_impl(Arc::new(client), metrics))
    }

    pub(crate) fn new_impl(
        client: Arc<dyn IngestionClientTrait>,
        metrics: Arc<IngestionMetrics>,
    ) -> Self {
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
            metrics.ingested_checkpoint_timestamp_lag.clone(),
            metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
            metrics.latest_ingested_checkpoint.clone(),
        );
        IngestionClient {
            client,
            metrics,
            checkpoint_lag_reporter,
        }
    }

    /// Fetch checkpoint data by sequence number.
    ///
    /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
    /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled
    /// by the `retry_interval` parameter.
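    ///
    /// A minimal usage sketch (not compiled as a doc-test), polling every second for a
    /// checkpoint that may not have been produced yet; the sequence number is illustrative:
    ///
    /// ```ignore
    /// let checkpoint = client.wait_for(12345, Duration::from_secs(1)).await?;
    /// println!("got checkpoint {}", checkpoint.summary.sequence_number());
    /// ```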
    pub async fn wait_for(
        &self,
        checkpoint: u64,
        retry_interval: Duration,
    ) -> IngestionResult<Arc<Checkpoint>> {
        let backoff = Constant::new(retry_interval);
        let fetch = || async move {
            use backoff::Error as BE;
            self.fetch(checkpoint).await.map_err(|e| match e {
                IngestionError::NotFound(checkpoint) => {
                    debug!(checkpoint, "Checkpoint not found, retrying...");
                    self.metrics.total_ingested_not_found_retries.inc();
                    BE::transient(e)
                }
                e => BE::permanent(e),
            })
        };

        backoff::future::retry(backoff, fetch).await
    }

    /// Fetch checkpoint data by sequence number.
    ///
    /// Repeatedly retries transient errors with an exponential backoff (up to
    /// `MAX_TRANSIENT_RETRY_INTERVAL`). Transient errors are either reported by the client
    /// implementation as the [FetchError::Transient] variant, or arise within this function
    /// when the fetched bytes fail to deserialize as a [Checkpoint].
    ///
    /// The function returns immediately if the checkpoint is not found.
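    ///
    /// A minimal usage sketch (not compiled as a doc-test); the sequence number is illustrative:
    ///
    /// ```ignore
    /// match client.fetch(12345).await {
    ///     Ok(checkpoint) => println!("{} transactions", checkpoint.transactions.len()),
    ///     Err(IngestionError::NotFound(cp)) => println!("checkpoint {cp} not available yet"),
    ///     Err(e) => return Err(e),
    /// }
    /// ```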
    pub async fn fetch(&self, checkpoint: u64) -> IngestionResult<Arc<Checkpoint>> {
        let client = self.client.clone();
        let request = move || {
            let client = client.clone();
            async move {
                let fetch_data = with_slow_future_monitor(
                    client.fetch(checkpoint),
                    SLOW_OPERATION_WARNING_THRESHOLD,
                    /* on_threshold_exceeded = */
                    || {
                        warn!(
                            checkpoint,
                            threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
                            "Slow checkpoint fetch operation detected"
                        );
                    },
                )
                .await
                .map_err(|err| match err {
                    FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
                    FetchError::Transient { reason, error } => self.metrics.inc_retry(
                        checkpoint,
                        reason,
                        IngestionError::FetchError(checkpoint, error),
                    ),
                    FetchError::Permanent { reason, error } => {
                        error!(checkpoint, reason, "Permanent fetch error: {error}");
                        self.metrics
                            .total_ingested_permanent_errors
                            .with_label_values(&[reason])
                            .inc();
                        BE::permanent(IngestionError::FetchError(checkpoint, error))
                    }
                })?;

                Ok::<Checkpoint, backoff::Error<IngestionError>>(match fetch_data {
                    FetchData::Raw(bytes) => {
                        self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
                        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
                            self.metrics.inc_retry(
                                checkpoint,
                                "deserialization",
                                IngestionError::DeserializationError(checkpoint, e),
                            )
                        })?;
                        checkpoint.into()
                    }
                    FetchData::Checkpoint(data) => {
                        // We do not record a size metric for Checkpoint data (from the RPC client).
                        // TODO: Record the metric when we have a good way to get the size information.
                        data
                    }
                })
            }
        };

        // Keep backing off until we are waiting for the max interval, but don't give up.
        let backoff = ExponentialBackoff {
            max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
            max_elapsed_time: None,
            ..Default::default()
        };

        let guard = self.metrics.ingested_checkpoint_latency.start_timer();
        let data = backoff::future::retry(backoff, request).await?;
        let elapsed = guard.stop_and_record();

        debug!(
            checkpoint,
            elapsed_ms = elapsed * 1000.0,
            "Fetched checkpoint"
        );

        self.checkpoint_lag_reporter
            .report_lag(checkpoint, data.summary.timestamp_ms);

        self.metrics.total_ingested_checkpoints.inc();

        self.metrics
            .total_ingested_transactions
            .inc_by(data.transactions.len() as u64);

        self.metrics.total_ingested_events.inc_by(
            data.transactions
                .iter()
                .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
                .sum(),
        );

        self.metrics
            .total_ingested_objects
            .inc_by(data.object_set.len() as u64);

        Ok(Arc::new(data))
    }
}

#[cfg(test)]
mod tests {
    use dashmap::DashMap;
    use prometheus::Registry;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::time::timeout;
    use tokio_util::bytes::Bytes;

    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Mock implementation of IngestionClientTrait for testing
    #[derive(Default)]
    struct MockIngestionClient {
        checkpoints: DashMap<u64, FetchData>,
        transient_failures: DashMap<u64, usize>,
        not_found_failures: DashMap<u64, usize>,
        permanent_failures: DashMap<u64, usize>,
    }

    #[async_trait]
    impl IngestionClientTrait for MockIngestionClient {
        async fn fetch(&self, checkpoint: u64) -> FetchResult {
            // Check for not found failures
            if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::NotFound);
            }

            // Check for non-retryable failures
            if let Some(mut remaining) = self.permanent_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::Permanent {
                    reason: "mock_permanent_error",
                    error: anyhow::anyhow!("Mock permanent error"),
                });
            }

            // Check for transient failures
            if let Some(mut remaining) = self.transient_failures.get_mut(&checkpoint)
                && *remaining > 0
            {
                *remaining -= 1;
                return Err(FetchError::Transient {
                    reason: "mock_transient_error",
                    error: anyhow::anyhow!("Mock transient error"),
                });
            }

            // Return the checkpoint data if it exists
            self.checkpoints
                .get(&checkpoint)
                .as_deref()
                .cloned()
                .ok_or(FetchError::NotFound)
        }
    }

    fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let metrics = IngestionMetrics::new(None, &registry);
        let mock_client = Arc::new(MockIngestionClient::default());
        let client = IngestionClient::new_impl(mock_client.clone(), metrics);
        (client, mock_client)
    }

    #[tokio::test]
    async fn test_fetch_raw_bytes_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = Bytes::from(test_checkpoint_data(1));
        mock.checkpoints.insert(1, FetchData::Raw(bytes.clone()));

        // Fetch and verify
        let result = client.fetch(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_checkpoint_success() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));

        // Fetch and verify
        let result = client.fetch(1).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_fetch_not_found() {
        let (client, _) = setup_test();

        // Try to fetch non-existent checkpoint
        let result = client.fetch(1).await;
        assert!(matches!(result, Err(IngestionError::NotFound(1))));
    }

    #[tokio::test]
    async fn test_fetch_transient_error_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with 2 transient failures
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.clone().into()));
        mock.transient_failures.insert(1, 2);

        // Fetch and verify it succeeds after retries
        let result = client.fetch(1).await.unwrap();
        assert_eq!(*result.summary.sequence_number(), 1);

        // Verify that exactly 2 retries were recorded
        let retries = client
            .metrics
            .total_ingested_transient_retries
            .with_label_values(&["mock_transient_error"])
            .get();
        assert_eq!(retries, 2);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_with_retry() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with 1 not_found failure
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));
        mock.not_found_failures.insert(1, 1);

        // Wait for checkpoint with short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);

        // Verify that exactly 1 retry was recorded
        let retries = client.metrics.total_ingested_not_found_retries.get();
        assert_eq!(retries, 1);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_instant() {
        let (client, mock) = setup_test();

        // Create test data using test_checkpoint
        let bytes = test_checkpoint_data(1);
        let checkpoint: CheckpointData = Blob::from_bytes(&bytes).unwrap();

        // Add checkpoint to mock with no failures - data should be available immediately
        mock.checkpoints
            .insert(1, FetchData::Checkpoint(checkpoint.into()));

        // Wait for checkpoint with short retry interval
        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(result.summary.sequence_number(), &1);
    }

    #[tokio::test]
    async fn test_wait_for_permanent_deserialization_error() {
        let (client, mock) = setup_test();

        // Add invalid data that will cause a deserialization error
        mock.checkpoints
            .insert(1, FetchData::Raw(Bytes::from("invalid data")));

        // wait_for should keep retrying on deserialization errors and timeout
        timeout(
            Duration::from_secs(1),
            client.wait_for(1, Duration::from_millis(50)),
        )
        .await
        .unwrap_err();
    }

    #[tokio::test]
    async fn test_fetch_non_retryable_error() {
        let (client, mock) = setup_test();

        mock.permanent_failures.insert(1, 1);

        let result = client.fetch(1).await;
        assert!(matches!(result, Err(IngestionError::FetchError(1, _))));

        // Verify that the non-retryable error metric was incremented
        let errors = client
            .metrics
            .total_ingested_permanent_errors
            .with_label_values(&["mock_permanent_error"])
            .get();
        assert_eq!(errors, 1);
    }
}