sui_storage/object_store/
mod.rs1use anyhow::{Result, anyhow};
5use async_trait::async_trait;
6use bytes::Bytes;
7use futures::stream::BoxStream;
8use object_store::path::Path;
9use object_store::{DynObjectStore, ObjectMeta, ObjectStore};
10use std::sync::Arc;
11
12pub mod http;
13pub mod util;
14
15#[async_trait]
16pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
17    async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
19}
20
21macro_rules! as_ref_get_ext_impl {
22    ($type:ty) => {
23        #[async_trait]
24        impl ObjectStoreGetExt for $type {
25            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
26                self.as_ref().get_bytes(src).await
27            }
28        }
29    };
30}
31
32as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
33as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);
34
35macro_rules! as_ref_get_impl {
36    ($type:ty) => {
37        #[async_trait]
38        impl ObjectStoreGetExt for $type {
39            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
40                self.get(src)
41                    .await
42                    .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
43                    .bytes()
44                    .await
45                    .map_err(|e| {
46                        anyhow!(
47                            "Failed to collect GET result for file {} into bytes with error: {:?}",
48                            src,
49                            e
50                        )
51                    })
52            }
53        }
54    };
55}
56
57as_ref_get_impl!(Arc<dyn ObjectStore>);
58as_ref_get_impl!(Box<dyn ObjectStore>);
59
60#[async_trait]
61pub trait ObjectStoreListExt: Send + Sync + 'static {
62    async fn list_objects(
64        &self,
65        src: Option<&Path>,
66    ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
67}
68
69macro_rules! as_ref_list_ext_impl {
70    ($type:ty) => {
71        #[async_trait]
72        impl ObjectStoreListExt for $type {
73            async fn list_objects(
74                &self,
75                src: Option<&Path>,
76            ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
77                self.as_ref().list_objects(src).await
78            }
79        }
80    };
81}
82
83as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
84as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);
85
86#[async_trait]
87impl ObjectStoreListExt for Arc<DynObjectStore> {
88    async fn list_objects(
89        &self,
90        src: Option<&Path>,
91    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
92        self.list(src)
93    }
94}
95
96#[async_trait]
97pub trait ObjectStorePutExt: Send + Sync + 'static {
98    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
100}
101
102macro_rules! as_ref_put_ext_impl {
103    ($type:ty) => {
104        #[async_trait]
105        impl ObjectStorePutExt for $type {
106            async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
107                self.as_ref().put_bytes(src, bytes).await
108            }
109        }
110    };
111}
112
113as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
114as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);
115
116#[async_trait]
117impl ObjectStorePutExt for Arc<DynObjectStore> {
118    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
119        self.put(src, bytes.into()).await?;
120        Ok(())
121    }
122}
123
124#[async_trait]
125pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
126    async fn delete_object(&self, src: &Path) -> Result<()>;
128}
129
130macro_rules! as_ref_delete_ext_impl {
131    ($type:ty) => {
132        #[async_trait]
133        impl ObjectStoreDeleteExt for $type {
134            async fn delete_object(&self, src: &Path) -> Result<()> {
135                self.as_ref().delete_object(src).await
136            }
137        }
138    };
139}
140
141as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
142as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);
143
144#[async_trait]
145
146impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
147    async fn delete_object(&self, src: &Path) -> Result<()> {
148        self.delete(src).await?;
149        Ok(())
150    }
151}