sui_indexer_alt_framework/ingestion/
store_client.rs1use std::sync::Arc;
5
6use anyhow::Context;
7use bytes::Bytes;
8use object_store::Error;
9use object_store::ObjectStore;
10use object_store::ObjectStoreExt;
11use object_store::RetryConfig;
12use object_store::path::Path as ObjectPath;
13use prometheus::IntCounter;
14use serde::de::DeserializeOwned;
15use sui_types::digests::ChainIdentifier;
16
17use crate::ingestion::decode;
18use crate::ingestion::ingestion_client::CheckpointError;
19use crate::ingestion::ingestion_client::CheckpointResult;
20use crate::ingestion::ingestion_client::IngestionClientTrait;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23pub(super) fn retry_config() -> RetryConfig {
26 RetryConfig {
27 max_retries: 0,
28 ..Default::default()
29 }
30}
31
32pub struct StoreIngestionClient {
33 store: Arc<dyn ObjectStore>,
34 total_ingested_bytes: Option<IntCounter>,
38}
39
40pub(crate) const WATERMARK_PATH: &str = "_metadata/watermark/checkpoint_blob.json";
42
43#[derive(serde::Deserialize, serde::Serialize)]
44pub(crate) struct ObjectStoreWatermark {
45 pub checkpoint_hi_inclusive: u64,
46}
47
48impl StoreIngestionClient {
49 pub fn new(store: Arc<dyn ObjectStore>, total_ingested_bytes: Option<IntCounter>) -> Self {
50 Self {
51 store,
52 total_ingested_bytes,
53 }
54 }
55
56 pub async fn end_of_epoch_checkpoints<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
59 let bytes = self.bytes(ObjectPath::from("epochs.json")).await?;
60 Ok(serde_json::from_slice(&bytes)?)
61 }
62
63 pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
65 let bytes = self.checkpoint_bytes(checkpoint).await?;
66 Ok(decode::checkpoint(&bytes)?)
67 }
68
69 async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
70 self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
71 .await
72 }
73
74 async fn bytes(&self, path: ObjectPath) -> object_store::Result<Bytes> {
75 let result = self.store.get(&path).await?;
76 result.bytes().await
77 }
78
79 async fn watermark_checkpoint_hi_inclusive(&self) -> anyhow::Result<Option<u64>> {
80 let bytes = match self.bytes(ObjectPath::from(WATERMARK_PATH)).await {
81 Ok(bytes) => bytes,
82 Err(Error::NotFound { .. }) => return Ok(None),
83 Err(e) => return Err(e).context(format!("error reading {WATERMARK_PATH}")),
84 };
85 let watermark: ObjectStoreWatermark =
86 serde_json::from_slice(&bytes).context(format!("error parsing {WATERMARK_PATH}"))?;
87 Ok(Some(watermark.checkpoint_hi_inclusive))
88 }
89}
90
91#[async_trait::async_trait]
92impl IngestionClientTrait for StoreIngestionClient {
93 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
94 let checkpoint = self.checkpoint(0).await?;
95 Ok((*checkpoint.summary.digest()).into())
96 }
97
98 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
108 let bytes = self
109 .checkpoint_bytes(checkpoint)
110 .await
111 .map_err(|e| match e {
112 Error::NotFound { .. } => CheckpointError::NotFound,
113 e => CheckpointError::Fetch(e.into()),
114 })?;
115
116 if let Some(counter) = &self.total_ingested_bytes {
117 counter.inc_by(bytes.len() as u64);
118 }
119 decode::checkpoint(&bytes).map_err(CheckpointError::Decode)
120 }
121
122 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
123 self.watermark_checkpoint_hi_inclusive()
124 .await
125 .map(|cp| cp.unwrap_or(0))
126 }
127}
128
#[cfg(test)]
pub(crate) mod tests {
    use axum::http::StatusCode;
    use object_store::ClientOptions;
    use object_store::http::HttpBuilder;
    use object_store::http::HttpStore;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::time::Duration;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::Request;
    use wiremock::Respond;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;
    use wiremock::matchers::path_regex;

    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;

    use super::*;

    /// Mounts `response` for every `GET` of a checkpoint object, i.e. any path matching
    /// `/<digits>.binpb.zst`.
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+\.binpb\.zst"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Serves test checkpoint 0 at `/0.binpb.zst`; clients read this checkpoint to derive the
    /// chain identifier.
    ///
    /// Mounted with priority 1 so that it wins over default-priority mocks, such as the one
    /// mounted by [`respond_with`], whose regex also matches this path.
    pub(crate) async fn respond_with_chain_id(server: &MockServer) {
        Mock::given(method("GET"))
            .and(path("/0.binpb.zst"))
            .respond_with(status(StatusCode::OK).set_body_bytes(test_checkpoint_data(0)))
            .with_priority(1)
            .mount(server)
            .await;
    }

    /// The chain identifier that clients should derive from test checkpoint 0.
    pub(crate) fn expected_chain_id() -> ChainIdentifier {
        let bytes = test_checkpoint_data(0);
        let checkpoint = decode::checkpoint(&bytes).unwrap();
        (*checkpoint.summary.digest()).into()
    }

    /// Shorthand for a response template with the given status code.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Builds an HTTP object store for the mock server at `uri`, applying `options` and
    /// additionally allowing plain-HTTP URLs.
    fn http_store(uri: String, options: ClientOptions) -> Arc<HttpStore> {
        let store = HttpBuilder::new()
            .with_url(uri)
            .with_client_options(options.with_allow_http(true))
            .build()
            .expect("failed to build HTTP object store for the mock server");
        Arc::new(store)
    }

    /// An [`IngestionClient`] reading from an HTTP store at `uri` with default client options.
    fn remote_test_client(uri: String) -> IngestionClient {
        let store = http_store(uri, ClientOptions::default());
        IngestionClient::with_store(store, test_ingestion_metrics()).unwrap()
    }

    /// Serves `watermark` at [`WATERMARK_PATH`] and returns what
    /// [`IngestionClientTrait::latest_checkpoint_number`] reports for it.
    async fn test_latest_checkpoint_number(watermark: ResponseTemplate) -> anyhow::Result<u64> {
        let server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path(WATERMARK_PATH))
            .respond_with(watermark)
            .mount(&server)
            .await;

        let store = http_store(server.uri(), ClientOptions::default());
        let client = StoreIngestionClient::new(store, None);

        IngestionClientTrait::latest_checkpoint_number(&client).await
    }

    /// A missing watermark object is reported as checkpoint 0.
    #[tokio::test]
    async fn test_latest_checkpoint_no_watermark() {
        assert_eq!(
            test_latest_checkpoint_number(status(StatusCode::NOT_FOUND))
                .await
                .unwrap(),
            0
        );
    }

    /// A watermark object that is not valid JSON is an error.
    #[tokio::test]
    async fn test_latest_checkpoint_corrupt_watermark() {
        assert!(
            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string("<"))
                .await
                .is_err()
        );
    }

    /// A valid watermark object reports its `checkpoint_hi_inclusive`.
    #[tokio::test]
    async fn test_latest_checkpoint_from_watermark() {
        let body = serde_json::json!({"checkpoint_hi_inclusive": 1}).to_string();
        assert_eq!(
            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string(body))
                .await
                .unwrap(),
            1
        );
    }

    /// A 404 for a checkpoint surfaces as `Error::NotFound` for that checkpoint.
    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.checkpoint(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// The first request is redirected to `/999999.binpb.zst`, which redirects to itself. The
    /// resulting redirect loop presumably fails the request in the HTTP client; the client is
    /// expected to retry until a later request for the checkpoint succeeds.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                (1, _) => status(StatusCode::MOVED_PERMANENTLY)
                    .append_header("Location", "/999999.binpb.zst"),

                // Redirect back to the same URL, forming a loop.
                (_, "/999999.binpb.zst") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// 500, 408 and 429 responses are retried until the checkpoint is served successfully.
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// Successful responses whose bodies fail to decode (empty here) are retried.
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// A request that exceeds the client timeout is abandoned and retried; exactly one retry
    /// is needed because only the first response is slow.
    #[tokio::test]
    async fn retry_on_timeout() {
        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        respond_with(&server, move |_: &Request| {
            let response = status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42));
            // Delay only the first response, past the 2s client timeout configured below.
            // `set_delay` lets wiremock delay the response asynchronously; a blocking
            // `std::thread::sleep` here would instead block the thread running the responder
            // (and, presumably, the mock server's handling of the retry).
            if times_clone.fetch_add(1, Ordering::Relaxed) == 0 {
                response.set_delay(Duration::from_secs(4))
            } else {
                response
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let options = ClientOptions::default().with_timeout(Duration::from_secs(2));
        let store = http_store(server.uri(), options);
        let ingestion_client =
            IngestionClient::with_store(store, test_ingestion_metrics()).unwrap();

        let envelope = ingestion_client.checkpoint(42).await.unwrap();
        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());

        // One timed-out attempt plus one successful retry. Checkpoint 0 is served by the
        // higher-priority chain-id mock, so it does not touch this counter.
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}