sui_config/object_storage_config.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Context, Result, anyhow};

use clap::*;
use object_store::aws::AmazonS3Builder;
use object_store::{ClientOptions, DynObjectStore};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::{env, fs};
use tracing::info;
/// Object-store type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, ValueEnum)]
pub enum ObjectStoreType {
    /// Local file system
    File,
    /// AWS S3
    S3,
    /// Google Cloud Storage
    GCS,
    /// Azure Blob Storage
    Azure,
}
#[derive(Default, Debug, Clone, Deserialize, Serialize, Args)]
#[serde(rename_all = "kebab-case")]
pub struct ObjectStoreConfig {
    /// Which object storage to use. If not specified, defaults to local file system.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(value_enum)]
    pub object_store: Option<ObjectStoreType>,
    /// Path of the local directory. Only relevant if `--object-store` is File.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub directory: Option<PathBuf>,
    /// Name of the bucket to use for the object store. Must also set
    /// `--object-store` to a cloud object storage to have any effect.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub bucket: Option<String>,
    /// When using Amazon S3 as the object store, set this to an access key that
    /// has permission to read from and write to the specified S3 bucket.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_access_key_id: Option<String>,
    /// When using Amazon S3 as the object store, set this to the secret access
    /// key that goes with the specified access key ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_secret_access_key: Option<String>,
    /// When using Amazon S3 as the object store, set this to the bucket endpoint.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_endpoint: Option<String>,
    /// When using Amazon S3 as the object store, set this to the region
    /// that goes with the specified bucket.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_region: Option<String>,
    /// When using Amazon S3 as the object store, the name of the AWS profile
    /// to source credentials from.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_profile: Option<String>,
    /// Enable virtual hosted style requests.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_virtual_hosted_style_request: bool,
    /// Allow unencrypted HTTP connections to AWS.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_allow_http: bool,
    /// When using Google Cloud Storage as the object store, set this to the
    /// path to the JSON file that contains the Google credentials.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_service_account: Option<String>,
    /// When using Google Cloud Storage as the object store and writing to a
    /// bucket with Requester Pays enabled, set this to the project_id
    /// you want to associate the write cost with.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_project_id: Option<String>,
    /// When using Microsoft Azure as the object store, set this to the
    /// Azure storage account name.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_account: Option<String>,
    /// When using Microsoft Azure as the object store, set this to one of the
    /// keys in the storage account settings.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_access_key: Option<String>,
    /// Maximum number of concurrent requests allowed to the remote store
    /// (enforced by wrapping the store in a `LimitStore`).
    #[serde(default = "default_object_store_connection_limit")]
    #[arg(long, default_value_t = 20)]
    pub object_store_connection_limit: usize,
    /// Send unsigned (anonymous) requests, e.g. when reading from a public bucket.
    #[serde(default)]
    #[arg(long, default_value_t = false)]
    pub no_sign_request: bool,
}
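
// A minimal sketch of the serialized (YAML) shape this struct accepts, given
// the kebab-case rename above. The bucket and region values are invented for
// illustration, and the deserializer (e.g. serde_yaml) is whatever the caller
// uses, not something this module provides:
//
//     object-store: S3
//     bucket: my-example-bucket
//     aws-region: us-east-1
//     object-store-connection-limit: 20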

fn default_object_store_connection_limit() -> usize {
    20
}

/// Client options with request and connect timeouts disabled and a
/// five-minute idle timeout on pooled connections.
fn no_timeout_options() -> ClientOptions {
    ClientOptions::new()
        .with_timeout_disabled()
        .with_connect_timeout_disabled()
        .with_pool_idle_timeout(std::time::Duration::from_secs(300))
}

impl ObjectStoreConfig {
    fn new_local_fs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        info!(directory=?self.directory, object_store_type="File", "Object Store");
        if let Some(path) = &self.directory {
            fs::create_dir_all(path).context(anyhow!(
                "Failed to create local directory: {}",
                path.display()
            ))?;
            let store = object_store::local::LocalFileSystem::new_with_prefix(path)
                .context(anyhow!("Failed to create local object store"))?;
            Ok(Arc::new(store))
        } else {
            Err(anyhow!("No directory provided for local fs storage"))
        }
    }
    fn new_s3(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, object_store_type="S3", "Object Store");

        let mut builder = AmazonS3Builder::new()
            .with_client_options(no_timeout_options())
            .with_imdsv1_fallback();

        if self.aws_virtual_hosted_style_request {
            builder = builder.with_virtual_hosted_style_request(true);
        }
        if self.aws_allow_http {
            builder = builder.with_allow_http(true);
        }
        if let Some(region) = &self.aws_region {
            builder = builder.with_region(region);
        }
        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }

        // Credentials: explicit config wins; otherwise fall back to the
        // task-specific environment variables, checked in this order.
        if let Some(key_id) = &self.aws_access_key_id {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("ARCHIVE_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("DB_SNAPSHOT_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        }

        if let Some(secret) = &self.aws_secret_access_key {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("ARCHIVE_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("DB_SNAPSHOT_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        }

        if let Some(endpoint) = &self.aws_endpoint {
            builder = builder.with_endpoint(endpoint);
        }
        Ok(Arc::new(LimitStore::new(
            builder.build().context("Invalid S3 config")?,
            self.object_store_connection_limit,
        )))
    }
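
    // A hypothetical configuration for `new_s3` above, for illustration only
    // (bucket and region are invented). Note that constructing the struct via
    // `..Default::default()` leaves `object_store_connection_limit` at 0,
    // which would block every request through the `LimitStore`, so it is set
    // explicitly here:
    //
    //     let s3 = ObjectStoreConfig {
    //         object_store: Some(ObjectStoreType::S3),
    //         bucket: Some("my-example-bucket".into()),
    //         aws_region: Some("us-east-1".into()),
    //         object_store_connection_limit: 20,
    //         ..Default::default()
    //     }
    //     .make()?;
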
    fn new_gcs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::gcp::GoogleCloudStorageBuilder;
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, object_store_type="GCS", "Object Store");

        let mut builder = GoogleCloudStorageBuilder::new();

        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }
        if let Some(account) = &self.google_service_account {
            builder = builder.with_service_account_path(account);
        }

        let mut client_options = no_timeout_options();
        if let Some(google_project_id) = &self.google_project_id {
            // Attach the billing project to every request so that Requester
            // Pays buckets charge the configured project.
            let x_project_header = HeaderName::from_static("x-goog-user-project");
            let iam_req_header = HeaderName::from_static("userproject");

            let mut headers = HeaderMap::new();
            headers.insert(x_project_header, HeaderValue::from_str(google_project_id)?);
            headers.insert(iam_req_header, HeaderValue::from_str(google_project_id)?);
            client_options = client_options.with_default_headers(headers);
        }
        builder = builder.with_client_options(client_options);

        Ok(Arc::new(LimitStore::new(
            builder.build().context("Invalid GCS config")?,
            self.object_store_connection_limit,
        )))
    }
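
    // A hypothetical configuration for `new_gcs` above (invented values); the
    // service-account path and Requester Pays project id map onto the builder
    // calls in that method:
    //
    //     let gcs = ObjectStoreConfig {
    //         object_store: Some(ObjectStoreType::GCS),
    //         bucket: Some("my-example-bucket".into()),
    //         google_service_account: Some("/path/to/creds.json".into()),
    //         google_project_id: Some("my-billing-project".into()),
    //         object_store_connection_limit: 20,
    //         ..Default::default()
    //     }
    //     .make()?;
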
    fn new_azure(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::azure::MicrosoftAzureBuilder;
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, account=?self.azure_storage_account,
            object_store_type="Azure", "Object Store");

        let mut builder = MicrosoftAzureBuilder::new().with_client_options(no_timeout_options());

        if let Some(bucket) = &self.bucket {
            builder = builder.with_container_name(bucket);
        }
        if let Some(account) = &self.azure_storage_account {
            builder = builder.with_account(account);
        }
        if let Some(key) = &self.azure_storage_access_key {
            builder = builder.with_access_key(key);
        }

        Ok(Arc::new(LimitStore::new(
            builder.build().context("Invalid Azure config")?,
            self.object_store_connection_limit,
        )))
    }
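
    // The Azure path is symmetrical (invented values; `bucket` doubles as the
    // container name in `new_azure` above):
    //
    //     let azure = ObjectStoreConfig {
    //         object_store: Some(ObjectStoreType::Azure),
    //         bucket: Some("my-container".into()),
    //         azure_storage_account: Some("myaccount".into()),
    //         azure_storage_access_key: Some("<access-key>".into()),
    //         object_store_connection_limit: 20,
    //         ..Default::default()
    //     }
    //     .make()?;
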
    pub fn make(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        match &self.object_store {
            Some(ObjectStoreType::File) => self.new_local_fs(),
            Some(ObjectStoreType::S3) => self.new_s3(),
            Some(ObjectStoreType::GCS) => self.new_gcs(),
            Some(ObjectStoreType::Azure) => self.new_azure(),
            None => Err(anyhow!(
                "No object storage backend specified; set `object-store` to File, S3, GCS, or Azure"
            )),
        }
    }
}
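
// A minimal usage sketch, added for illustration: build a local file-system
// store through `make()` and check that the directory gets created. The
// temp-dir path is invented; everything else uses only items defined above.
#[cfg(test)]
mod example_tests {
    use super::*;

    #[test]
    fn local_fs_store_from_config() {
        let dir = std::env::temp_dir().join("object_store_config_example");
        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(dir.clone()),
            ..Default::default()
        };
        // `make()` creates the directory if missing and returns the store.
        let store = config.make().expect("local store should build");
        assert!(dir.exists());
        drop(store);
    }
}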