sui_indexer_alt_framework/ingestion/
ingestion_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::future::Future;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use backoff::Error as BE;
11use backoff::ExponentialBackoff;
12use backoff::backoff::Constant;
13use clap::ArgGroup;
14use mysten_network::callback::CallbackLayer;
15use object_store::ClientOptions;
16use object_store::ObjectStore;
17use object_store::aws::AmazonS3Builder;
18use object_store::azure::MicrosoftAzureBuilder;
19use object_store::gcp::GoogleCloudStorageBuilder;
20use object_store::http::HttpBuilder;
21use object_store::local::LocalFileSystem;
22use prometheus::Histogram;
23use reqwest::header::HeaderMap;
24use reqwest::header::HeaderName;
25use reqwest::header::HeaderValue;
26use sui_futures::future::with_slow_future_monitor;
27use sui_rpc::Client;
28use sui_rpc::client::HeadersInterceptor;
29use sui_types::digests::ChainIdentifier;
30use tokio::sync::OnceCell;
31use tracing::debug;
32use tracing::warn;
33use url::Url;
34
35use crate::ingestion::Error as IE;
36use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
37use crate::ingestion::Result as IngestionResult;
38use crate::ingestion::byte_count::ByteCountMakeCallbackHandler;
39use crate::ingestion::decode;
40use crate::ingestion::store_client::StoreIngestionClient;
41use crate::metrics::CheckpointLagMetricReporter;
42use crate::metrics::IngestionMetrics;
43use crate::types::full_checkpoint_content::Checkpoint;
44
/// Upper bound on the delay between two consecutive retries of a transient error: the
/// exponential back-off grows until it reaches this interval and then stays there.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_millis(60_000);
47
/// How long a single fetch attempt may run before a "slow operation" warning is logged.
///
/// Crossing this threshold only produces a log line: the attempt keeps running and is never
/// cancelled. The warnings make network trouble or sluggish remote stores visible without
/// interrupting ingestion.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_millis(60_000);
54
55#[async_trait]
56pub trait IngestionClientTrait: Send + Sync {
57    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier>;
58
59    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult;
60
61    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64>;
62}
63
64#[derive(clap::Args, Clone, Debug)]
65#[command(group(ArgGroup::new("source").required(true).multiple(false)))]
66pub struct IngestionClientArgs {
67    /// Remote Store to fetch checkpoints from over HTTP.
68    #[arg(long, group = "source")]
69    pub remote_store_url: Option<Url>,
70
71    /// Fetch checkpoints from AWS S3. Provide the bucket name or endpoint-and-bucket.
72    /// (env: AWS_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
73    #[arg(long, group = "source")]
74    pub remote_store_s3: Option<String>,
75
76    /// Fetch checkpoints from Google Cloud Storage. Provide the bucket name.
77    /// (env: GOOGLE_SERVICE_ACCOUNT_PATH)
78    #[arg(long, group = "source")]
79    pub remote_store_gcs: Option<String>,
80
81    /// Fetch checkpoints from Azure Blob Storage. Provide the container name.
82    /// (env: AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCESS_KEY)
83    #[arg(long, group = "source")]
84    pub remote_store_azure: Option<String>,
85
86    /// Default header to include in remote store requests, as `<name>:<value>`.
87    /// Can be provided multiple times.
88    #[arg(long = "remote-store-header", value_parser = parse_remote_store_header)]
89    pub remote_store_headers: Vec<(HeaderName, HeaderValue)>,
90
91    /// Path to the local ingestion directory.
92    #[arg(long, group = "source")]
93    pub local_ingestion_path: Option<PathBuf>,
94
95    /// Sui fullnode gRPC url to fetch checkpoints from.
96    #[arg(long, group = "source")]
97    pub rpc_api_url: Option<Url>,
98
99    /// Optional username for the gRPC service.
100    #[arg(long, env, requires = "rpc_api_url")]
101    pub rpc_username: Option<String>,
102
103    /// Optional password for the gRPC service.
104    #[arg(long, env, requires = "rpc_api_url")]
105    pub rpc_password: Option<String>,
106
107    /// How long to wait for a checkpoint file to be downloaded (milliseconds). Set to 0 to disable
108    /// the timeout.
109    #[arg(long, default_value_t = Self::default().checkpoint_timeout_ms)]
110    pub checkpoint_timeout_ms: u64,
111
112    /// How long to wait while establishing a connection to the checkpoint store (milliseconds).
113    /// Set to 0 to disable the timeout.
114    #[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
115    pub checkpoint_connection_timeout_ms: u64,
116}
117
118impl Default for IngestionClientArgs {
119    fn default() -> Self {
120        Self {
121            remote_store_url: None,
122            remote_store_s3: None,
123            remote_store_gcs: None,
124            remote_store_azure: None,
125            remote_store_headers: vec![],
126            local_ingestion_path: None,
127            rpc_api_url: None,
128            rpc_username: None,
129            rpc_password: None,
130            checkpoint_timeout_ms: 120_000,
131            checkpoint_connection_timeout_ms: 120_000,
132        }
133    }
134}
135
136impl IngestionClientArgs {
137    fn client_options(&self) -> ClientOptions {
138        let mut options = ClientOptions::default();
139
140        options = if self.checkpoint_timeout_ms == 0 {
141            options.with_timeout_disabled()
142        } else {
143            let timeout = Duration::from_millis(self.checkpoint_timeout_ms);
144            options.with_timeout(timeout)
145        };
146
147        options = if self.checkpoint_connection_timeout_ms == 0 {
148            options.with_connect_timeout_disabled()
149        } else {
150            let timeout = Duration::from_millis(self.checkpoint_connection_timeout_ms);
151            options.with_connect_timeout(timeout)
152        };
153
154        options = if !self.remote_store_headers.is_empty() {
155            let mut headers = HeaderMap::new();
156            for (name, value) in &self.remote_store_headers {
157                headers.append(name.clone(), value.clone());
158            }
159
160            options.with_default_headers(headers)
161        } else {
162            options
163        };
164
165        options
166    }
167}
168
169#[derive(thiserror::Error, Debug)]
170pub enum CheckpointError {
171    #[error("Checkpoint not found")]
172    NotFound,
173    #[error("Failed to fetch checkpoint: {0}")]
174    Fetch(#[from] anyhow::Error),
175    #[error("Failed to decode checkpoint: {0}")]
176    Decode(#[from] decode::Error),
177}
178
179pub type CheckpointResult = Result<Checkpoint, CheckpointError>;
180
181#[derive(Clone)]
182pub struct IngestionClient {
183    client: Arc<dyn IngestionClientTrait>,
184    /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
185    metrics: Arc<IngestionMetrics>,
186    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
187    chain_id: OnceCell<ChainIdentifier>,
188}
189
190#[derive(Clone, Debug)]
191pub struct CheckpointEnvelope {
192    pub checkpoint: Arc<Checkpoint>,
193    pub chain_id: ChainIdentifier,
194}
195
196impl IngestionClient {
197    /// Construct a new ingestion client. Its source is determined by `args`.
198    pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
199        // TODO: Support stacking multiple ingestion clients for redundancy/failover.
200        let retry = super::store_client::retry_config();
201        let client = if let Some(url) = args.remote_store_url.as_ref() {
202            let store = HttpBuilder::new()
203                .with_url(url.to_string())
204                .with_client_options(args.client_options().with_allow_http(true))
205                .with_retry(retry)
206                .build()
207                .map(Arc::new)?;
208            IngestionClient::with_store(store, metrics.clone())?
209        } else if let Some(bucket) = args.remote_store_s3.as_ref() {
210            let store = AmazonS3Builder::from_env()
211                .with_client_options(args.client_options())
212                .with_retry(retry)
213                .with_imdsv1_fallback()
214                .with_bucket_name(bucket)
215                .build()
216                .map(Arc::new)?;
217            IngestionClient::with_store(store, metrics.clone())?
218        } else if let Some(bucket) = args.remote_store_gcs.as_ref() {
219            let store = GoogleCloudStorageBuilder::from_env()
220                .with_client_options(args.client_options())
221                .with_retry(retry)
222                .with_bucket_name(bucket)
223                .build()
224                .map(Arc::new)?;
225            IngestionClient::with_store(store, metrics.clone())?
226        } else if let Some(container) = args.remote_store_azure.as_ref() {
227            let store = MicrosoftAzureBuilder::from_env()
228                .with_client_options(args.client_options())
229                .with_retry(retry)
230                .with_container_name(container)
231                .build()
232                .map(Arc::new)?;
233            IngestionClient::with_store(store, metrics.clone())?
234        } else if let Some(path) = args.local_ingestion_path.as_ref() {
235            let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
236            IngestionClient::with_store(store, metrics.clone())?
237        } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
238            IngestionClient::with_grpc(
239                rpc_api_url.clone(),
240                args.rpc_username,
241                args.rpc_password,
242                metrics.clone(),
243            )?
244        } else {
245            panic!(
246                "One of remote_store_url, remote_store_s3, remote_store_gcs, remote_store_azure, \
247                local_ingestion_path or rpc_api_url must be provided"
248            );
249        };
250
251        Ok(client)
252    }
253
254    /// An ingestion client that fetches checkpoints from a remote object store.
255    pub fn with_store(
256        store: Arc<dyn ObjectStore>,
257        metrics: Arc<IngestionMetrics>,
258    ) -> IngestionResult<Self> {
259        let client = Arc::new(StoreIngestionClient::new(
260            store,
261            Some(metrics.total_ingested_bytes.clone()),
262        ));
263        Ok(Self::from_trait(client, metrics))
264    }
265
266    /// An ingestion client that fetches checkpoints from a fullnode, over gRPC.
267    pub fn with_grpc(
268        url: Url,
269        username: Option<String>,
270        password: Option<String>,
271        metrics: Arc<IngestionMetrics>,
272    ) -> IngestionResult<Self> {
273        let byte_count_layer = CallbackLayer::new(ByteCountMakeCallbackHandler::new(
274            metrics.total_ingested_bytes.clone(),
275        ));
276        let client = Client::new(url.to_string())?
277            .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
278            .request_layer(byte_count_layer);
279        let client = if let Some(username) = username {
280            let mut headers = HeadersInterceptor::new();
281            headers.basic_auth(username, password);
282            client.with_headers(headers)
283        } else {
284            client
285        };
286        Ok(Self::from_trait(Arc::new(client), metrics))
287    }
288
289    /// The metrics handle this client reports against. Callers constructing peer services (e.g. an
290    /// [`IngestionService`]) against the same client should reuse this Arc rather than building a
291    /// second [`IngestionMetrics`] from the same registry, which would double-register the metric
292    /// vectors.
293    ///
294    /// [`IngestionService`]: crate::ingestion::IngestionService
295    pub fn metrics(&self) -> &Arc<IngestionMetrics> {
296        &self.metrics
297    }
298
299    /// Wrap an arbitrary [`IngestionClientTrait`] implementation in an [`IngestionClient`]. Use
300    /// this when the source of checkpoints is not one of the built-in remote object stores or gRPC
301    /// endpoints — for example, when embedding the indexer in a fullnode that already has
302    /// checkpoint data on hand.
303    pub fn from_trait(
304        client: Arc<dyn IngestionClientTrait>,
305        metrics: Arc<IngestionMetrics>,
306    ) -> Self {
307        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
308            metrics.ingested_checkpoint_timestamp_lag.clone(),
309            metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
310            metrics.latest_ingested_checkpoint.clone(),
311        );
312        IngestionClient {
313            client,
314            metrics,
315            checkpoint_lag_reporter,
316            chain_id: OnceCell::new(),
317        }
318    }
319
320    /// Fetch checkpoint data by sequence number.
321    ///
322    /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
323    /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled
324    /// by the `retry_interval` parameter.
325    pub async fn wait_for(
326        &self,
327        checkpoint: u64,
328        retry_interval: Duration,
329    ) -> IngestionResult<CheckpointEnvelope> {
330        let backoff = Constant::new(retry_interval);
331        let fetch = || async move {
332            use backoff::Error as BE;
333            self.checkpoint(checkpoint).await.map_err(|e| match e {
334                IE::NotFound(checkpoint) => {
335                    debug!(checkpoint, "Checkpoint not found, retrying...");
336                    self.metrics.total_ingested_not_found_retries.inc();
337                    BE::transient(e)
338                }
339                e => BE::permanent(e),
340            })
341        };
342
343        backoff::future::retry(backoff, fetch).await
344    }
345
346    /// Fetch checkpoint data by sequence number.
347    ///
348    /// Repeatedly retries transient errors with an exponential backoff (up to
349    /// `MAX_TRANSIENT_RETRY_INTERVAL`). Transient errors are defined by the client
350    /// implementation that returns a [CheckpointError::Fetch] or [CheckpointError::Decode] error
351    /// variant.
352    ///
353    /// The function will immediately return if the checkpoint is not found.
354    pub async fn checkpoint(&self, cp_sequence_number: u64) -> IngestionResult<CheckpointEnvelope> {
355        let client = self.client.clone();
356        let checkpoint_data_fut = retry_transient_with_slow_monitor(
357            "checkpoint",
358            move || {
359                let client = client.clone();
360                async move {
361                    client
362                        .checkpoint(cp_sequence_number)
363                        .await
364                        .map_err(|err| match err {
365                            // Not found errors are marked as permanent here, but retried in
366                            // `wait_for` in case the checkpoint becomes available in the future.
367                            CheckpointError::NotFound => {
368                                BE::permanent(IE::NotFound(cp_sequence_number))
369                            }
370                            // Retry fetch and decode errors in case the root cause is in the
371                            // upstream checkpoint data source. If the upstream checkpoint data
372                            // source is corrected, then the indexer will automatically recover
373                            // the next time the read is attempted.
374                            CheckpointError::Fetch(e) => self.metrics.inc_retry(
375                                cp_sequence_number,
376                                "fetch",
377                                IE::FetchError(cp_sequence_number, e),
378                            ),
379                            CheckpointError::Decode(e) => self.metrics.inc_retry(
380                                cp_sequence_number,
381                                e.reason(),
382                                IE::DecodeError(cp_sequence_number, e.into()),
383                            ),
384                        })
385                }
386            },
387            &self.metrics.ingested_checkpoint_latency,
388        );
389
390        let client = self.client.clone();
391        let chain_id_fut = self.chain_id.get_or_try_init(|| {
392            retry_transient_with_slow_monitor(
393                "chain_id",
394                move || {
395                    let client = client.clone();
396                    async move {
397                        client
398                            .chain_id()
399                            .await
400                            .map_err(|e| BE::transient(IE::ChainIdError(cp_sequence_number, e)))
401                    }
402                },
403                &self.metrics.ingested_chain_id_latency,
404            )
405        });
406
407        let (checkpoint, chain_id) = tokio::try_join!(checkpoint_data_fut, chain_id_fut)?;
408
409        self.checkpoint_lag_reporter
410            .report_lag(cp_sequence_number, checkpoint.summary.timestamp_ms);
411
412        self.metrics.total_ingested_checkpoints.inc();
413
414        self.metrics
415            .total_ingested_transactions
416            .inc_by(checkpoint.transactions.len() as u64);
417
418        self.metrics.total_ingested_events.inc_by(
419            checkpoint
420                .transactions
421                .iter()
422                .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
423                .sum(),
424        );
425
426        self.metrics
427            .total_ingested_objects
428            .inc_by(checkpoint.object_set.len() as u64);
429
430        Ok(CheckpointEnvelope {
431            checkpoint: Arc::new(checkpoint),
432            chain_id: *chain_id,
433        })
434    }
435
436    pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
437        self.client.latest_checkpoint_number().await
438    }
439}
440
441/// Keep backing off until we are waiting for the max interval, but don't give up.
442pub(crate) fn transient_backoff() -> ExponentialBackoff {
443    ExponentialBackoff {
444        max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
445        max_elapsed_time: None,
446        ..Default::default()
447    }
448}
449
450/// Retry a fallible async operation with exponential backoff and slow-operation monitoring.
451/// Records the total time (including retries) in the provided latency histogram.
452pub(crate) async fn retry_transient_with_slow_monitor<F, Fut, T>(
453    operation: &str,
454    make_future: F,
455    latency: &Histogram,
456) -> IngestionResult<T>
457where
458    F: Fn() -> Fut,
459    Fut: Future<Output = Result<T, backoff::Error<IE>>>,
460{
461    let request = || {
462        let fut = make_future();
463        async move {
464            with_slow_future_monitor(fut, SLOW_OPERATION_WARNING_THRESHOLD, || {
465                warn!(
466                    operation,
467                    threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
468                    "Slow operation detected"
469                );
470            })
471            .await
472        }
473    };
474
475    let guard = latency.start_timer();
476    let data = backoff::future::retry(transient_backoff(), request).await?;
477    let elapsed = guard.stop_and_record();
478
479    debug!(
480        operation,
481        elapsed_ms = elapsed * 1000.0,
482        "Fetched operation"
483    );
484
485    Ok(data)
486}
487
488fn parse_remote_store_header(header: &str) -> Result<(HeaderName, HeaderValue), String> {
489    let (name, value) = header
490        .split_once(':')
491        .ok_or_else(|| "remote store header must be in `<name>:<value>` format".to_string())?;
492
493    let name = HeaderName::from_bytes(name.as_bytes())
494        .map_err(|err| format!("invalid remote store header name `{name}`: {err}"))?;
495    let value = HeaderValue::from_str(value)
496        .map_err(|err| format!("invalid remote store header value for `{name}`: {err}"))?;
497
498    Ok((name, value))
499}
500
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use clap::Parser;
    use clap::error::ErrorKind;
    use dashmap::DashMap;
    use prometheus::Registry;
    use sui_types::digests::CheckpointDigest;
    use sui_types::event::Event;
    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use crate::ingestion::decode;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Decode the canned test checkpoint with sequence number `seq`.
    fn test_checkpoint(seq: u64) -> Checkpoint {
        decode::checkpoint(&test_checkpoint_data(seq)).unwrap()
    }

    /// Build a checkpoint with one transaction containing one event and one created object.
    fn test_checkpoint_with_data(seq: u64) -> Checkpoint {
        TestCheckpointBuilder::new(seq)
            .start_transaction(0)
            .create_owned_object(0)
            .with_events(vec![Event::random_for_testing()])
            .finish_transaction()
            .build_checkpoint()
    }

    // Wrapper so that `IngestionClientArgs` can be parsed on its own.
    #[derive(Debug, Parser)]
    struct TestArgs {
        #[clap(flatten)]
        ingestion: IngestionClientArgs,
    }

    /// Parse `argv` (program name excluded), expecting it to be accepted.
    fn parse_ok(argv: &[&str]) -> TestArgs {
        TestArgs::try_parse_from(std::iter::once("cmd").chain(argv.iter().copied())).unwrap()
    }

    /// Parse `argv` (program name excluded), expecting it to be rejected; returns the error kind.
    fn parse_err(argv: &[&str]) -> ErrorKind {
        TestArgs::try_parse_from(std::iter::once("cmd").chain(argv.iter().copied()))
            .unwrap_err()
            .kind()
    }

    /// Mock implementation of IngestionClientTrait for testing.
    ///
    /// - `checkpoints`: pre-inserted checkpoints returned by `checkpoint()`. Sequences not
    ///   in the map return `CheckpointError::NotFound`.
    /// - `not_found_failures` / `fetch_failures` / `decode_failures`: number of times to
    ///   return the corresponding error for a given sequence number before succeeding.
    /// - `latest_checkpoint`: value returned by `latest_checkpoint_number()`.
    #[derive(Default)]
    pub(crate) struct MockIngestionClient {
        pub checkpoints: DashMap<u64, Checkpoint>,
        pub not_found_failures: DashMap<u64, usize>,
        pub fetch_failures: DashMap<u64, usize>,
        pub decode_failures: DashMap<u64, usize>,
        pub latest_checkpoint: u64,
    }

    impl MockIngestionClient {
        /// Fixed chain identifier reported by every mock client.
        pub(crate) fn mock_chain_id() -> ChainIdentifier {
            CheckpointDigest::new([1; 32]).into()
        }

        /// Populate `checkpoints` with synthetic test checkpoints for the given sequence
        /// numbers.
        pub(crate) fn insert_checkpoints(&self, range: impl IntoIterator<Item = u64>) {
            range.into_iter().for_each(|seq| {
                self.checkpoints.insert(seq, test_checkpoint(seq));
            });
        }
    }

    /// Spend one unit of the failure budget configured for `seq`. Returns `true` when the
    /// caller should inject a failure on this call.
    fn take_failure(budget: &DashMap<u64, usize>, seq: u64) -> bool {
        match budget.get_mut(&seq) {
            Some(mut remaining) if *remaining > 0 => {
                *remaining -= 1;
                true
            }
            _ => false,
        }
    }

    #[async_trait]
    impl IngestionClientTrait for MockIngestionClient {
        async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
            Ok(Self::mock_chain_id())
        }

        async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
            // Injected failures take priority, in this order: not found, fetch, decode.
            if take_failure(&self.not_found_failures, checkpoint) {
                return Err(CheckpointError::NotFound);
            }
            if take_failure(&self.fetch_failures, checkpoint) {
                return Err(CheckpointError::Fetch(anyhow::anyhow!("Mock fetch error")));
            }
            if take_failure(&self.decode_failures, checkpoint) {
                return Err(CheckpointError::Decode(decode::Error::Deserialization(
                    prost::DecodeError::new("Mock deserialization error"),
                )));
            }

            match self.checkpoints.get(&checkpoint) {
                Some(entry) => Ok(entry.value().clone()),
                None => Err(CheckpointError::NotFound),
            }
        }

        async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
            Ok(self.latest_checkpoint)
        }
    }

    /// A client backed by a fresh mock, with metrics registered in a throwaway registry.
    fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
        let mock = Arc::new(MockIngestionClient::default());
        let client =
            IngestionClient::from_trait(mock.clone(), IngestionMetrics::new(None, &registry));
        (client, mock)
    }

    /// Assert the ingestion counters on `client` equal the expected totals.
    fn assert_ingested(
        client: &IngestionClient,
        checkpoints: u64,
        transactions: u64,
        events: u64,
        objects: u64,
    ) {
        let metrics = &client.metrics;
        assert_eq!(metrics.total_ingested_checkpoints.get(), checkpoints);
        assert_eq!(metrics.total_ingested_transactions.get(), transactions);
        assert_eq!(metrics.total_ingested_events.get(), events);
        assert_eq!(metrics.total_ingested_objects.get(), objects);
    }

    #[test]
    fn test_args_multiple_ingestion_sources_are_rejected() {
        let kind = parse_err(&[
            "--remote-store-url",
            "https://example.com",
            "--rpc-api-url",
            "http://localhost:8080",
        ]);

        assert_eq!(kind, ErrorKind::ArgumentConflict);
    }

    #[test]
    fn test_args_optional_credentials() {
        let TestArgs { ingestion } = parse_ok(&[
            "--rpc-api-url",
            "http://localhost:8080",
            "--rpc-username",
            "alice",
            "--rpc-password",
            "secret",
        ]);

        assert_eq!(ingestion.rpc_username.as_deref(), Some("alice"));
        assert_eq!(ingestion.rpc_password.as_deref(), Some("secret"));
        assert_eq!(
            ingestion.rpc_api_url,
            Some(Url::parse("http://localhost:8080").unwrap())
        );
    }

    #[test]
    fn test_args_credentials_require_rpc_url() {
        let kind = parse_err(&["--rpc-username", "alice", "--rpc-password", "secret"]);

        assert_eq!(kind, ErrorKind::MissingRequiredArgument);
    }

    #[test]
    fn test_args_remote_store_headers() {
        let TestArgs { ingestion } = parse_ok(&[
            "--remote-store-gcs",
            "bucket",
            "--remote-store-header",
            "x-goog-user-project:my-project",
            "--remote-store-header",
            "authorization:Bearer abc:def",
        ]);

        let expected = [
            ("x-goog-user-project", "my-project"),
            ("authorization", "Bearer abc:def"),
        ];
        assert_eq!(ingestion.remote_store_headers.len(), expected.len());
        for ((name, value), (expected_name, expected_value)) in
            ingestion.remote_store_headers.iter().zip(expected)
        {
            assert_eq!(*name, HeaderName::from_static(expected_name));
            assert_eq!(*value, HeaderValue::from_static(expected_value));
        }
    }

    #[test]
    fn test_args_remote_store_header_requires_delimiter() {
        let kind = parse_err(&[
            "--remote-store-gcs",
            "bucket",
            "--remote-store-header",
            "x-goog-user-project",
        ]);

        assert_eq!(kind, ErrorKind::ValueValidation);
    }

    #[test]
    fn test_args_remote_store_header_rejects_invalid_name() {
        let kind = parse_err(&[
            "--remote-store-gcs",
            "bucket",
            "--remote-store-header",
            "bad name:value",
        ]);

        assert_eq!(kind, ErrorKind::ValueValidation);
    }

    #[test]
    fn test_args_remote_store_header_rejects_invalid_value() {
        let kind = parse_err(&[
            "--remote-store-gcs",
            "bucket",
            "--remote-store-header",
            "x-test:bad\nvalue",
        ]);

        assert_eq!(kind, ErrorKind::ValueValidation);
    }

    #[tokio::test]
    async fn test_checkpoint_checkpoint_success() {
        let (client, mock) = setup_test();
        mock.checkpoints.insert(1, test_checkpoint_with_data(1));

        let envelope = client.checkpoint(1).await.unwrap();

        assert_eq!(envelope.checkpoint.summary.sequence_number(), &1);
        assert_eq!(envelope.chain_id, MockIngestionClient::mock_chain_id());
        // 1 created object + 2 gas object versions (input + output)
        assert_ingested(&client, 1, 1, 1, 3);
    }

    #[tokio::test]
    async fn test_checkpoint_not_found() {
        let (client, _) = setup_test();

        // Nothing was inserted, so the lookup must fail without retrying.
        let outcome = client.checkpoint(1).await;

        assert!(matches!(outcome, Err(IE::NotFound(1))));
        assert_ingested(&client, 0, 0, 0, 0);
    }

    #[tokio::test]
    async fn test_checkpoint_fetch_error_with_retry() {
        let (client, mock) = setup_test();
        mock.checkpoints.insert(1, test_checkpoint(1));
        mock.fetch_failures.insert(1, 2);

        // Two injected fetch failures are retried away.
        let envelope = client.checkpoint(1).await.unwrap();
        assert_eq!(*envelope.checkpoint.summary.sequence_number(), 1);
        assert_eq!(envelope.chain_id, MockIngestionClient::mock_chain_id());

        let fetch_retries = client
            .metrics
            .total_ingested_transient_retries
            .with_label_values(&["fetch"])
            .get();
        assert_eq!(fetch_retries, 2);
        assert_ingested(&client, 1, 0, 0, 0);
    }

    #[tokio::test]
    async fn test_checkpoint_decode_error_with_retry() {
        let (client, mock) = setup_test();
        mock.checkpoints.insert(1, test_checkpoint(1));
        mock.decode_failures.insert(1, 2);

        // Two injected decode failures are retried away.
        let envelope = client.checkpoint(1).await.unwrap();
        assert_eq!(*envelope.checkpoint.summary.sequence_number(), 1);
        assert_eq!(envelope.chain_id, MockIngestionClient::mock_chain_id());

        let decode_retries = client
            .metrics
            .total_ingested_transient_retries
            .with_label_values(&["deserialization"])
            .get();
        assert_eq!(decode_retries, 2);
        assert_ingested(&client, 1, 0, 0, 0);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_with_retry() {
        let (client, mock) = setup_test();
        mock.checkpoints.insert(1, test_checkpoint(1));
        mock.not_found_failures.insert(1, 1);

        // One injected "not found" is polled through on a short interval.
        let envelope = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
        assert_eq!(envelope.checkpoint.summary.sequence_number(), &1);
        assert_eq!(envelope.chain_id, MockIngestionClient::mock_chain_id());

        assert_eq!(client.metrics.total_ingested_not_found_retries.get(), 1);
        assert_ingested(&client, 1, 0, 0, 0);
    }

    #[tokio::test]
    async fn test_wait_for_checkpoint_instant() {
        let (client, mock) = setup_test();
        mock.checkpoints.insert(1, test_checkpoint(1));

        let envelope = client.wait_for(1, Duration::from_millis(50)).await.unwrap();

        assert_eq!(envelope.checkpoint.summary.sequence_number(), &1);
        assert_eq!(envelope.chain_id, MockIngestionClient::mock_chain_id());
        assert_ingested(&client, 1, 0, 0, 0);
    }
}