sui_indexer_alt_framework/ingestion/
ingestion_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::future::Future;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use backoff::Error as BE;
11use backoff::ExponentialBackoff;
12use backoff::backoff::Constant;
13use clap::ArgGroup;
14use mysten_network::callback::CallbackLayer;
15use object_store::ClientOptions;
16use object_store::ObjectStore;
17use object_store::aws::AmazonS3Builder;
18use object_store::azure::MicrosoftAzureBuilder;
19use object_store::gcp::GoogleCloudStorageBuilder;
20use object_store::http::HttpBuilder;
21use object_store::local::LocalFileSystem;
22use prometheus::Histogram;
23use sui_futures::future::with_slow_future_monitor;
24use sui_rpc::Client;
25use sui_rpc::client::HeadersInterceptor;
26use sui_types::digests::ChainIdentifier;
27use tokio::sync::OnceCell;
28use tracing::debug;
29use tracing::warn;
30use url::Url;
31
32use crate::ingestion::Error as IE;
33use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
34use crate::ingestion::Result as IngestionResult;
35use crate::ingestion::byte_count::ByteCountMakeCallbackHandler;
36use crate::ingestion::decode;
37use crate::ingestion::store_client::StoreIngestionClient;
38use crate::metrics::CheckpointLagMetricReporter;
39use crate::metrics::IngestionMetrics;
40use crate::types::full_checkpoint_content::Checkpoint;
41
/// Wait at most this long between retries for transient errors.
///
/// Used as the `max_interval` of the exponential backoff built by `transient_backoff`.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold for logging warnings about slow operations run through
/// `retry_transient_with_slow_monitor` (checkpoint and chain id fetches in this file).
///
/// Operations that take longer than this duration will trigger a warning log, but will
/// continue executing without being canceled. This helps identify network issues or
/// slow remote stores without interrupting the ingestion process.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
51
/// A source of checkpoint data that `IngestionClient` delegates to.
///
/// Failures fetching a checkpoint are reported through `CheckpointError`, which lets
/// `IngestionClient::checkpoint` decide whether to retry (`Fetch`, `Decode`) or give up
/// immediately (`NotFound`).
#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    /// The identifier of the chain this source serves checkpoints for.
    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier>;

    /// Fetch the checkpoint with sequence number `checkpoint`.
    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult;

    /// NOTE(review): presumably the sequence number of the newest checkpoint the source can
    /// currently serve -- confirm against the implementations.
    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64>;
}
60
// Command-line arguments selecting where checkpoints are ingested from.
//
// Exactly one member of the `source` argument group must be supplied (`required(true)`,
// `multiple(false)`), and the gRPC credentials are only accepted together with
// `--rpc-api-url`. The `///` comments on the fields below are picked up by clap as help
// text, so treat them as user-facing strings rather than internal notes.
#[derive(clap::Args, Clone, Debug)]
#[command(group(ArgGroup::new("source").required(true).multiple(false)))]
pub struct IngestionClientArgs {
    /// Remote Store to fetch checkpoints from over HTTP.
    #[arg(long, group = "source")]
    pub remote_store_url: Option<Url>,

    /// Fetch checkpoints from AWS S3. Provide the bucket name or endpoint-and-bucket.
    /// (env: AWS_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
    #[arg(long, group = "source")]
    pub remote_store_s3: Option<String>,

    /// Fetch checkpoints from Google Cloud Storage. Provide the bucket name.
    /// (env: GOOGLE_SERVICE_ACCOUNT_PATH)
    #[arg(long, group = "source")]
    pub remote_store_gcs: Option<String>,

    /// Fetch checkpoints from Azure Blob Storage. Provide the container name.
    /// (env: AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCESS_KEY)
    #[arg(long, group = "source")]
    pub remote_store_azure: Option<String>,

    /// Path to the local ingestion directory.
    #[arg(long, group = "source")]
    pub local_ingestion_path: Option<PathBuf>,

    /// Sui fullnode gRPC url to fetch checkpoints from.
    #[arg(long, group = "source")]
    pub rpc_api_url: Option<Url>,

    /// Optional username for the gRPC service.
    #[arg(long, env, requires = "rpc_api_url")]
    pub rpc_username: Option<String>,

    /// Optional password for the gRPC service.
    #[arg(long, env, requires = "rpc_api_url")]
    pub rpc_password: Option<String>,

    /// How long to wait for a checkpoint file to be downloaded (milliseconds). Set to 0 to disable
    /// the timeout.
    #[arg(long, default_value_t = Self::default().checkpoint_timeout_ms)]
    pub checkpoint_timeout_ms: u64,

    /// How long to wait while establishing a connection to the checkpoint store (milliseconds).
    /// Set to 0 to disable the timeout.
    #[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
    pub checkpoint_connection_timeout_ms: u64,
}
109
110impl Default for IngestionClientArgs {
111    fn default() -> Self {
112        Self {
113            remote_store_url: None,
114            remote_store_s3: None,
115            remote_store_gcs: None,
116            remote_store_azure: None,
117            local_ingestion_path: None,
118            rpc_api_url: None,
119            rpc_username: None,
120            rpc_password: None,
121            checkpoint_timeout_ms: 120_000,
122            checkpoint_connection_timeout_ms: 120_000,
123        }
124    }
125}
126
127impl IngestionClientArgs {
128    fn client_options(&self) -> ClientOptions {
129        let mut options = ClientOptions::default();
130        options = if self.checkpoint_timeout_ms == 0 {
131            options.with_timeout_disabled()
132        } else {
133            let timeout = Duration::from_millis(self.checkpoint_timeout_ms);
134            options.with_timeout(timeout)
135        };
136        options = if self.checkpoint_connection_timeout_ms == 0 {
137            options.with_connect_timeout_disabled()
138        } else {
139            let timeout = Duration::from_millis(self.checkpoint_connection_timeout_ms);
140            options.with_connect_timeout(timeout)
141        };
142        options
143    }
144}
145
/// Errors an `IngestionClientTrait` implementation can report when fetching a checkpoint.
///
/// `IngestionClient::checkpoint` maps `NotFound` to a permanent error and retries the other
/// two variants.
#[derive(thiserror::Error, Debug)]
pub enum CheckpointError {
    /// The source does not have the requested checkpoint.
    #[error("Checkpoint not found")]
    NotFound,
    /// The checkpoint could not be retrieved from the source.
    #[error("Failed to fetch checkpoint: {0}")]
    Fetch(#[from] anyhow::Error),
    /// The checkpoint was retrieved but could not be decoded.
    #[error("Failed to decode checkpoint: {0}")]
    Decode(#[from] decode::Error),
}
155
/// Result of fetching a single checkpoint from an `IngestionClientTrait` implementation.
pub type CheckpointResult = Result<Checkpoint, CheckpointError>;
157
/// Fetches checkpoints from an underlying source, adding retries and metrics on top.
#[derive(Clone)]
pub struct IngestionClient {
    /// The source that checkpoints, the chain id and the latest checkpoint number come from.
    client: Arc<dyn IngestionClientTrait>,
    /// Wrap the metrics in an `Arc` to keep copies of the client cheap.
    metrics: Arc<IngestionMetrics>,
    /// Reports lag metrics for each successfully ingested checkpoint.
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
    /// Chain id fetched from `client`, filled in by `checkpoint` on first success.
    ///
    /// NOTE(review): the cell is not behind an `Arc`, so a clone taken before it is
    /// initialised presumably fetches the chain id again on its own -- confirm this is intended.
    chain_id: OnceCell<ChainIdentifier>,
}
166
/// A fetched checkpoint together with the chain id reported by the source it came from.
#[derive(Clone, Debug)]
pub struct CheckpointEnvelope {
    /// The checkpoint contents, shared behind an `Arc`.
    pub checkpoint: Arc<Checkpoint>,
    /// Identifier of the chain, as returned by the ingestion source.
    pub chain_id: ChainIdentifier,
}
172
173impl IngestionClient {
174    /// Construct a new ingestion client. Its source is determined by `args`.
175    pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
176        // TODO: Support stacking multiple ingestion clients for redundancy/failover.
177        let retry = super::store_client::retry_config();
178        let client = if let Some(url) = args.remote_store_url.as_ref() {
179            let store = HttpBuilder::new()
180                .with_url(url.to_string())
181                .with_client_options(args.client_options().with_allow_http(true))
182                .with_retry(retry)
183                .build()
184                .map(Arc::new)?;
185            IngestionClient::with_store(store, metrics.clone())?
186        } else if let Some(bucket) = args.remote_store_s3.as_ref() {
187            let store = AmazonS3Builder::from_env()
188                .with_client_options(args.client_options())
189                .with_retry(retry)
190                .with_imdsv1_fallback()
191                .with_bucket_name(bucket)
192                .build()
193                .map(Arc::new)?;
194            IngestionClient::with_store(store, metrics.clone())?
195        } else if let Some(bucket) = args.remote_store_gcs.as_ref() {
196            let store = GoogleCloudStorageBuilder::from_env()
197                .with_client_options(args.client_options())
198                .with_retry(retry)
199                .with_bucket_name(bucket)
200                .build()
201                .map(Arc::new)?;
202            IngestionClient::with_store(store, metrics.clone())?
203        } else if let Some(container) = args.remote_store_azure.as_ref() {
204            let store = MicrosoftAzureBuilder::from_env()
205                .with_client_options(args.client_options())
206                .with_retry(retry)
207                .with_container_name(container)
208                .build()
209                .map(Arc::new)?;
210            IngestionClient::with_store(store, metrics.clone())?
211        } else if let Some(path) = args.local_ingestion_path.as_ref() {
212            let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
213            IngestionClient::with_store(store, metrics.clone())?
214        } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
215            IngestionClient::with_grpc(
216                rpc_api_url.clone(),
217                args.rpc_username,
218                args.rpc_password,
219                metrics.clone(),
220            )?
221        } else {
222            panic!(
223                "One of remote_store_url, remote_store_s3, remote_store_gcs, remote_store_azure, \
224                local_ingestion_path or rpc_api_url must be provided"
225            );
226        };
227
228        Ok(client)
229    }
230
231    /// An ingestion client that fetches checkpoints from a remote object store.
232    pub fn with_store(
233        store: Arc<dyn ObjectStore>,
234        metrics: Arc<IngestionMetrics>,
235    ) -> IngestionResult<Self> {
236        let client = Arc::new(StoreIngestionClient::new(
237            store,
238            Some(metrics.total_ingested_bytes.clone()),
239        ));
240        Ok(Self::new_impl(client, metrics))
241    }
242
243    /// An ingestion client that fetches checkpoints from a fullnode, over gRPC.
244    pub fn with_grpc(
245        url: Url,
246        username: Option<String>,
247        password: Option<String>,
248        metrics: Arc<IngestionMetrics>,
249    ) -> IngestionResult<Self> {
250        let byte_count_layer = CallbackLayer::new(ByteCountMakeCallbackHandler::new(
251            metrics.total_ingested_bytes.clone(),
252        ));
253        let client = Client::new(url.to_string())?
254            .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
255            .request_layer(byte_count_layer);
256        let client = if let Some(username) = username {
257            let mut headers = HeadersInterceptor::new();
258            headers.basic_auth(username, password);
259            client.with_headers(headers)
260        } else {
261            client
262        };
263        Ok(Self::new_impl(Arc::new(client), metrics))
264    }
265
266    pub(crate) fn new_impl(
267        client: Arc<dyn IngestionClientTrait>,
268        metrics: Arc<IngestionMetrics>,
269    ) -> Self {
270        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
271            metrics.ingested_checkpoint_timestamp_lag.clone(),
272            metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
273            metrics.latest_ingested_checkpoint.clone(),
274        );
275        IngestionClient {
276            client,
277            metrics,
278            checkpoint_lag_reporter,
279            chain_id: OnceCell::new(),
280        }
281    }
282
283    /// Fetch checkpoint data by sequence number.
284    ///
285    /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if
286    /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled
287    /// by the `retry_interval` parameter.
288    pub async fn wait_for(
289        &self,
290        checkpoint: u64,
291        retry_interval: Duration,
292    ) -> IngestionResult<CheckpointEnvelope> {
293        let backoff = Constant::new(retry_interval);
294        let fetch = || async move {
295            use backoff::Error as BE;
296            self.checkpoint(checkpoint).await.map_err(|e| match e {
297                IE::NotFound(checkpoint) => {
298                    debug!(checkpoint, "Checkpoint not found, retrying...");
299                    self.metrics.total_ingested_not_found_retries.inc();
300                    BE::transient(e)
301                }
302                e => BE::permanent(e),
303            })
304        };
305
306        backoff::future::retry(backoff, fetch).await
307    }
308
    /// Fetch checkpoint data by sequence number.
    ///
    /// Repeatedly retries transient errors with an exponential backoff (up to
    /// `MAX_TRANSIENT_RETRY_INTERVAL`). Transient errors are defined by the client
    /// implementation that returns a [CheckpointError::Fetch] or [CheckpointError::Decode] error
    /// variant.
    ///
    /// The function will immediately return if the checkpoint is not found.
    ///
    /// The chain id is resolved alongside the checkpoint and stored in `self.chain_id`; failures
    /// to fetch it are always treated as transient. On success the lag reporter and the
    /// checkpoint, transaction, event and object counters are updated before returning.
    pub async fn checkpoint(&self, cp_sequence_number: u64) -> IngestionResult<CheckpointEnvelope> {
        // Future that fetches the checkpoint itself, retrying transient failures.
        let client = self.client.clone();
        let checkpoint_data_fut = retry_transient_with_slow_monitor(
            "checkpoint",
            move || {
                let client = client.clone();
                async move {
                    client
                        .checkpoint(cp_sequence_number)
                        .await
                        .map_err(|err| match err {
                            // Not found errors are marked as permanent here, but retried in
                            // `wait_for` in case the checkpoint becomes available in the future.
                            CheckpointError::NotFound => {
                                BE::permanent(IE::NotFound(cp_sequence_number))
                            }
                            // Retry fetch and decode errors in case the root cause is in the
                            // upstream checkpoint data source. If the upstream checkpoint data
                            // source is corrected, then the indexer will automatically recover
                            // the next time the read is attempted.
                            CheckpointError::Fetch(e) => self.metrics.inc_retry(
                                cp_sequence_number,
                                "fetch",
                                IE::FetchError(cp_sequence_number, e),
                            ),
                            CheckpointError::Decode(e) => self.metrics.inc_retry(
                                cp_sequence_number,
                                e.reason(),
                                IE::DecodeError(cp_sequence_number, e.into()),
                            ),
                        })
                }
            },
            &self.metrics.ingested_checkpoint_latency,
        );

        // Future that resolves the chain id through the `OnceCell`. Every chain id error is
        // marked transient, so this keeps retrying until the source answers.
        let client = self.client.clone();
        let chain_id_fut = self.chain_id.get_or_try_init(|| {
            retry_transient_with_slow_monitor(
                "chain_id",
                move || {
                    let client = client.clone();
                    async move {
                        client
                            .chain_id()
                            .await
                            .map_err(|e| BE::transient(IE::ChainIdError(cp_sequence_number, e)))
                    }
                },
                &self.metrics.ingested_chain_id_latency,
            )
        });

        // Drive both futures together; the first error (e.g. a not-found checkpoint) is returned.
        let (checkpoint, chain_id) = tokio::try_join!(checkpoint_data_fut, chain_id_fut)?;

        self.checkpoint_lag_reporter
            .report_lag(cp_sequence_number, checkpoint.summary.timestamp_ms);

        self.metrics.total_ingested_checkpoints.inc();

        self.metrics
            .total_ingested_transactions
            .inc_by(checkpoint.transactions.len() as u64);

        // Transactions without events contribute zero to the event counter.
        self.metrics.total_ingested_events.inc_by(
            checkpoint
                .transactions
                .iter()
                .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
                .sum(),
        );

        self.metrics
            .total_ingested_objects
            .inc_by(checkpoint.object_set.len() as u64);

        Ok(CheckpointEnvelope {
            checkpoint: Arc::new(checkpoint),
            chain_id: *chain_id,
        })
    }
398
399    pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
400        self.client.latest_checkpoint_number().await
401    }
402}
403
404/// Keep backing off until we are waiting for the max interval, but don't give up.
405pub(crate) fn transient_backoff() -> ExponentialBackoff {
406    ExponentialBackoff {
407        max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
408        max_elapsed_time: None,
409        ..Default::default()
410    }
411}
412
413/// Retry a fallible async operation with exponential backoff and slow-operation monitoring.
414/// Records the total time (including retries) in the provided latency histogram.
415pub(crate) async fn retry_transient_with_slow_monitor<F, Fut, T>(
416    operation: &str,
417    make_future: F,
418    latency: &Histogram,
419) -> IngestionResult<T>
420where
421    F: Fn() -> Fut,
422    Fut: Future<Output = Result<T, backoff::Error<IE>>>,
423{
424    let request = || {
425        let fut = make_future();
426        async move {
427            with_slow_future_monitor(fut, SLOW_OPERATION_WARNING_THRESHOLD, || {
428                warn!(
429                    operation,
430                    threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
431                    "Slow operation detected"
432                );
433            })
434            .await
435        }
436    };
437
438    let guard = latency.start_timer();
439    let data = backoff::future::retry(transient_backoff(), request).await?;
440    let elapsed = guard.stop_and_record();
441
442    debug!(
443        operation,
444        elapsed_ms = elapsed * 1000.0,
445        "Fetched operation"
446    );
447
448    Ok(data)
449}
450
451#[cfg(test)]
452mod tests {
453    use std::sync::Arc;
454    use std::time::Duration;
455
456    use clap::Parser;
457    use clap::error::ErrorKind;
458    use dashmap::DashMap;
459    use prometheus::Registry;
460    use sui_types::digests::CheckpointDigest;
461    use sui_types::event::Event;
462    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
463
464    use crate::ingestion::decode;
465    use crate::ingestion::test_utils::test_checkpoint_data;
466
467    use super::*;
468
469    fn test_checkpoint(seq: u64) -> Checkpoint {
470        let bytes = test_checkpoint_data(seq);
471        decode::checkpoint(&bytes).unwrap()
472    }
473
    /// Build a checkpoint with one transaction containing one event and one created object.
    ///
    /// Used by tests that assert on the transaction, event and object counters.
    fn test_checkpoint_with_data(seq: u64) -> Checkpoint {
        TestCheckpointBuilder::new(seq)
            .start_transaction(0)
            .create_owned_object(0)
            .with_events(vec![Event::random_for_testing()])
            .finish_transaction()
            .build_checkpoint()
    }
483
    // Minimal top-level parser that flattens `IngestionClientArgs`, so the argument-group rules
    // can be exercised through `try_parse_from`.
    #[derive(Debug, Parser)]
    struct TestArgs {
        #[clap(flatten)]
        ingestion: IngestionClientArgs,
    }
489
    /// Mock implementation of IngestionClientTrait for testing
    ///
    /// Each `*_failures` map holds, per checkpoint sequence number, how many more times the
    /// corresponding error is returned before the mock moves on.
    #[derive(Default)]
    struct MockIngestionClient {
        /// Checkpoints the mock can serve, keyed by sequence number.
        checkpoints: DashMap<u64, Checkpoint>,
        /// Remaining `CheckpointError::NotFound` responses to inject.
        not_found_failures: DashMap<u64, usize>,
        /// Remaining `CheckpointError::Fetch` responses to inject.
        fetch_failures: DashMap<u64, usize>,
        /// Remaining `CheckpointError::Decode` responses to inject.
        decode_failures: DashMap<u64, usize>,
    }
498
499    impl MockIngestionClient {
500        fn mock_chain_id() -> ChainIdentifier {
501            CheckpointDigest::new([1; 32]).into()
502        }
503    }
504
505    #[async_trait]
506    impl IngestionClientTrait for MockIngestionClient {
507        async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
508            Ok(Self::mock_chain_id())
509        }
510
511        async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
512            // Check for not found failures
513            if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
514                && *remaining > 0
515            {
516                *remaining -= 1;
517                return Err(CheckpointError::NotFound);
518            }
519
520            // Check for fetch errors
521            if let Some(mut remaining) = self.fetch_failures.get_mut(&checkpoint)
522                && *remaining > 0
523            {
524                *remaining -= 1;
525                return Err(CheckpointError::Fetch(anyhow::anyhow!("Mock fetch error")));
526            }
527
528            // Check for decode errors
529            if let Some(mut remaining) = self.decode_failures.get_mut(&checkpoint)
530                && *remaining > 0
531            {
532                *remaining -= 1;
533                return Err(CheckpointError::Decode(decode::Error::Deserialization(
534                    prost::DecodeError::new("Mock deserialization error"),
535                )));
536            }
537
538            // Return the checkpoint data if it exists
539            self.checkpoints
540                .get(&checkpoint)
541                .as_deref()
542                .cloned()
543                .ok_or(CheckpointError::NotFound)
544        }
545
546        async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
547            Ok(0)
548        }
549    }
550
551    fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
552        let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
553        let metrics = IngestionMetrics::new(None, &registry);
554        let mock_client = Arc::new(MockIngestionClient::default());
555        let client = IngestionClient::new_impl(mock_client.clone(), metrics);
556        (client, mock_client)
557    }
558
559    #[test]
560    fn test_args_multiple_ingestion_sources_are_rejected() {
561        let err = TestArgs::try_parse_from([
562            "cmd",
563            "--remote-store-url",
564            "https://example.com",
565            "--rpc-api-url",
566            "http://localhost:8080",
567        ])
568        .unwrap_err();
569
570        assert_eq!(err.kind(), ErrorKind::ArgumentConflict);
571    }
572
573    #[test]
574    fn test_args_optional_credentials() {
575        let args = TestArgs::try_parse_from([
576            "cmd",
577            "--rpc-api-url",
578            "http://localhost:8080",
579            "--rpc-username",
580            "alice",
581            "--rpc-password",
582            "secret",
583        ])
584        .unwrap();
585
586        assert_eq!(args.ingestion.rpc_username.as_deref(), Some("alice"));
587        assert_eq!(args.ingestion.rpc_password.as_deref(), Some("secret"));
588        assert_eq!(
589            args.ingestion.rpc_api_url,
590            Some(Url::parse("http://localhost:8080").unwrap())
591        );
592    }
593
594    #[test]
595    fn test_args_credentials_require_rpc_url() {
596        let err = TestArgs::try_parse_from([
597            "cmd",
598            "--rpc-username",
599            "alice",
600            "--rpc-password",
601            "secret",
602        ])
603        .unwrap_err();
604
605        assert_eq!(err.kind(), ErrorKind::MissingRequiredArgument);
606    }
607
608    #[tokio::test]
609    async fn test_checkpoint_checkpoint_success() {
610        let (client, mock) = setup_test();
611
612        mock.checkpoints.insert(1, test_checkpoint_with_data(1));
613
614        let result = client.checkpoint(1).await.unwrap();
615        assert_eq!(result.checkpoint.summary.sequence_number(), &1);
616        assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
617        assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
618        assert_eq!(client.metrics.total_ingested_transactions.get(), 1);
619        assert_eq!(client.metrics.total_ingested_events.get(), 1);
620        // 1 created object + 2 gas object versions (input + output)
621        assert_eq!(client.metrics.total_ingested_objects.get(), 3);
622    }
623
624    #[tokio::test]
625    async fn test_checkpoint_not_found() {
626        let (client, _) = setup_test();
627
628        // Try to fetch non-existent checkpoint
629        let result = client.checkpoint(1).await;
630        assert!(matches!(result, Err(IE::NotFound(1))));
631        assert_eq!(client.metrics.total_ingested_checkpoints.get(), 0);
632        assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
633        assert_eq!(client.metrics.total_ingested_events.get(), 0);
634        assert_eq!(client.metrics.total_ingested_objects.get(), 0);
635    }
636
637    #[tokio::test]
638    async fn test_checkpoint_fetch_error_with_retry() {
639        let (client, mock) = setup_test();
640
641        mock.checkpoints.insert(1, test_checkpoint(1));
642        mock.fetch_failures.insert(1, 2);
643
644        // Fetch and verify it succeeds after retries
645        let result = client.checkpoint(1).await.unwrap();
646        assert_eq!(*result.checkpoint.summary.sequence_number(), 1);
647        assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
648
649        // Verify that exactly 2 retries were recorded
650        let retries = client
651            .metrics
652            .total_ingested_transient_retries
653            .with_label_values(&["fetch"])
654            .get();
655        assert_eq!(retries, 2);
656        assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
657        assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
658        assert_eq!(client.metrics.total_ingested_events.get(), 0);
659        assert_eq!(client.metrics.total_ingested_objects.get(), 0);
660    }
661
662    #[tokio::test]
663    async fn test_checkpoint_decode_error_with_retry() {
664        let (client, mock) = setup_test();
665
666        mock.checkpoints.insert(1, test_checkpoint(1));
667        mock.decode_failures.insert(1, 2);
668
669        // Fetch and verify it succeeds after retries
670        let result = client.checkpoint(1).await.unwrap();
671        assert_eq!(*result.checkpoint.summary.sequence_number(), 1);
672        assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
673
674        // Verify that exactly 2 retries were recorded
675        let retries = client
676            .metrics
677            .total_ingested_transient_retries
678            .with_label_values(&["deserialization"])
679            .get();
680        assert_eq!(retries, 2);
681        assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
682        assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
683        assert_eq!(client.metrics.total_ingested_events.get(), 0);
684        assert_eq!(client.metrics.total_ingested_objects.get(), 0);
685    }
686
687    #[tokio::test]
688    async fn test_wait_for_checkpoint_with_retry() {
689        let (client, mock) = setup_test();
690
691        mock.checkpoints.insert(1, test_checkpoint(1));
692        mock.not_found_failures.insert(1, 1);
693
694        // Wait for checkpoint with short retry interval
695        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
696        assert_eq!(result.checkpoint.summary.sequence_number(), &1);
697        assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
698
699        // Verify that exactly 1 retry was recorded
700        let retries = client.metrics.total_ingested_not_found_retries.get();
701        assert_eq!(retries, 1);
702        assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
703        assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
704        assert_eq!(client.metrics.total_ingested_events.get(), 0);
705        assert_eq!(client.metrics.total_ingested_objects.get(), 0);
706    }
707
708    #[tokio::test]
709    async fn test_wait_for_checkpoint_instant() {
710        let (client, mock) = setup_test();
711
712        mock.checkpoints.insert(1, test_checkpoint(1));
713
714        let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
715        assert_eq!(result.checkpoint.summary.sequence_number(), &1);
716        assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
717        assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
718        assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
719        assert_eq!(client.metrics.total_ingested_events.get(), 0);
720        assert_eq!(client.metrics.total_ingested_objects.get(), 0);
721    }
722}