1use std::future::Future;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use backoff::Error as BE;
11use backoff::ExponentialBackoff;
12use backoff::backoff::Constant;
13use clap::ArgGroup;
14use mysten_network::callback::CallbackLayer;
15use object_store::ClientOptions;
16use object_store::ObjectStore;
17use object_store::aws::AmazonS3Builder;
18use object_store::azure::MicrosoftAzureBuilder;
19use object_store::gcp::GoogleCloudStorageBuilder;
20use object_store::http::HttpBuilder;
21use object_store::local::LocalFileSystem;
22use prometheus::Histogram;
23use sui_futures::future::with_slow_future_monitor;
24use sui_rpc::Client;
25use sui_rpc::client::HeadersInterceptor;
26use sui_types::digests::ChainIdentifier;
27use tokio::sync::OnceCell;
28use tracing::debug;
29use tracing::warn;
30use url::Url;
31
32use crate::ingestion::Error as IE;
33use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
34use crate::ingestion::Result as IngestionResult;
35use crate::ingestion::byte_count::ByteCountMakeCallbackHandler;
36use crate::ingestion::decode;
37use crate::ingestion::store_client::StoreIngestionClient;
38use crate::metrics::CheckpointLagMetricReporter;
39use crate::metrics::IngestionMetrics;
40use crate::types::full_checkpoint_content::Checkpoint;
41
/// Upper bound on the delay between two retries of a transient failure. Used as the
/// `max_interval` of the policy built by `transient_backoff`.
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Threshold handed to `with_slow_future_monitor` for every attempt made by
/// `retry_transient_with_slow_monitor`; the callback registered alongside it logs a
/// "Slow operation detected" warning.
const SLOW_OPERATION_WARNING_THRESHOLD: Duration = Duration::from_secs(60);
51
/// Backend abstraction that [`IngestionClient`] delegates to. Each checkpoint source (object
/// store, gRPC, test mock) provides its own implementation.
#[async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
    /// Return the identifier of the chain served by this source.
    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier>;

    /// Fetch the checkpoint with sequence number `checkpoint`.
    ///
    /// Failures are classified with [`CheckpointError`] so that the caller can tell a missing
    /// checkpoint apart from a fetch or decode failure.
    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult;

    /// Return a checkpoint sequence number reported by the source.
    // NOTE(review): presumably the highest checkpoint currently available — confirm against
    // the implementations.
    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64>;
}
60
61#[derive(clap::Args, Clone, Debug)]
62#[command(group(ArgGroup::new("source").required(true).multiple(false)))]
63pub struct IngestionClientArgs {
64 #[arg(long, group = "source")]
66 pub remote_store_url: Option<Url>,
67
68 #[arg(long, group = "source")]
71 pub remote_store_s3: Option<String>,
72
73 #[arg(long, group = "source")]
76 pub remote_store_gcs: Option<String>,
77
78 #[arg(long, group = "source")]
81 pub remote_store_azure: Option<String>,
82
83 #[arg(long, group = "source")]
85 pub local_ingestion_path: Option<PathBuf>,
86
87 #[arg(long, group = "source")]
89 pub rpc_api_url: Option<Url>,
90
91 #[arg(long, env, requires = "rpc_api_url")]
93 pub rpc_username: Option<String>,
94
95 #[arg(long, env, requires = "rpc_api_url")]
97 pub rpc_password: Option<String>,
98
99 #[arg(long, default_value_t = Self::default().checkpoint_timeout_ms)]
102 pub checkpoint_timeout_ms: u64,
103
104 #[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
107 pub checkpoint_connection_timeout_ms: u64,
108}
109
110impl Default for IngestionClientArgs {
111 fn default() -> Self {
112 Self {
113 remote_store_url: None,
114 remote_store_s3: None,
115 remote_store_gcs: None,
116 remote_store_azure: None,
117 local_ingestion_path: None,
118 rpc_api_url: None,
119 rpc_username: None,
120 rpc_password: None,
121 checkpoint_timeout_ms: 120_000,
122 checkpoint_connection_timeout_ms: 120_000,
123 }
124 }
125}
126
127impl IngestionClientArgs {
128 fn client_options(&self) -> ClientOptions {
129 let mut options = ClientOptions::default();
130 options = if self.checkpoint_timeout_ms == 0 {
131 options.with_timeout_disabled()
132 } else {
133 let timeout = Duration::from_millis(self.checkpoint_timeout_ms);
134 options.with_timeout(timeout)
135 };
136 options = if self.checkpoint_connection_timeout_ms == 0 {
137 options.with_connect_timeout_disabled()
138 } else {
139 let timeout = Duration::from_millis(self.checkpoint_connection_timeout_ms);
140 options.with_connect_timeout(timeout)
141 };
142 options
143 }
144}
145
/// Failure modes an [`IngestionClientTrait`] implementation can report when asked for a
/// checkpoint.
#[derive(thiserror::Error, Debug)]
pub enum CheckpointError {
    /// The source has no checkpoint at the requested sequence number.
    /// `IngestionClient::checkpoint` surfaces this as a permanent error without retrying.
    #[error("Checkpoint not found")]
    NotFound,
    /// The checkpoint could not be fetched from the source. Built from any `anyhow::Error`
    /// through `From`.
    #[error("Failed to fetch checkpoint: {0}")]
    Fetch(#[from] anyhow::Error),
    /// The checkpoint was fetched but could not be decoded. Built from `decode::Error` through
    /// `From`.
    #[error("Failed to decode checkpoint: {0}")]
    Decode(#[from] decode::Error),
}
155
/// Result of a single checkpoint fetch attempt by an [`IngestionClientTrait`] implementation.
pub type CheckpointResult = Result<Checkpoint, CheckpointError>;
157
/// Source-agnostic checkpoint client: wraps an [`IngestionClientTrait`] backend and adds
/// retries and metrics on top of it.
#[derive(Clone)]
pub struct IngestionClient {
    /// Backend that performs the actual requests.
    client: Arc<dyn IngestionClientTrait>,
    /// Counters and latency histograms updated by this client.
    metrics: Arc<IngestionMetrics>,
    /// Reports lag for every checkpoint that was fetched successfully.
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
    /// Chain identifier, initialised from `client` by the first successful
    /// `checkpoint` call and reused afterwards.
    chain_id: OnceCell<ChainIdentifier>,
}
166
/// A fetched checkpoint together with the identifier of the chain it was fetched from.
#[derive(Clone, Debug)]
pub struct CheckpointEnvelope {
    /// The checkpoint contents, behind an `Arc` so the envelope is cheap to clone.
    pub checkpoint: Arc<Checkpoint>,
    /// Identifier of the chain, as reported by the ingestion source.
    pub chain_id: ChainIdentifier,
}
172
173impl IngestionClient {
174 pub fn new(args: IngestionClientArgs, metrics: Arc<IngestionMetrics>) -> IngestionResult<Self> {
176 let retry = super::store_client::retry_config();
178 let client = if let Some(url) = args.remote_store_url.as_ref() {
179 let store = HttpBuilder::new()
180 .with_url(url.to_string())
181 .with_client_options(args.client_options().with_allow_http(true))
182 .with_retry(retry)
183 .build()
184 .map(Arc::new)?;
185 IngestionClient::with_store(store, metrics.clone())?
186 } else if let Some(bucket) = args.remote_store_s3.as_ref() {
187 let store = AmazonS3Builder::from_env()
188 .with_client_options(args.client_options())
189 .with_retry(retry)
190 .with_imdsv1_fallback()
191 .with_bucket_name(bucket)
192 .build()
193 .map(Arc::new)?;
194 IngestionClient::with_store(store, metrics.clone())?
195 } else if let Some(bucket) = args.remote_store_gcs.as_ref() {
196 let store = GoogleCloudStorageBuilder::from_env()
197 .with_client_options(args.client_options())
198 .with_retry(retry)
199 .with_bucket_name(bucket)
200 .build()
201 .map(Arc::new)?;
202 IngestionClient::with_store(store, metrics.clone())?
203 } else if let Some(container) = args.remote_store_azure.as_ref() {
204 let store = MicrosoftAzureBuilder::from_env()
205 .with_client_options(args.client_options())
206 .with_retry(retry)
207 .with_container_name(container)
208 .build()
209 .map(Arc::new)?;
210 IngestionClient::with_store(store, metrics.clone())?
211 } else if let Some(path) = args.local_ingestion_path.as_ref() {
212 let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
213 IngestionClient::with_store(store, metrics.clone())?
214 } else if let Some(rpc_api_url) = args.rpc_api_url.as_ref() {
215 IngestionClient::with_grpc(
216 rpc_api_url.clone(),
217 args.rpc_username,
218 args.rpc_password,
219 metrics.clone(),
220 )?
221 } else {
222 panic!(
223 "One of remote_store_url, remote_store_s3, remote_store_gcs, remote_store_azure, \
224 local_ingestion_path or rpc_api_url must be provided"
225 );
226 };
227
228 Ok(client)
229 }
230
231 pub fn with_store(
233 store: Arc<dyn ObjectStore>,
234 metrics: Arc<IngestionMetrics>,
235 ) -> IngestionResult<Self> {
236 let client = Arc::new(StoreIngestionClient::new(
237 store,
238 Some(metrics.total_ingested_bytes.clone()),
239 ));
240 Ok(Self::new_impl(client, metrics))
241 }
242
243 pub fn with_grpc(
245 url: Url,
246 username: Option<String>,
247 password: Option<String>,
248 metrics: Arc<IngestionMetrics>,
249 ) -> IngestionResult<Self> {
250 let byte_count_layer = CallbackLayer::new(ByteCountMakeCallbackHandler::new(
251 metrics.total_ingested_bytes.clone(),
252 ));
253 let client = Client::new(url.to_string())?
254 .with_max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE_BYTES)
255 .request_layer(byte_count_layer);
256 let client = if let Some(username) = username {
257 let mut headers = HeadersInterceptor::new();
258 headers.basic_auth(username, password);
259 client.with_headers(headers)
260 } else {
261 client
262 };
263 Ok(Self::new_impl(Arc::new(client), metrics))
264 }
265
266 pub(crate) fn new_impl(
267 client: Arc<dyn IngestionClientTrait>,
268 metrics: Arc<IngestionMetrics>,
269 ) -> Self {
270 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
271 metrics.ingested_checkpoint_timestamp_lag.clone(),
272 metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
273 metrics.latest_ingested_checkpoint.clone(),
274 );
275 IngestionClient {
276 client,
277 metrics,
278 checkpoint_lag_reporter,
279 chain_id: OnceCell::new(),
280 }
281 }
282
283 pub async fn wait_for(
289 &self,
290 checkpoint: u64,
291 retry_interval: Duration,
292 ) -> IngestionResult<CheckpointEnvelope> {
293 let backoff = Constant::new(retry_interval);
294 let fetch = || async move {
295 use backoff::Error as BE;
296 self.checkpoint(checkpoint).await.map_err(|e| match e {
297 IE::NotFound(checkpoint) => {
298 debug!(checkpoint, "Checkpoint not found, retrying...");
299 self.metrics.total_ingested_not_found_retries.inc();
300 BE::transient(e)
301 }
302 e => BE::permanent(e),
303 })
304 };
305
306 backoff::future::retry(backoff, fetch).await
307 }
308
309 pub async fn checkpoint(&self, cp_sequence_number: u64) -> IngestionResult<CheckpointEnvelope> {
318 let client = self.client.clone();
319 let checkpoint_data_fut = retry_transient_with_slow_monitor(
320 "checkpoint",
321 move || {
322 let client = client.clone();
323 async move {
324 client
325 .checkpoint(cp_sequence_number)
326 .await
327 .map_err(|err| match err {
328 CheckpointError::NotFound => {
331 BE::permanent(IE::NotFound(cp_sequence_number))
332 }
333 CheckpointError::Fetch(e) => self.metrics.inc_retry(
338 cp_sequence_number,
339 "fetch",
340 IE::FetchError(cp_sequence_number, e),
341 ),
342 CheckpointError::Decode(e) => self.metrics.inc_retry(
343 cp_sequence_number,
344 e.reason(),
345 IE::DecodeError(cp_sequence_number, e.into()),
346 ),
347 })
348 }
349 },
350 &self.metrics.ingested_checkpoint_latency,
351 );
352
353 let client = self.client.clone();
354 let chain_id_fut = self.chain_id.get_or_try_init(|| {
355 retry_transient_with_slow_monitor(
356 "chain_id",
357 move || {
358 let client = client.clone();
359 async move {
360 client
361 .chain_id()
362 .await
363 .map_err(|e| BE::transient(IE::ChainIdError(cp_sequence_number, e)))
364 }
365 },
366 &self.metrics.ingested_chain_id_latency,
367 )
368 });
369
370 let (checkpoint, chain_id) = tokio::try_join!(checkpoint_data_fut, chain_id_fut)?;
371
372 self.checkpoint_lag_reporter
373 .report_lag(cp_sequence_number, checkpoint.summary.timestamp_ms);
374
375 self.metrics.total_ingested_checkpoints.inc();
376
377 self.metrics
378 .total_ingested_transactions
379 .inc_by(checkpoint.transactions.len() as u64);
380
381 self.metrics.total_ingested_events.inc_by(
382 checkpoint
383 .transactions
384 .iter()
385 .map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
386 .sum(),
387 );
388
389 self.metrics
390 .total_ingested_objects
391 .inc_by(checkpoint.object_set.len() as u64);
392
393 Ok(CheckpointEnvelope {
394 checkpoint: Arc::new(checkpoint),
395 chain_id: *chain_id,
396 })
397 }
398
399 pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
400 self.client.latest_checkpoint_number().await
401 }
402}
403
404pub(crate) fn transient_backoff() -> ExponentialBackoff {
406 ExponentialBackoff {
407 max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
408 max_elapsed_time: None,
409 ..Default::default()
410 }
411}
412
413pub(crate) async fn retry_transient_with_slow_monitor<F, Fut, T>(
416 operation: &str,
417 make_future: F,
418 latency: &Histogram,
419) -> IngestionResult<T>
420where
421 F: Fn() -> Fut,
422 Fut: Future<Output = Result<T, backoff::Error<IE>>>,
423{
424 let request = || {
425 let fut = make_future();
426 async move {
427 with_slow_future_monitor(fut, SLOW_OPERATION_WARNING_THRESHOLD, || {
428 warn!(
429 operation,
430 threshold_ms = SLOW_OPERATION_WARNING_THRESHOLD.as_millis(),
431 "Slow operation detected"
432 );
433 })
434 .await
435 }
436 };
437
438 let guard = latency.start_timer();
439 let data = backoff::future::retry(transient_backoff(), request).await?;
440 let elapsed = guard.stop_and_record();
441
442 debug!(
443 operation,
444 elapsed_ms = elapsed * 1000.0,
445 "Fetched operation"
446 );
447
448 Ok(data)
449}
450
451#[cfg(test)]
452mod tests {
453 use std::sync::Arc;
454 use std::time::Duration;
455
456 use clap::Parser;
457 use clap::error::ErrorKind;
458 use dashmap::DashMap;
459 use prometheus::Registry;
460 use sui_types::digests::CheckpointDigest;
461 use sui_types::event::Event;
462 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
463
464 use crate::ingestion::decode;
465 use crate::ingestion::test_utils::test_checkpoint_data;
466
467 use super::*;
468
469 fn test_checkpoint(seq: u64) -> Checkpoint {
470 let bytes = test_checkpoint_data(seq);
471 decode::checkpoint(&bytes).unwrap()
472 }
473
474 fn test_checkpoint_with_data(seq: u64) -> Checkpoint {
476 TestCheckpointBuilder::new(seq)
477 .start_transaction(0)
478 .create_owned_object(0)
479 .with_events(vec![Event::random_for_testing()])
480 .finish_transaction()
481 .build_checkpoint()
482 }
483
484 #[derive(Debug, Parser)]
485 struct TestArgs {
486 #[clap(flatten)]
487 ingestion: IngestionClientArgs,
488 }
489
490 #[derive(Default)]
492 struct MockIngestionClient {
493 checkpoints: DashMap<u64, Checkpoint>,
494 not_found_failures: DashMap<u64, usize>,
495 fetch_failures: DashMap<u64, usize>,
496 decode_failures: DashMap<u64, usize>,
497 }
498
499 impl MockIngestionClient {
500 fn mock_chain_id() -> ChainIdentifier {
501 CheckpointDigest::new([1; 32]).into()
502 }
503 }
504
505 #[async_trait]
506 impl IngestionClientTrait for MockIngestionClient {
507 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
508 Ok(Self::mock_chain_id())
509 }
510
511 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
512 if let Some(mut remaining) = self.not_found_failures.get_mut(&checkpoint)
514 && *remaining > 0
515 {
516 *remaining -= 1;
517 return Err(CheckpointError::NotFound);
518 }
519
520 if let Some(mut remaining) = self.fetch_failures.get_mut(&checkpoint)
522 && *remaining > 0
523 {
524 *remaining -= 1;
525 return Err(CheckpointError::Fetch(anyhow::anyhow!("Mock fetch error")));
526 }
527
528 if let Some(mut remaining) = self.decode_failures.get_mut(&checkpoint)
530 && *remaining > 0
531 {
532 *remaining -= 1;
533 return Err(CheckpointError::Decode(decode::Error::Deserialization(
534 prost::DecodeError::new("Mock deserialization error"),
535 )));
536 }
537
538 self.checkpoints
540 .get(&checkpoint)
541 .as_deref()
542 .cloned()
543 .ok_or(CheckpointError::NotFound)
544 }
545
546 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
547 Ok(0)
548 }
549 }
550
551 fn setup_test() -> (IngestionClient, Arc<MockIngestionClient>) {
552 let registry = Registry::new_custom(Some("test".to_string()), None).unwrap();
553 let metrics = IngestionMetrics::new(None, ®istry);
554 let mock_client = Arc::new(MockIngestionClient::default());
555 let client = IngestionClient::new_impl(mock_client.clone(), metrics);
556 (client, mock_client)
557 }
558
559 #[test]
560 fn test_args_multiple_ingestion_sources_are_rejected() {
561 let err = TestArgs::try_parse_from([
562 "cmd",
563 "--remote-store-url",
564 "https://example.com",
565 "--rpc-api-url",
566 "http://localhost:8080",
567 ])
568 .unwrap_err();
569
570 assert_eq!(err.kind(), ErrorKind::ArgumentConflict);
571 }
572
573 #[test]
574 fn test_args_optional_credentials() {
575 let args = TestArgs::try_parse_from([
576 "cmd",
577 "--rpc-api-url",
578 "http://localhost:8080",
579 "--rpc-username",
580 "alice",
581 "--rpc-password",
582 "secret",
583 ])
584 .unwrap();
585
586 assert_eq!(args.ingestion.rpc_username.as_deref(), Some("alice"));
587 assert_eq!(args.ingestion.rpc_password.as_deref(), Some("secret"));
588 assert_eq!(
589 args.ingestion.rpc_api_url,
590 Some(Url::parse("http://localhost:8080").unwrap())
591 );
592 }
593
594 #[test]
595 fn test_args_credentials_require_rpc_url() {
596 let err = TestArgs::try_parse_from([
597 "cmd",
598 "--rpc-username",
599 "alice",
600 "--rpc-password",
601 "secret",
602 ])
603 .unwrap_err();
604
605 assert_eq!(err.kind(), ErrorKind::MissingRequiredArgument);
606 }
607
608 #[tokio::test]
609 async fn test_checkpoint_checkpoint_success() {
610 let (client, mock) = setup_test();
611
612 mock.checkpoints.insert(1, test_checkpoint_with_data(1));
613
614 let result = client.checkpoint(1).await.unwrap();
615 assert_eq!(result.checkpoint.summary.sequence_number(), &1);
616 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
617 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
618 assert_eq!(client.metrics.total_ingested_transactions.get(), 1);
619 assert_eq!(client.metrics.total_ingested_events.get(), 1);
620 assert_eq!(client.metrics.total_ingested_objects.get(), 3);
622 }
623
624 #[tokio::test]
625 async fn test_checkpoint_not_found() {
626 let (client, _) = setup_test();
627
628 let result = client.checkpoint(1).await;
630 assert!(matches!(result, Err(IE::NotFound(1))));
631 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 0);
632 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
633 assert_eq!(client.metrics.total_ingested_events.get(), 0);
634 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
635 }
636
637 #[tokio::test]
638 async fn test_checkpoint_fetch_error_with_retry() {
639 let (client, mock) = setup_test();
640
641 mock.checkpoints.insert(1, test_checkpoint(1));
642 mock.fetch_failures.insert(1, 2);
643
644 let result = client.checkpoint(1).await.unwrap();
646 assert_eq!(*result.checkpoint.summary.sequence_number(), 1);
647 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
648
649 let retries = client
651 .metrics
652 .total_ingested_transient_retries
653 .with_label_values(&["fetch"])
654 .get();
655 assert_eq!(retries, 2);
656 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
657 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
658 assert_eq!(client.metrics.total_ingested_events.get(), 0);
659 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
660 }
661
662 #[tokio::test]
663 async fn test_checkpoint_decode_error_with_retry() {
664 let (client, mock) = setup_test();
665
666 mock.checkpoints.insert(1, test_checkpoint(1));
667 mock.decode_failures.insert(1, 2);
668
669 let result = client.checkpoint(1).await.unwrap();
671 assert_eq!(*result.checkpoint.summary.sequence_number(), 1);
672 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
673
674 let retries = client
676 .metrics
677 .total_ingested_transient_retries
678 .with_label_values(&["deserialization"])
679 .get();
680 assert_eq!(retries, 2);
681 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
682 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
683 assert_eq!(client.metrics.total_ingested_events.get(), 0);
684 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
685 }
686
687 #[tokio::test]
688 async fn test_wait_for_checkpoint_with_retry() {
689 let (client, mock) = setup_test();
690
691 mock.checkpoints.insert(1, test_checkpoint(1));
692 mock.not_found_failures.insert(1, 1);
693
694 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
696 assert_eq!(result.checkpoint.summary.sequence_number(), &1);
697 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
698
699 let retries = client.metrics.total_ingested_not_found_retries.get();
701 assert_eq!(retries, 1);
702 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
703 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
704 assert_eq!(client.metrics.total_ingested_events.get(), 0);
705 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
706 }
707
708 #[tokio::test]
709 async fn test_wait_for_checkpoint_instant() {
710 let (client, mock) = setup_test();
711
712 mock.checkpoints.insert(1, test_checkpoint(1));
713
714 let result = client.wait_for(1, Duration::from_millis(50)).await.unwrap();
715 assert_eq!(result.checkpoint.summary.sequence_number(), &1);
716 assert_eq!(result.chain_id, MockIngestionClient::mock_chain_id());
717 assert_eq!(client.metrics.total_ingested_checkpoints.get(), 1);
718 assert_eq!(client.metrics.total_ingested_transactions.get(), 0);
719 assert_eq!(client.metrics.total_ingested_events.get(), 0);
720 assert_eq!(client.metrics.total_ingested_objects.get(), 0);
721 }
722}