sui_indexer_alt_framework/ingestion/
store_client.rs1use std::sync::Arc;
5
6use bytes::Bytes;
7use object_store::Error as ObjectStoreError;
8use object_store::ObjectStore;
9use object_store::ObjectStoreExt;
10use object_store::RetryConfig;
11use object_store::path::Path as ObjectPath;
12use serde::de::DeserializeOwned;
13use tracing::debug;
14use tracing::error;
15
16use crate::ingestion::decode;
17use crate::ingestion::ingestion_client::CheckpointData;
18use crate::ingestion::ingestion_client::CheckpointError;
19use crate::ingestion::ingestion_client::CheckpointResult;
20use crate::ingestion::ingestion_client::IngestionClientTrait;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23pub(super) fn retry_config() -> RetryConfig {
26 RetryConfig {
27 max_retries: 0,
28 ..Default::default()
29 }
30}
31
32pub struct StoreIngestionClient {
33 store: Arc<dyn ObjectStore>,
34}
35
36impl StoreIngestionClient {
37 pub fn new(store: Arc<dyn ObjectStore>) -> Self {
38 Self { store }
39 }
40
41 pub async fn end_of_epoch_checkpoints<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
44 let bytes = self.bytes(ObjectPath::from("epochs.json")).await?;
45 Ok(serde_json::from_slice(&bytes)?)
46 }
47
48 pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
50 let bytes = self.checkpoint_bytes(checkpoint).await?;
51 Ok(decode::checkpoint(&bytes)?)
52 }
53
54 async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
55 self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
56 .await
57 }
58
59 async fn bytes(&self, path: ObjectPath) -> object_store::Result<Bytes> {
60 let result = self.store.get(&path).await?;
61 result.bytes().await
62 }
63}
64
65#[async_trait::async_trait]
66impl IngestionClientTrait for StoreIngestionClient {
67 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
77 match self.checkpoint_bytes(checkpoint).await {
78 Ok(bytes) => Ok(CheckpointData::Raw(bytes)),
79 Err(ObjectStoreError::NotFound { .. }) => {
80 debug!(checkpoint, "Checkpoint not found");
81 Err(CheckpointError::NotFound)
82 }
83 Err(error) => {
84 error!(checkpoint, "Failed to fetch checkpoint: {error}");
85 Err(CheckpointError::Transient {
86 reason: "object_store",
87 error: error.into(),
88 })
89 }
90 }
91 }
92}
93
#[cfg(test)]
pub(crate) mod tests {
    use axum::http::StatusCode;
    use object_store::ClientOptions;
    use object_store::http::HttpBuilder;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::time::Duration;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::Request;
    use wiremock::Respond;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path_regex;

    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;

    use super::*;

    /// Mounts `response` on `server` for every `GET` whose path matches a checkpoint blob
    /// (`/<digits>.binpb.zst`).
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+\.binpb\.zst"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Shorthand for an empty-bodied response with status `code`.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Ingestion client backed by an HTTP object store rooted at `uri`, with plain HTTP allowed
    /// because the mock server is served over `http://`.
    fn remote_test_client(uri: String) -> IngestionClient {
        remote_test_client_with_options(uri, ClientOptions::default().with_allow_http(true))
    }

    /// Like [`remote_test_client`], but with caller-supplied HTTP client `options`.
    fn remote_test_client_with_options(uri: String, options: ClientOptions) -> IngestionClient {
        let store = HttpBuilder::new()
            .with_url(uri)
            .with_client_options(options)
            .build()
            .map(Arc::new)
            .unwrap();
        IngestionClient::with_store(store, test_ingestion_metrics()).unwrap()
    }

    /// A 404 from the store surfaces as `Error::NotFound` for the requested checkpoint.
    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.checkpoint(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// A request-level failure on the first attempt does not fail the fetch: a later attempt
    /// succeeds.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // First request: redirect to a different checkpoint path...
                (1, _) => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.binpb.zst")
                }

                // ...which keeps redirecting to itself. This is an endless redirect loop that the
                // HTTP client is expected to give up on with a request error.
                (_, "/0.binpb.zst") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Any later request for the real checkpoint succeeds.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.checkpoint(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// 500, 408, and 429 responses are all tolerated until the store finally returns the data.
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.checkpoint(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Bodies that fail to decode (here: empty) are not fatal; the fetch succeeds once a valid
    /// body is returned.
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.checkpoint(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// A response slower than the HTTP client timeout (4s vs 2s) is abandoned and the request is
    /// retried exactly once.
    #[tokio::test]
    async fn retry_on_timeout() {
        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        respond_with(&server, move |_: &Request| {
            let response = status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42));
            if times_clone.fetch_add(1, Ordering::SeqCst) == 0 {
                // Stall the first attempt past the client timeout. `set_delay` waits
                // asynchronously inside the mock server. A `std::thread::sleep` here would block
                // the server's worker thread instead.
                response.set_delay(Duration::from_secs(4))
            } else {
                response
            }
        })
        .await;

        let options = ClientOptions::default()
            .with_allow_http(true)
            .with_timeout(Duration::from_secs(2));
        let ingestion_client = remote_test_client_with_options(server.uri(), options);

        let checkpoint = ingestion_client.checkpoint(42).await.unwrap();
        assert_eq!(42, checkpoint.summary.sequence_number);

        // One timed-out attempt plus one successful retry.
        let final_count = times.load(Ordering::SeqCst);
        assert_eq!(final_count, 2);
    }
}