sui_data_ingestion_core/
util.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use object_store::aws::AmazonS3ConfigKey;
6use object_store::gcp::GoogleConfigKey;
7use object_store::path::Path;
8use object_store::{ClientOptions, ObjectStore, RetryConfig};
9use std::str::FromStr;
10use std::time::Duration;
11use sui_types::messages_checkpoint::CheckpointSequenceNumber;
12use url::Url;
13
14pub fn create_remote_store_client(
15    url: String,
16    remote_store_options: Vec<(String, String)>,
17    timeout_secs: u64,
18) -> Result<Box<dyn ObjectStore>> {
19    let retry_config = RetryConfig {
20        max_retries: 0,
21        retry_timeout: Duration::from_secs(timeout_secs + 1),
22        ..Default::default()
23    };
24    let client_options = ClientOptions::new()
25        .with_timeout(Duration::from_secs(timeout_secs))
26        .with_allow_http(true);
27    let url = Url::parse(&url)?;
28    let mut scheme = url.scheme();
29    if url.host_str().unwrap_or_default().starts_with("s3") {
30        scheme = "s3";
31    }
32    match scheme {
33        "http" | "https" => {
34            let http_store = object_store::http::HttpBuilder::new()
35                .with_url(url)
36                .with_client_options(client_options)
37                .with_retry(retry_config)
38                .build()?;
39            Ok(Box::new(http_store))
40        }
41        "gs" => {
42            let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
43                .with_url(url.as_str())
44                .with_retry(retry_config)
45                .with_client_options(client_options);
46            for (key, value) in remote_store_options {
47                builder = builder.with_config(GoogleConfigKey::from_str(&key)?, value);
48            }
49            Ok(Box::new(builder.build()?))
50        }
51        "s3" => {
52            let mut builder = object_store::aws::AmazonS3Builder::new()
53                .with_url(url.as_str())
54                .with_retry(retry_config)
55                .with_client_options(client_options);
56            for (key, value) in remote_store_options {
57                builder = builder.with_config(AmazonS3ConfigKey::from_str(&key)?, value);
58            }
59            Ok(Box::new(builder.build()?))
60        }
61        "file" => Ok(Box::new(
62            object_store::local::LocalFileSystem::new_with_prefix(url.path())?,
63        )),
64        _ => Err(anyhow::anyhow!("Unsupported URL scheme: {}", url.scheme())),
65    }
66}
67
68pub async fn end_of_epoch_data(
69    url: String,
70    remote_store_options: Vec<(String, String)>,
71    timeout_secs: u64,
72) -> Result<Vec<CheckpointSequenceNumber>> {
73    let client = create_remote_store_client(url, remote_store_options, timeout_secs)?;
74    let response = client.get(&Path::from("epochs.json")).await?;
75    Ok(serde_json::from_slice(response.bytes().await?.as_ref())?)
76}