sui_storage/object_store/http/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod gcs;
5mod local;
6mod s3;
7
8use std::sync::Arc;
9
10use crate::object_store::http::gcs::GoogleCloudStorage;
11use crate::object_store::http::local::LocalStorage;
12use crate::object_store::http::s3::AmazonS3;
13use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
14
15use crate::object_store::ObjectStoreGetExt;
16use anyhow::{Context, Result, anyhow};
17use chrono::{DateTime, Utc};
18use futures::{StreamExt, TryStreamExt};
19use object_store::path::Path;
20use object_store::{Error, GetResult, GetResultPayload, ObjectMeta};
21use reqwest::header::{CONTENT_LENGTH, ETAG, HeaderMap, LAST_MODIFIED};
22use reqwest::{Client, Method};
23
24// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
25//
26// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
27// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ).
28pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
29    .remove(b'-')
30    .remove(b'.')
31    .remove(b'_')
32    .remove(b'~');
33const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
34static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
35
/// Constructs an HTTP-based [`ObjectStoreGetExt`] client from a configuration value.
pub trait HttpDownloaderBuilder {
    /// Builds the client described by `self`.
    ///
    /// On success the client is returned as a shared, type-erased
    /// [`ObjectStoreGetExt`] trait object; otherwise an error describing why it
    /// could not be built is returned.
    fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>>;
}
39
40impl HttpDownloaderBuilder for ObjectStoreConfig {
41    fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>> {
42        match self.object_store {
43            Some(ObjectStoreType::File) => {
44                Ok(LocalStorage::new(self.directory.as_ref().unwrap()).map(Arc::new)?)
45            }
46            Some(ObjectStoreType::S3) => {
47                let bucket_endpoint = if let Some(endpoint) = &self.aws_endpoint {
48                    if self.aws_virtual_hosted_style_request {
49                        endpoint.clone()
50                    } else {
51                        let bucket = self.bucket.as_ref().unwrap();
52                        format!("{endpoint}/{bucket}")
53                    }
54                } else {
55                    let bucket = self.bucket.as_ref().unwrap();
56                    let region = self.aws_region.as_ref().unwrap();
57                    if self.aws_virtual_hosted_style_request {
58                        format!("https://{bucket}.s3.{region}.amazonaws.com")
59                    } else {
60                        format!("https://s3.{region}.amazonaws.com/{bucket}")
61                    }
62                };
63                Ok(AmazonS3::new(&bucket_endpoint).map(Arc::new)?)
64            }
65            Some(ObjectStoreType::GCS) => {
66                Ok(GoogleCloudStorage::new(self.bucket.as_ref().unwrap()).map(Arc::new)?)
67            }
68            _ => Err(anyhow!("At least one storage backend should be provided")),
69        }
70    }
71}
72
73async fn get(
74    url: &str,
75    store: &'static str,
76    location: &Path,
77    client: &Client,
78) -> Result<GetResult> {
79    let request = client.request(Method::GET, url);
80    let response = request.send().await.context("failed to get")?;
81    let meta = header_meta(location, response.headers()).context("Failed to get header")?;
82    let stream = response
83        .bytes_stream()
84        .map_err(|source| Error::Generic {
85            store,
86            source: Box::new(source),
87        })
88        .boxed();
89    Ok(GetResult {
90        range: 0..meta.size,
91        payload: GetResultPayload::Stream(stream),
92        meta,
93        attributes: object_store::Attributes::new(),
94    })
95}
96
97fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta> {
98    let last_modified = headers
99        .get(LAST_MODIFIED)
100        .context("Missing last modified")?;
101
102    let content_length = headers
103        .get(CONTENT_LENGTH)
104        .context("Missing content length")?;
105
106    let last_modified = last_modified.to_str().context("bad header")?;
107    let last_modified = DateTime::parse_from_rfc2822(last_modified)
108        .context("invalid last modified")?
109        .with_timezone(&Utc);
110
111    let content_length = content_length.to_str().context("bad header")?;
112    let content_length = content_length.parse().context("invalid content length")?;
113
114    let e_tag = headers.get(ETAG).context("missing etag")?;
115    let e_tag = e_tag.to_str().context("bad header")?;
116
117    Ok(ObjectMeta {
118        location: location.clone(),
119        last_modified,
120        size: content_length,
121        e_tag: Some(e_tag.to_string()),
122        version: None,
123    })
124}
125
#[cfg(test)]
mod tests {
    use crate::object_store::http::HttpDownloaderBuilder;
    use object_store::path::Path;
    use std::fs;
    use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use tempfile::TempDir;

    /// Builds a small directory tree in a temp dir, opens it through the
    /// file-backed downloader and checks that a nested file reads back intact.
    #[tokio::test]
    pub async fn test_local_download() -> anyhow::Result<()> {
        let root = TempDir::new()?;

        // Layout: child/file1 and child/grand_child/file2, both with the same contents.
        let child_dir = root.path().join("child");
        fs::create_dir(&child_dir)?;
        fs::write(child_dir.join("file1"), b"Lorem ipsum")?;
        let grandchild_dir = child_dir.join("grand_child");
        fs::create_dir(&grandchild_dir)?;
        fs::write(grandchild_dir.join("file2"), b"Lorem ipsum")?;

        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(root.path().to_path_buf()),
            ..Default::default()
        };
        let store = config.make_http()?;

        let bytes = store.get_bytes(&Path::from("child/file1")).await?;
        assert_eq!(bytes.to_vec(), b"Lorem ipsum");
        Ok(())
    }
}
158}