sui_storage/object_store/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{DynObjectStore, ObjectMeta, ObjectStore};
use std::sync::Arc;

pub mod http;
pub mod util;

/// Read-side extension trait: fetch the complete contents of one object.
///
/// Implementors must also be `Display`, `Send`, `Sync` and `'static`, as
/// required by the supertrait bounds below.
#[async_trait]
pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
    /// Return the bytes at given path in object store
    ///
    /// # Errors
    ///
    /// Returns an `anyhow` error when the implementor fails to produce the
    /// bytes for `src`.
    async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
}

/// Generates an [`ObjectStoreGetExt`] impl for a smart-pointer type by
/// forwarding `get_bytes` to the value the pointer dereferences to.
macro_rules! as_ref_get_ext_impl {
    ($ptr:ty) => {
        #[async_trait]
        impl ObjectStoreGetExt for $ptr {
            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
                // Delegate to the pointee's own implementation.
                (**self).get_bytes(src).await
            }
        }
    };
}

// Shared (`Arc`) and owned (`Box`) trait objects both implement the trait.
as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);

macro_rules! as_ref_get_impl {
    ($type:ty) => {
        #[async_trait]
        impl ObjectStoreGetExt for $type {
            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
                self.get(src)
                    .await
                    .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
                    .bytes()
                    .await
                    .map_err(|e| {
                        anyhow!(
                            "Failed to collect GET result for file {} into bytes with error: {:?}",
                            src,
                            e
                        )
                    })
            }
        }
    };
}

as_ref_get_impl!(Arc<dyn ObjectStore>);
as_ref_get_impl!(Box<dyn ObjectStore>);

/// Listing extension trait: enumerate object metadata as a stream.
///
/// Implementors must be `Send`, `Sync` and `'static`.
#[async_trait]
pub trait ObjectStoreListExt: Send + Sync + 'static {
    /// List the objects at the given path in object store
    ///
    /// The returned stream yields one `object_store::Result<ObjectMeta>` per
    /// entry, so individual items may themselves be errors.
    // NOTE(review): `src` is optional; presumably `None` means "list
    // everything" — confirm against the implementors.
    async fn list_objects(
        &self,
        src: Option<&Path>,
    ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
}

/// Generates an [`ObjectStoreListExt`] impl for a smart-pointer type by
/// forwarding `list_objects` to the value the pointer dereferences to.
macro_rules! as_ref_list_ext_impl {
    ($ptr:ty) => {
        #[async_trait]
        impl ObjectStoreListExt for $ptr {
            async fn list_objects(
                &self,
                src: Option<&Path>,
            ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
                // Delegate to the pointee's own implementation.
                (**self).list_objects(src).await
            }
        }
    };
}

// Shared (`Arc`) and owned (`Box`) trait objects both implement the trait.
as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);

/// Listing support for a shared, dynamically-typed object store.
#[async_trait]
impl ObjectStoreListExt for Arc<DynObjectStore> {
    /// Forwards `src` unchanged to the store's `list` method and returns the
    /// resulting stream as-is.
    async fn list_objects(
        &self,
        src: Option<&Path>,
    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
        // Nothing is awaited here: the stream is handed back to the caller,
        // who drives it.
        self.list(src)
    }
}

/// Write-side extension trait: store a byte buffer at a location.
///
/// Implementors must be `Send`, `Sync` and `'static`.
#[async_trait]
pub trait ObjectStorePutExt: Send + Sync + 'static {
    /// Write the bytes at the given location in object store
    ///
    /// Note that, despite its name, `src` is the location being written to.
    ///
    /// # Errors
    ///
    /// Returns an `anyhow` error when the implementor fails to write `bytes`.
    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
}

/// Generates an [`ObjectStorePutExt`] impl for a smart-pointer type by
/// forwarding `put_bytes` to the value the pointer dereferences to.
macro_rules! as_ref_put_ext_impl {
    ($ptr:ty) => {
        #[async_trait]
        impl ObjectStorePutExt for $ptr {
            async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
                // Delegate to the pointee's own implementation.
                (**self).put_bytes(src, bytes).await
            }
        }
    };
}

// Shared (`Arc`) and owned (`Box`) trait objects both implement the trait.
as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);

#[async_trait]
impl ObjectStorePutExt for Arc<DynObjectStore> {
    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
        self.put(src, bytes.into()).await?;
        Ok(())
    }
}

/// Delete-side extension trait: remove a single object.
///
/// Implementors must be `Send`, `Sync` and `'static`.
#[async_trait]
pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
    /// Delete the object at the given location in object store
    ///
    /// # Errors
    ///
    /// Returns an `anyhow` error when the implementor fails to delete `src`.
    async fn delete_object(&self, src: &Path) -> Result<()>;
}

/// Generates an [`ObjectStoreDeleteExt`] impl for a smart-pointer type by
/// forwarding `delete_object` to the value the pointer dereferences to.
macro_rules! as_ref_delete_ext_impl {
    ($ptr:ty) => {
        #[async_trait]
        impl ObjectStoreDeleteExt for $ptr {
            async fn delete_object(&self, src: &Path) -> Result<()> {
                // Delegate to the pointee's own implementation.
                (**self).delete_object(src).await
            }
        }
    };
}

// Shared (`Arc`) and owned (`Box`) trait objects both implement the trait.
as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);

#[async_trait]

impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
    async fn delete_object(&self, src: &Path) -> Result<()> {
        self.delete(src).await?;
        Ok(())
    }
}