sui_storage/object_store/
mod.rs1use anyhow::{Result, anyhow};
5use async_trait::async_trait;
6use bytes::Bytes;
7use futures::stream::BoxStream;
8use object_store::path::Path;
9use object_store::{DynObjectStore, ObjectMeta, ObjectStore};
10use std::sync::Arc;
11
12pub mod http;
13pub mod util;
14
15#[async_trait]
16pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
17 async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
19}
20
21macro_rules! as_ref_get_ext_impl {
22 ($type:ty) => {
23 #[async_trait]
24 impl ObjectStoreGetExt for $type {
25 async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
26 self.as_ref().get_bytes(src).await
27 }
28 }
29 };
30}
31
32as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
33as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);
34
35macro_rules! as_ref_get_impl {
36 ($type:ty) => {
37 #[async_trait]
38 impl ObjectStoreGetExt for $type {
39 async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
40 self.get(src)
41 .await
42 .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
43 .bytes()
44 .await
45 .map_err(|e| {
46 anyhow!(
47 "Failed to collect GET result for file {} into bytes with error: {:?}",
48 src,
49 e
50 )
51 })
52 }
53 }
54 };
55}
56
57as_ref_get_impl!(Arc<dyn ObjectStore>);
58as_ref_get_impl!(Box<dyn ObjectStore>);
59
60#[async_trait]
61pub trait ObjectStoreListExt: Send + Sync + 'static {
62 async fn list_objects(
64 &self,
65 src: Option<&Path>,
66 ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
67}
68
69macro_rules! as_ref_list_ext_impl {
70 ($type:ty) => {
71 #[async_trait]
72 impl ObjectStoreListExt for $type {
73 async fn list_objects(
74 &self,
75 src: Option<&Path>,
76 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
77 self.as_ref().list_objects(src).await
78 }
79 }
80 };
81}
82
83as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
84as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);
85
86#[async_trait]
87impl ObjectStoreListExt for Arc<DynObjectStore> {
88 async fn list_objects(
89 &self,
90 src: Option<&Path>,
91 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
92 self.list(src)
93 }
94}
95
96#[async_trait]
97pub trait ObjectStorePutExt: Send + Sync + 'static {
98 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
100}
101
102macro_rules! as_ref_put_ext_impl {
103 ($type:ty) => {
104 #[async_trait]
105 impl ObjectStorePutExt for $type {
106 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
107 self.as_ref().put_bytes(src, bytes).await
108 }
109 }
110 };
111}
112
113as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
114as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);
115
116#[async_trait]
117impl ObjectStorePutExt for Arc<DynObjectStore> {
118 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
119 self.put(src, bytes.into()).await?;
120 Ok(())
121 }
122}
123
124#[async_trait]
125pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
126 async fn delete_object(&self, src: &Path) -> Result<()>;
128}
129
130macro_rules! as_ref_delete_ext_impl {
131 ($type:ty) => {
132 #[async_trait]
133 impl ObjectStoreDeleteExt for $type {
134 async fn delete_object(&self, src: &Path) -> Result<()> {
135 self.as_ref().delete_object(src).await
136 }
137 }
138 };
139}
140
141as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
142as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);
143
144#[async_trait]
145
146impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
147 async fn delete_object(&self, src: &Path) -> Result<()> {
148 self.delete(src).await?;
149 Ok(())
150 }
151}