1use std::future::Future;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use backoff::Error as BE;
11use backoff::ExponentialBackoff;
12use backoff::backoff::Constant;
13use clap::ArgGroup;
14use mysten_network::callback::CallbackLayer;
15use object_store::ClientOptions;
16use object_store::ObjectStore;
17use object_store::aws::AmazonS3Builder;
18use object_store::azure::MicrosoftAzureBuilder;
19use object_store::gcp::GoogleCloudStorageBuilder;
20use object_store::http::HttpBuilder;
21use object_store::local::LocalFileSystem;
22use prometheus::Histogram;
23use reqwest::header::HeaderMap;
24use reqwest::header::HeaderName;
25use reqwest::header::HeaderValue;
26use sui_futures::future::with_slow_future_monitor;
27use sui_rpc::Client;
28use sui_rpc::client::HeadersInterceptor;
29use sui_types::digests::ChainIdentifier;
30use tokio::sync::OnceCell;
31use tracing::debug;
32use tracing::warn;
33use url::Url;
34
35use crate::ingestion::Error as IE;
36use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
37use crate::ingestion::Result as IngestionResult;
38use crate::ingestion::byte_count::ByteCountMakeCallbackHandler;
39use crate::ingestion::decode;
40use crate::ingestion::store_client::StoreIngestionClient;
41use crate::metrics::CheckpointLagMetricReporter;
42use crate::metrics::IngestionMetrics;
43use crate::types::full_checkpoint_content::Checkpoint;
44
45const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);
47
48const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
54
55#[async_trait]
56pub trait IngestionClientTrait: Send + Sync {
57 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier>;
58
59 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult;
60
61 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64>;
62}
63
64#[derive(clap::Args, Clone, Debug)]
65#[command(group(ArgGroup::new("source").required(true).multiple(false)))]
66pub struct IngestionClientArgs {
67 #[arg(long, group = "source")]
69 pub remote_store_url: Option<Url>,
70
71 #[arg(long, group = "source")]
74 pub remote_store_s3: Option<String>,
75
76 #[arg(long, group = "source")]
79 pub remote_store_gcs: Option<String>,
80
81 #[arg(long, group = "source")]
84 pub remote_store_azure: Option<String>,
85
86 #[arg(long = "remote-store-header", value_parser = parse_remote_store_header)]
89 pub remote_store_headers: Vec<(HeaderName, HeaderValue)>,
90
91 #[arg(long, group = "source")]
93 pub local_ingestion_path: Option<PathBuf>,
94
95 #[arg(long, group = "source")]
97 pub rpc_api_url: Option<Url>,
98
99 #[arg(long, env, requires = "rpc_api_url")]
101 pub rpc_username: Option<String>,
102
103 #[arg(long, env, requires = "rpc_api_url")]
105 pub rpc_password: Option<String>,
106
107 #[arg(long, default_value_t = Self::default().checkpoint_timeout_ms)]
110 pub checkpoint_timeout_ms: u64,
111
112 #[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
115 pub checkpoint_connection_timeout_ms: u64,
116}
117
118impl Default for IngestionClientArgs {
119 fn default() -> Self {
120 Self {
121 remote_store_url: None,
122 remote_store_s3: None,
123 remote_store_gcs: None,
124 remote_store_azure: None,
125 remote_store_headers: vec![],
126 local_ingestion_path: None,
127 rpc_api_url: None,
128 rpc_username: None,
129 rpc_password: None,
130 checkpoint_timeout_ms: 120_000,
131 checkpoint_connection_timeout_ms: 120_000,
132 }
133 }
134}
135
136impl IngestionClientArgs {
137 fn client_options(&self) -> ClientOptions {
138 let mut options = ClientOptions::default();
139
140 options = if self.checkpoint_timeout_ms == 0 {
141 options.with_timeout_disabled()
142 } else {
143 let timeout = Duration::from_millis(self.checkpoint_timeout_ms);
144 options.with_timeout(timeout)
145 };
146
147 options = if self.checkpoint_connection_timeout_ms == 0 {
148 options.with_connect_timeout_disabled()
149 } else {
150 let timeout = Duration::from_millis(self.checkpoint_connection_timeout_ms);
151 options.with_connect_timeout(timeout)
152 };
153
154 options = if !self.remote_store_headers.is_empty() {
155 let mut headers = HeaderMap::new();
156 for (name, value) in &self.remote_store_headers {
157 headers.append(name.clone(), value.clone());
158 }
159
160 options.with_default_headers(headers)
161 } else {
162 options
163 };
164
165 options
166 }
167}
168
169#[derive(thiserror::Error, Debug)]
170pub enum CheckpointError {
171 #[error("Checkpoint not found")]
172 NotFound,
173 #[error("Failed to fetch checkpoint: {0}")]
174 Fetch(#[from] anyhow::Error),
175 #[error("Failed to decode checkpoint: {0}")]
176 Decode(#[from] decode::Error),
177}
178
179pub type CheckpointResult = Result<Checkpoint, CheckpointError>;
180
181#[derive(Clone)]
182pub struct IngestionClient {
183 client: Arc<dyn IngestionClientTrait>,
184 metrics: Arc<IngestionMetrics>,
186 checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
187 chain_id: OnceCell<ChainIdentifier>,
188}
189
190#[derive(Clone, Debug)]
191pub struct CheckpointEnvelope {
192 pub checkpoint: Arc<Checkpoint>,
193 pub chain_id: ChainIdentifier,
194}
195
196impl IngestionClient {
197 pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
199 let retry = super::store_client::retry_config();
201 let client = if let Some(url) = args.remote_store_url.as_ref() {
202 let store = HttpBuilder::new()
203 .with_url(url.to_string())
204 .with_client_options(args.client_options().with_allow_http(true))
205 .with_retry(retry)
206 .build()
207 .map(Arc::new)?;
208 IngestionClient::with_store(store, metrics.clone())?
209 } else if let Some(bucket) = args.remote_store_s3.as_ref() {
210 let store = AmazonS3Builder::from_env()
211 .with_client_options(args.client_options())
212 .with_retry(retry)
213 .with_imdsv1_fallback()
214 .with_bucket_name(bucket)
215 .build()
216 .map(Arc::new)?;
217 IngestionClient::with_store(store, metrics.clone())?
218 } else if let Some(bucket) = args.remote_store_gcs.as_ref() {
219 let store = GoogleCloudStorageBuilder::from_env()
220 .with_client_options(args.client_options())
221 .with_retry(retry)
222 .with_bucket_name(bucket)
223 .build()
224 .map(Arc::new)?;
225 IngestionClient::with_store(store, metrics.clone())?
226 } else if let Some(container) = args.remote_store_azure.as_ref() {
227 let store = MicrosoftAzureBuilder::from_env()
228 .with_client_options(args.client_options())
229 .with_retry(retry)
230 .with_container_name(container)
231 .build()
232 .map(Arc::new)?;
233 IngestionClient::with_store(store, metrics.clone())?
234 } else if let Some(path) = args.local_ingestion_path.as_ref() {
235 let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
236 IngestionClient::with_store(store, metrics.clone())?
237 } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
238 IngestionClient::with_grpc(
239 rpc_api_url.clone(),
240 args.rpc_username,
241 args.rpc_password,
242 metrics.clone(),
243 )?
244 } else {
245 panic!(
246 "One of remote_store_url, remote_store_s3, remote_store_gcs, remote_store_azure, \
247 local_ingestion_path or rpc_api_url must be provided"
248 );
249 };
250
251 Ok(client)
252 }
253
254 pub fn with_store(
256 store: Arc<dyn ObjectStore>,
257 metrics: Arc<IngestionMetrics>,
258 ) -> IngestionResult<Self> {
259 let client = Arc::new(StoreIngestionClient::new(
260 store,
261 Some(metrics.total_ingested_bytes.clone()),
262 ));
263 Ok(Self::from_trait(client, metrics))
264 }
265
266 pub fn with_grpc(
268 url: Url,
269 username: Option<String>,
270 password: Option<String>,
271 metrics: Arc<IngestionMetrics>,
272 ) -> IngestionResult<Self> {
273 let byte_count_layer = CallbackLayer::new(ByteCountMakeCallbackHandler::new(
274 metrics.total_ingested_bytes.clone(),
275 ));
276 let client = Client::new(url.to_string())?
277 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
278 .request_layer(byte_count_layer);
279 let client = if let Some(username) = username {
280 let mut headers = HeadersInterceptor::new();
281 headers.basic_auth(username, password);
282 client.with_headers(headers)
283 } else {
284 client
285 };
286 Ok(Self::from_trait(Arc::new(client), metrics))
287 }
288
289 pub fn metrics(&self) -> &Arc<IngestionMetrics> {
296 &self.metrics
297 }
298
299 pub fn from_trait(
304 client: Arc<dyn IngestionClientTrait>,
305 metrics: Arc<IngestionMetrics>,
306 ) -> Self {
307 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
308 metrics.ingested_checkpoint_timestamp_lag.clone(),
309 metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
310 metrics.latest_ingested_checkpoint.clone(),
311 );
312 IngestionClient {
313 client,
314 metrics,
315 checkpoint_lag_reporter,
316 chain_id: OnceCell::new(),
317 }
318 }
319
320 pub async fn wait_for(
326 &self,
327 checkpoint: u64,
328 retry_interval: Duration,
329 ) -> IngestionResult<CheckpointEnvelope> {
330 let backoff = Constant::new(retry_interval);
331 let fetch = || async move {
332 use backoff::Error as BE;
333 self.checkpoint(checkpoint).await.map_err(|e| match e {
334 IE::NotFound(checkpoint) => {
335 debug!(checkpoint, "Checkpoint not found, retrying...");
336 self.metrics.total_ingested_not_found_retries.inc();
337 BE::transient(e)
338 }
339 e => BE::permanent(e),
340 })
341 };
342
343 backoff::future::retry(backoff, fetch).await
344 }
345
346 pub async fn checkpoint(&self, cp_sequence_number: u64) -> IngestionResult<CheckpointEnvelope> {
355 let client = self.client.clone();
356 let checkpoint_data_fut = retry_transient_with_slow_monitor(
357 "checkpoint",
358 move || {
359 let client = client.clone();
360 async move {
361 client
362 .checkpoint(cp_sequence_number)
363 .await
364 .map_err(|err| match err {
365 CheckpointError::NotFound => {
368 BE::permanent(IE::NotFound(cp_sequence_number))
369 }
370 CheckpointError::Fetch(e) => self.metrics.inc_retry(
375 cp_sequence_number,
376 "fetch",
377 IE::FetchError(cp_sequence_number, e),
378 ),
379 CheckpointError::Decode(e) => self.metrics.inc_retry(
380 cp_sequence_number,
381 e.reason(),
382 IE::DecodeError(cp_sequence_number, e.into()),
383 ),
384 })
385 }
386 },
387 &self.metrics.ingested_checkpoint_latency,
388 );
389
390 let client = self.client.clone();
391 let chain_id_fut = self.chain_id.get_or_try_init(|| {
392 retry_transient_with_slow_monitor(
393 "chain_id",
394 move || {
395 let client = client.clone();
396 async move {
397 client
398 .chain_id()
399 .await
400 .map_err(|e| BE::transient(IE::ChainIdError(cp_sequence_number, e)))
401 }
402 },
403 &self.metrics.ingested_chain_id_latency,
404 )
405 });
406
407 let (checkpoint, chain_id) = tokio::try_join!(checkpoint_data_fut, chain_id_fut)?;
408
409 self.checkpoint_lag_reporter
410 .report_lag(cp_sequence_number, checkpoint.summary.timestamp_ms);
411
412 self.metrics.total_ingested_checkpoints.inc();
413
414 self.metrics
415 .total_ingested_transactions
416 .inc_by(checkpoint.transactions.len() as u64);
417
418 self.metrics.total_ingested_events.inc_by(
419 checkpoint
420 .transactions
421 .iter()
422 .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
423 .sum(),
424 );
425
426 self.metrics
427 .total_ingested_objects
428 .inc_by(checkpoint.object_set.len() as u64);
429
430 Ok(CheckpointEnvelope {
431 checkpoint: Arc::new(checkpoint),
432 chain_id: *chain_id,
433 })
434 }
435
436 pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
437 self.client.latest_checkpoint_number().await
438 }
439}
440
441pub(crate) fn transient_backoff() -> ExponentialBackoff {
443 ExponentialBackoff {
444 max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
445 max_elapsed_time: None,
446 ..Default::default()
447 }
448}
449
450pub(crate) async fn retry_transient_with_slow_monitor<F, Fut, T>(
453 operation: &str,
454 make_future: F,
455 latency: &Histogram,
456) -> IngestionResult<T>
457where
458 F: Fn() -> Fut,
459 Fut: Future<Output = Result<T, backoff::Error<IE>>>,
460{
461 let request = || {
462 let fut = make_future();
463 async move {
464 with_slow_future_monitor(fut, SLOW_OPERATION_WARNING_THRESHOLD, || {
465 warn!(
466 operation,
467 threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
468 "Slow operation detected"
469 );
470 })
471 .await
472 }
473 };
474
475 let guard = latency.start_timer();
476 let data = backoff::future::retry(transient_backoff(), request).await?;
477 let elapsed = guard.stop_and_record();
478
479 debug!(
480 operation,
481 elapsed_ms = elapsed * 1000.0,
482 "Fetched operation"
483 );
484
485 Ok(data)
486}
487
488fn parse_remote_store_header(header: &str) -> Result<(HeaderName, HeaderValue), String> {
489 let (name, value) = header
490 .split_once(':')
491 .ok_or_else(|| "remote store header must be in `<name>:<value>` format".to_string())?;
492
493 let name = HeaderName::from_bytes(name.as_bytes())
494 .map_err(|err| format!("invalid remote store header name `{name}`: {err}"))?;
495 let value = HeaderValue::from_str(value)
496 .map_err(|err| format!("invalid remote store header value for `{name}`: {err}"))?;
497
498 Ok((name, value))
499}
500
501#[cfg(test)]
502pub(crate) mod tests {
503 use std::sync::Arc;
504 use std::time::Duration;
505
506 use clap::Parser;
507 use clap::error::ErrorKind;
508 use dashmap::DashMap;
509 use prometheus::Registry;
510 use sui_types::digests::CheckpointDigest;
511 use sui_types::event::Event;
512 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
513
514 use crate::ingestion::decode;
515 use crate::ingestion::test_utils::test_checkpoint_data;
516
517 use super::*;
518
519 fn test_checkpoint(seq: u64) -> Checkpoint {
520 let bytes = test_checkpoint_data(seq);
521 decode::checkpoint(&bytes).unwrap()
522 }
523
524 fn test_checkpoint_with_data(seq: u64) -> Checkpoint {
526 TestCheckpointBuilder::new(seq)
527 .start_transaction(0)
528 .create_owned_object(0)
529 .with_events(vec![Event::random_for_testing()])
530 .finish_transaction()
531 .build_checkpoint()
532 }
533
534 #[derive(Debug, Parser)]
535 struct TestArgs {
536 #[clap(flatten)]
537 ingestion: IngestionClientArgs,
538 }
539
540 #[derive(Default)]
548 pub(crate) struct MockIngestionClient {
549 pub checkpoints: DashMap<u64, Checkpoint>,
550 pub not_found_failures: DashMap<u64, usize>,
551 pub fetch_failures: DashMap<u64, usize>,
552 pub decode_failures: DashMap<u64, usize>,
553 pub latest_checkpoint: u64,
554 }
555
556 impl MockIngestionClient {
557 pub(crate) fn mock_chain_id() -> ChainIdentifier {
558 CheckpointDigest::new([1; 32]).into()
559 }
560
561 pub(crate) fn insert_checkpoints(&self, range: impl IntoIterator<Item = u64>) {
564 for seq in range {
565 self.checkpoints.insert(seq, test_checkpoint(seq));
566 }
567 }
568 }
569
570 #[async_trait]
571 impl IngestionClientTrait for MockIngestionClient {
572 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
573 Ok(Self::mock_chain_id())
574 }
575
576 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
577 if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
578 && *remaining > 0
579 {
580 *remaining -= 1;
581 return Err(CheckpointError::NotFound);
582 }
583
584 if let Some(mut remaining) = self.fetch_failures.get_mut(&checkpoint)
585 && *remaining > 0
586 {
587 *remaining -= 1;
588 return Err(CheckpointError::Fetch(anyhow::anyhow!("Mock fetch error")));
589 }
590
591 if let Some(mut remaining) = self.decode_failures.get_mut(&checkpoint)
592 && *remaining > 0
593 {
594 *remaining -= 1;
595 return Err(CheckpointError::Decode(decode::Error::Deserialization(
596 prost::DecodeError::new("Mock deserialization error"),
597 )));
598 }
599
600 self.checkpoints
601 .get(&checkpoint)
602 .as_deref()
603 .cloned()
604 .ok_or(CheckpointError::NotFound)
605 }
606
607 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
608 Ok(self.latest_checkpoint)
609 }
610 }
611
612 fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
613 let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
614 let metrics = IngestionMetrics::new(None, ®istry);
615 let mock_client = Arc::new(MockIngestionClient::default());
616 let client = IngestionClient::from_trait(mock_client.clone(), metrics);
617 (client, mock_client)
618 }
619
620 #[test]
621 fn test_args_multiple_ingestion_sources_are_rejected() {
622 let err = TestArgs::try_parse_from([
623 "cmd",
624 "--remote-store-url",
625 "https://example.com",
626 "--rpc-api-url",
627 "http://localhost:8080",
628 ])
629 .unwrap_err();
630
631 assert_eq!(err.kind(), ErrorKind::ArgumentConflict);
632 }
633
634 #[test]
635 fn test_args_optional_credentials() {
636 let args = TestArgs::try_parse_from([
637 "cmd",
638 "--rpc-api-url",
639 "http://localhost:8080",
640 "--rpc-username",
641 "alice",
642 "--rpc-password",
643 "secret",
644 ])
645 .unwrap();
646
647 assert_eq!(args.ingestion.rpc_username.as_deref(), Some("alice"));
648 assert_eq!(args.ingestion.rpc_password.as_deref(), Some("secret"));
649 assert_eq!(
650 args.ingestion.rpc_api_url,
651 Some(Url::parse("http://localhost:8080").unwrap())
652 );
653 }
654
655 #[test]
656 fn test_args_credentials_require_rpc_url() {
657 let err = TestArgs::try_parse_from([
658 "cmd",
659 "--rpc-username",
660 "alice",
661 "--rpc-password",
662 "secret",
663 ])
664 .unwrap_err();
665
666 assert_eq!(err.kind(), ErrorKind::MissingRequiredArgument);
667 }
668
669 #[test]
670 fn test_args_remote_store_headers() {
671 let args = TestArgs::try_parse_from([
672 "cmd",
673 "--remote-store-gcs",
674 "bucket",
675 "--remote-store-header",
676 "x-goog-user-project:my-project",
677 "--remote-store-header",
678 "authorization:Bearer abc:def",
679 ])
680 .unwrap();
681
682 assert_eq!(args.ingestion.remote_store_headers.len(), 2);
683 assert_eq!(
684 args.ingestion.remote_store_headers[0].0,
685 HeaderName::from_static("x-goog-user-project")
686 );
687 assert_eq!(
688 args.ingestion.remote_store_headers[0].1,
689 HeaderValue::from_static("my-project")
690 );
691 assert_eq!(
692 args.ingestion.remote_store_headers[1].0,
693 HeaderName::from_static("authorization")
694 );
695 assert_eq!(
696 args.ingestion.remote_store_headers[1].1,
697 HeaderValue::from_static("Bearer abc:def")
698 );
699 }
700
701 #[test]
702 fn test_args_remote_store_header_requires_delimiter() {
703 let err = TestArgs::try_parse_from([
704 "cmd",
705 "--remote-store-gcs",
706 "bucket",
707 "--remote-store-header",
708 "x-goog-user-project",
709 ])
710 .unwrap_err();
711
712 assert_eq!(err.kind(), ErrorKind::ValueValidation);
713 }
714
715 #[test]
716 fn test_args_remote_store_header_rejects_invalid_name() {
717 let err = TestArgs::try_parse_from([
718 "cmd",
719 "--remote-store-gcs",
720 "bucket",
721 "--remote-store-header",
722 "bad name:value",
723 ])
724 .unwrap_err();
725
726 assert_eq!(err.kind(), ErrorKind::ValueValidation);
727 }
728
729 #[test]
730 fn test_args_remote_store_header_rejects_invalid_value() {
731 let err = TestArgs::try_parse_from([
732 "cmd",
733 "--remote-store-gcs",
734 "bucket",
735 "--remote-store-header",
736 "x-test:bad\nvalue",
737 ])
738 .unwrap_err();
739
740 assert_eq!(err.kind(), ErrorKind::ValueValidation);
741 }
742
743 #[tokio::test]
744 async fn test_checkpoint_checkpoint_success() {
745 let (client, mock) = setup_test();
746
747 mock.checkpoints.insert(1, test_checkpoint_with_data(1));
748
749 let result = client.checkpoint(1).await.unwrap();
750 assert_eq!(result.checkpoint.summary.sequence_number(), &1);
751 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
752 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
753 assert_eq!(client.metrics.total_ingested_transactions.get(), 1);
754 assert_eq!(client.metrics.total_ingested_events.get(), 1);
755 assert_eq!(client.metrics.total_ingested_objects.get(), 3);
757 }
758
759 #[tokio::test]
760 async fn test_checkpoint_not_found() {
761 let (client, _) = setup_test();
762
763 let result = client.checkpoint(1).await;
765 assert!(matches!(result, Err(IE::NotFound(1))));
766 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 0);
767 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
768 assert_eq!(client.metrics.total_ingested_events.get(), 0);
769 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
770 }
771
772 #[tokio::test]
773 async fn test_checkpoint_fetch_error_with_retry() {
774 let (client, mock) = setup_test();
775
776 mock.checkpoints.insert(1, test_checkpoint(1));
777 mock.fetch_failures.insert(1, 2);
778
779 let result = client.checkpoint(1).await.unwrap();
781 assert_eq!(*result.checkpoint.summary.sequence_number(), 1);
782 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
783
784 let retries = client
786 .metrics
787 .total_ingested_transient_retries
788 .with_label_values(&["fetch"])
789 .get();
790 assert_eq!(retries, 2);
791 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
792 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
793 assert_eq!(client.metrics.total_ingested_events.get(), 0);
794 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
795 }
796
797 #[tokio::test]
798 async fn test_checkpoint_decode_error_with_retry() {
799 let (client, mock) = setup_test();
800
801 mock.checkpoints.insert(1, test_checkpoint(1));
802 mock.decode_failures.insert(1, 2);
803
804 let result = client.checkpoint(1).await.unwrap();
806 assert_eq!(*result.checkpoint.summary.sequence_number(), 1);
807 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
808
809 let retries = client
811 .metrics
812 .total_ingested_transient_retries
813 .with_label_values(&["deserialization"])
814 .get();
815 assert_eq!(retries, 2);
816 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
817 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
818 assert_eq!(client.metrics.total_ingested_events.get(), 0);
819 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
820 }
821
822 #[tokio::test]
823 async fn test_wait_for_checkpoint_with_retry() {
824 let (client, mock) = setup_test();
825
826 mock.checkpoints.insert(1, test_checkpoint(1));
827 mock.not_found_failures.insert(1, 1);
828
829 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
831 assert_eq!(result.checkpoint.summary.sequence_number(), &1);
832 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
833
834 let retries = client.metrics.total_ingested_not_found_retries.get();
836 assert_eq!(retries, 1);
837 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
838 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
839 assert_eq!(client.metrics.total_ingested_events.get(), 0);
840 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
841 }
842
843 #[tokio::test]
844 async fn test_wait_for_checkpoint_instant() {
845 let (client, mock) = setup_test();
846
847 mock.checkpoints.insert(1, test_checkpoint(1));
848
849 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
850 assert_eq!(result.checkpoint.summary.sequence_number(), &1);
851 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
852 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
853 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
854 assert_eq!(client.metrics.total_ingested_events.get(), 0);
855 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
856 }
857}