sui_data_ingestion_core/
util.rs1use anyhow::Result;
5use object_store::aws::AmazonS3ConfigKey;
6use object_store::gcp::GoogleConfigKey;
7use object_store::path::Path;
8use object_store::{ClientOptions, ObjectStore, RetryConfig};
9use std::str::FromStr;
10use std::time::Duration;
11use sui_types::messages_checkpoint::CheckpointSequenceNumber;
12use url::Url;
13
14pub fn create_remote_store_client(
15 url: String,
16 remote_store_options: Vec<(String, String)>,
17 timeout_secs: u64,
18) -> Result<Box<dyn ObjectStore>> {
19 let retry_config = RetryConfig {
20 max_retries: 0,
21 retry_timeout: Duration::from_secs(timeout_secs + 1),
22 ..Default::default()
23 };
24 let client_options = ClientOptions::new()
25 .with_timeout(Duration::from_secs(timeout_secs))
26 .with_allow_http(true);
27 let url = Url::parse(&url)?;
28 let mut scheme = url.scheme();
29 if url.host_str().unwrap_or_default().starts_with("s3") {
30 scheme = "s3";
31 }
32 match scheme {
33 "http" | "https" => {
34 let http_store = object_store::http::HttpBuilder::new()
35 .with_url(url)
36 .with_client_options(client_options)
37 .with_retry(retry_config)
38 .build()?;
39 Ok(Box::new(http_store))
40 }
41 "gs" => {
42 let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
43 .with_url(url.as_str())
44 .with_retry(retry_config)
45 .with_client_options(client_options);
46 for (key, value) in remote_store_options {
47 builder = builder.with_config(GoogleConfigKey::from_str(&key)?, value);
48 }
49 Ok(Box::new(builder.build()?))
50 }
51 "s3" => {
52 let mut builder = object_store::aws::AmazonS3Builder::new()
53 .with_url(url.as_str())
54 .with_retry(retry_config)
55 .with_client_options(client_options);
56 for (key, value) in remote_store_options {
57 builder = builder.with_config(AmazonS3ConfigKey::from_str(&key)?, value);
58 }
59 Ok(Box::new(builder.build()?))
60 }
61 "file" => Ok(Box::new(
62 object_store::local::LocalFileSystem::new_with_prefix(url.path())?,
63 )),
64 _ => Err(anyhow::anyhow!("Unsupported URL scheme: {}", url.scheme())),
65 }
66}
67
68pub async fn end_of_epoch_data(
69 url: String,
70 remote_store_options: Vec<(String, String)>,
71 timeout_secs: u64,
72) -> Result<Vec<CheckpointSequenceNumber>> {
73 let client = create_remote_store_client(url, remote_store_options, timeout_secs)?;
74 let response = client.get(&Path::from("epochs.json")).await?;
75 Ok(serde_json::from_slice(response.bytes().await?.as_ref())?)
76}