sui_storage/object_store/http/
mod.rs1mod gcs;
5mod local;
6mod s3;
7
8use std::sync::Arc;
9
10use crate::object_store::http::gcs::GoogleCloudStorage;
11use crate::object_store::http::local::LocalStorage;
12use crate::object_store::http::s3::AmazonS3;
13use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
14
15use crate::object_store::ObjectStoreGetExt;
16use anyhow::{Context, Result, anyhow};
17use chrono::{DateTime, Utc};
18use futures::{StreamExt, TryStreamExt};
19use object_store::path::Path;
20use object_store::{Error, GetResult, GetResultPayload, ObjectMeta};
21use reqwest::header::{CONTENT_LENGTH, ETAG, HeaderMap, LAST_MODIFIED};
22use reqwest::{Client, Method};
23
24pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
29 .remove(b'-')
30 .remove(b'.')
31 .remove(b'_')
32 .remove(b'~');
33const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
34static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
35
36pub trait HttpDownloaderBuilder {
37 fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>>;
38}
39
40impl HttpDownloaderBuilder for ObjectStoreConfig {
41 fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>> {
42 match self.object_store {
43 Some(ObjectStoreType::File) => {
44 Ok(LocalStorage::new(self.directory.as_ref().unwrap()).map(Arc::new)?)
45 }
46 Some(ObjectStoreType::S3) => {
47 let bucket_endpoint = if let Some(endpoint) = &self.aws_endpoint {
48 if self.aws_virtual_hosted_style_request {
49 endpoint.clone()
50 } else {
51 let bucket = self.bucket.as_ref().unwrap();
52 format!("{endpoint}/{bucket}")
53 }
54 } else {
55 let bucket = self.bucket.as_ref().unwrap();
56 let region = self.aws_region.as_ref().unwrap();
57 if self.aws_virtual_hosted_style_request {
58 format!("https://{bucket}.s3.{region}.amazonaws.com")
59 } else {
60 format!("https://s3.{region}.amazonaws.com/{bucket}")
61 }
62 };
63 Ok(AmazonS3::new(&bucket_endpoint).map(Arc::new)?)
64 }
65 Some(ObjectStoreType::GCS) => {
66 Ok(GoogleCloudStorage::new(self.bucket.as_ref().unwrap()).map(Arc::new)?)
67 }
68 _ => Err(anyhow!("At least one storage backend should be provided")),
69 }
70 }
71}
72
73async fn get(
74 url: &str,
75 store: &'static str,
76 location: &Path,
77 client: &Client,
78) -> Result<GetResult> {
79 let request = client.request(Method::GET, url);
80 let response = request.send().await.context("failed to get")?;
81 let meta = header_meta(location, response.headers()).context("Failed to get header")?;
82 let stream = response
83 .bytes_stream()
84 .map_err(|source| Error::Generic {
85 store,
86 source: Box::new(source),
87 })
88 .boxed();
89 Ok(GetResult {
90 range: 0..meta.size,
91 payload: GetResultPayload::Stream(stream),
92 meta,
93 attributes: object_store::Attributes::new(),
94 })
95}
96
97fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta> {
98 let last_modified = headers
99 .get(LAST_MODIFIED)
100 .context("Missing last modified")?;
101
102 let content_length = headers
103 .get(CONTENT_LENGTH)
104 .context("Missing content length")?;
105
106 let last_modified = last_modified.to_str().context("bad header")?;
107 let last_modified = DateTime::parse_from_rfc2822(last_modified)
108 .context("invalid last modified")?
109 .with_timezone(&Utc);
110
111 let content_length = content_length.to_str().context("bad header")?;
112 let content_length = content_length.parse().context("invalid content length")?;
113
114 let e_tag = headers.get(ETAG).context("missing etag")?;
115 let e_tag = e_tag.to_str().context("bad header")?;
116
117 Ok(ObjectMeta {
118 location: location.clone(),
119 last_modified,
120 size: content_length,
121 e_tag: Some(e_tag.to_string()),
122 version: None,
123 })
124}
125
#[cfg(test)]
mod tests {
    use super::HttpDownloaderBuilder;
    use object_store::path::Path;
    use std::fs;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    /// Lays out `child/file1` and `child/grand_child/file2` in a temporary
    /// directory, builds a file-backed downloader over it, and checks that
    /// `child/file1` is read back unchanged.
    #[tokio::test]
    pub async fn test_local_download() -> anyhow::Result<()> {
        let payload = b"Lorem ipsum";

        let root = TempDir::new()?;
        let root_dir = root.path();

        let child_dir = root_dir.join("child");
        fs::create_dir(&child_dir)?;
        fs::write(child_dir.join("file1"), payload)?;

        let grandchild_dir = child_dir.join("grand_child");
        fs::create_dir(&grandchild_dir)?;
        fs::write(grandchild_dir.join("file2"), payload)?;

        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(root_dir.to_path_buf()),
            ..Default::default()
        };
        let store = config.make_http()?;

        let bytes = store.get_bytes(&Path::from("child/file1")).await?;
        assert_eq!(bytes.to_vec(), payload);
        Ok(())
    }
}