// sui_config/object_storage_config.rs
use anyhow::{anyhow, Context, Result};

use clap::*;
use object_store::aws::AmazonS3Builder;
use object_store::{ClientOptions, DynObjectStore};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::{env, fs};
use tracing::info;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, ValueEnum)]
pub enum ObjectStoreType {
    /// Local file system
    File,
    /// AWS S3
    S3,
    /// Google Cloud Storage
    GCS,
    /// Azure Blob Storage
    Azure,
}

#[derive(Default, Debug, Clone, Deserialize, Serialize, Args)]
#[serde(rename_all = "kebab-case")]
pub struct ObjectStoreConfig {
    /// Which object store backend to use.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(value_enum)]
    pub object_store: Option<ObjectStoreType>,
    /// Local directory to store objects in; required when `object_store` is `File`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub directory: Option<PathBuf>,
    /// Bucket name (S3/GCS) or container name (Azure).
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub bucket: Option<String>,
    /// AWS access key id; if unset, several environment variables are consulted (see `new_s3`).
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_access_key_id: Option<String>,
    /// AWS secret access key; if unset, several environment variables are consulted (see `new_s3`).
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_secret_access_key: Option<String>,
    /// Custom S3 endpoint, e.g. for S3-compatible stores.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_endpoint: Option<String>,
    /// AWS region of the bucket.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_region: Option<String>,
    /// AWS profile name.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_profile: Option<String>,
    /// Use virtual-hosted-style (bucket-in-hostname) S3 requests.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_virtual_hosted_style_request: bool,
    /// Allow plain HTTP connections to the S3 endpoint.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_allow_http: bool,
    /// Path to a Google service account credentials file.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_service_account: Option<String>,
    /// Google project id, attached to GCS requests as a user-project header.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_project_id: Option<String>,
    /// Azure storage account name.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_account: Option<String>,
    /// Azure storage access key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_access_key: Option<String>,
    /// Maximum number of concurrent connections to the remote store.
    #[serde(default = "default_object_store_connection_limit")]
    #[arg(long, default_value_t = 20)]
    pub object_store_connection_limit: usize,
    /// Do not sign requests (anonymous access).
    #[serde(default)]
    #[arg(long, default_value_t = false)]
    pub no_sign_request: bool,
}
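
// Because of `#[serde(rename_all = "kebab-case")]`, field names appear in
// kebab-case when this struct is (de)serialized, e.g. in a YAML config file.
// A minimal illustrative fragment (bucket and region values are placeholders,
// not taken from the original source) might look like:
//
//   object-store: S3
//   bucket: example-bucket
//   aws-region: us-west-2
//   object-store-connection-limit: 20
//
// The clap `Args` derive likewise exposes most fields as `--kebab-case` flags
// (e.g. `--bucket`, `--aws-region`); `object_store` itself is a positional
// `ValueEnum` argument here, since it is declared with `#[arg(value_enum)]`
// and no `long` name.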

fn default_object_store_connection_limit() -> usize {
    20
}

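/// Client options shared by all remote backends: request and connect timeouts
/// are disabled and idle pooled connections are kept alive for 300 seconds,
/// presumably to accommodate large, long-running transfers.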
fn no_timeout_options() -> ClientOptions {
    ClientOptions::new()
        .with_timeout_disabled()
        .with_connect_timeout_disabled()
        .with_pool_idle_timeout(std::time::Duration::from_secs(300))
}

impl ObjectStoreConfig {
    fn new_local_fs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        info!(directory=?self.directory, object_store_type="File", "Object Store");
        if let Some(path) = &self.directory {
            fs::create_dir_all(path).context(anyhow!(
                "Failed to create local directory: {}",
                path.display()
            ))?;
            let store = object_store::local::LocalFileSystem::new_with_prefix(path)
                .context(anyhow!("Failed to create local object store"))?;
            Ok(Arc::new(store))
        } else {
            Err(anyhow!("No directory provided for local fs storage"))
        }
    }
    fn new_s3(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, object_store_type="S3", "Object Store");

        // `with_imdsv1_fallback()` lets instance-metadata credential lookups fall
        // back to IMDSv1 when IMDSv2 is not available.
        let mut builder = AmazonS3Builder::new()
            .with_client_options(no_timeout_options())
            .with_imdsv1_fallback();

        if self.aws_virtual_hosted_style_request {
            builder = builder.with_virtual_hosted_style_request(true);
        }
        if self.aws_allow_http {
            builder = builder.with_allow_http(true);
        }
        if let Some(region) = &self.aws_region {
            builder = builder.with_region(region);
        }
        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }

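        // Credential resolution: an explicitly configured key takes precedence;
        // otherwise the first of the ARCHIVE_READ_*, FORMAL_SNAPSHOT_WRITE_*, and
        // DB_SNAPSHOT_READ_* environment variable pairs that is set is used.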
        if let Some(key_id) = &self.aws_access_key_id {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("ARCHIVE_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("DB_SNAPSHOT_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        }

        if let Some(secret) = &self.aws_secret_access_key {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("ARCHIVE_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("DB_SNAPSHOT_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        }

        if let Some(endpoint) = &self.aws_endpoint {
            builder = builder.with_endpoint(endpoint);
        }
        Ok(Arc::new(LimitStore::new(
            builder.build().context("Invalid s3 config")?,
            self.object_store_connection_limit,
        )))
    }
    fn new_gcs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::gcp::GoogleCloudStorageBuilder;
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, object_store_type="GCS", "Object Store");

        let mut builder = GoogleCloudStorageBuilder::new();

        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }
        if let Some(account) = &self.google_service_account {
            builder = builder.with_service_account_path(account);
        }

        let mut client_options = no_timeout_options();
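        // When a project id is configured, attach it both as the
        // `x-goog-user-project` header (billing/quota project for GCS requests)
        // and as the `userproject` header used by IAM-related requests.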
        if let Some(google_project_id) = &self.google_project_id {
            let x_project_header = HeaderName::from_static("x-goog-user-project");
            let iam_req_header = HeaderName::from_static("userproject");

            let mut headers = HeaderMap::new();
            headers.insert(x_project_header, HeaderValue::from_str(google_project_id)?);
            headers.insert(iam_req_header, HeaderValue::from_str(google_project_id)?);
            client_options = client_options.with_default_headers(headers);
        }
        builder = builder.with_client_options(client_options);

        Ok(Arc::new(LimitStore::new(
            builder.build().context("Invalid gcs config")?,
            self.object_store_connection_limit,
        )))
    }
    fn new_azure(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::azure::MicrosoftAzureBuilder;
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, account=?self.azure_storage_account,
            object_store_type="Azure", "Object Store");

        let mut builder = MicrosoftAzureBuilder::new().with_client_options(no_timeout_options());

        if let Some(bucket) = &self.bucket {
            builder = builder.with_container_name(bucket);
        }
        if let Some(account) = &self.azure_storage_account {
            builder = builder.with_account(account);
        }
        if let Some(key) = &self.azure_storage_access_key {
            builder = builder.with_access_key(key);
        }

        Ok(Arc::new(LimitStore::new(
            builder.build().context("Invalid azure config")?,
            self.object_store_connection_limit,
        )))
    }
    pub fn make(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        match &self.object_store {
            Some(ObjectStoreType::File) => self.new_local_fs(),
            Some(ObjectStoreType::S3) => self.new_s3(),
            Some(ObjectStoreType::GCS) => self.new_gcs(),
            Some(ObjectStoreType::Azure) => self.new_azure(),
            _ => Err(anyhow!("At least one storage backend should be provided")),
        }
    }
}
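
// A minimal usage sketch (not part of the original source): build a config for
// the local `File` backend and obtain a `DynObjectStore` from it. The directory
// name below is a placeholder chosen for the example.
#[cfg(test)]
mod example_tests {
    use super::*;

    #[test]
    fn make_local_fs_store() -> Result<()> {
        let dir = std::env::temp_dir().join("object_storage_config_example");
        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(dir),
            ..Default::default()
        };
        // `make()` dispatches on `object_store` and returns the configured store.
        let _store: Arc<DynObjectStore> = config.make()?;
        Ok(())
    }
}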