sui_indexer_alt_framework/ingestion/
store_client.rs1use std::sync::Arc;
5
6use anyhow::Context;
7use bytes::Bytes;
8use object_store::Error;
9use object_store::ObjectStore;
10use object_store::ObjectStoreExt;
11use object_store::RetryConfig;
12use object_store::path::Path as ObjectPath;
13use prometheus::IntCounter;
14use serde::de::DeserializeOwned;
15use sui_types::digests::ChainIdentifier;
16
17use crate::ingestion::decode;
18use crate::ingestion::ingestion_client::CheckpointError;
19use crate::ingestion::ingestion_client::CheckpointResult;
20use crate::ingestion::ingestion_client::IngestionClientTrait;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23pub(crate) const WATERMARK_PATH: &str = "_metadata/watermark/checkpoint_blob.json";
25
26pub struct StoreIngestionClient {
27 store: Arc<dyn ObjectStore>,
28 total_ingested_bytes: Option<IntCounter>,
32}
33
34#[derive(serde::Deserialize, serde::Serialize)]
35pub(crate) struct ObjectStoreWatermark {
36 pub checkpoint_hi_inclusive: u64,
37}
38
39impl StoreIngestionClient {
40 pub fn new(store: Arc<dyn ObjectStore>, total_ingested_bytes: Option<IntCounter>) -> Self {
41 Self {
42 store,
43 total_ingested_bytes,
44 }
45 }
46
47 pub async fn end_of_epoch_checkpoints<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
50 let bytes = self.bytes(ObjectPath::from("epochs.json")).await?;
51 Ok(serde_json::from_slice(&bytes)?)
52 }
53
54 pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
56 let bytes = self.checkpoint_bytes(checkpoint).await?;
57 let decoded = tokio::task::spawn_blocking(move || decode::checkpoint(&bytes))
60 .await
61 .context("decode task panicked")??;
62 Ok(decoded)
63 }
64
65 async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
66 self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
67 .await
68 }
69
70 async fn bytes(&self, path: ObjectPath) -> object_store::Result<Bytes> {
71 let result = self.store.get(&path).await?;
72 result.bytes().await
73 }
74
75 async fn watermark_checkpoint_hi_inclusive(&self) -> anyhow::Result<Option<u64>> {
76 let bytes = match self.bytes(ObjectPath::from(WATERMARK_PATH)).await {
77 Ok(bytes) => bytes,
78 Err(Error::NotFound { .. }) => return Ok(None),
79 Err(e) => return Err(e).context(format!("error reading {WATERMARK_PATH}")),
80 };
81
82 let watermark: ObjectStoreWatermark =
83 serde_json::from_slice(&bytes).context(format!("error parsing {WATERMARK_PATH}"))?;
84
85 Ok(Some(watermark.checkpoint_hi_inclusive))
86 }
87}
88
89#[async_trait::async_trait]
90impl IngestionClientTrait for StoreIngestionClient {
91 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
92 let checkpoint = self.checkpoint(0).await?;
93 Ok((*checkpoint.summary.digest()).into())
94 }
95
96 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
106 let bytes = self
107 .checkpoint_bytes(checkpoint)
108 .await
109 .map_err(|e| match e {
110 Error::NotFound { .. } => CheckpointError::NotFound,
111 e => CheckpointError::Fetch(e.into()),
112 })?;
113
114 if let Some(counter) = &self.total_ingested_bytes {
115 counter.inc_by(bytes.len() as u64);
116 }
117 tokio::task::spawn_blocking(move || decode::checkpoint(&bytes))
120 .await
121 .map_err(|e| CheckpointError::Fetch(anyhow::anyhow!("decode task panicked: {e}")))?
122 .map_err(CheckpointError::Decode)
123 }
124
125 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
126 self.watermark_checkpoint_hi_inclusive()
127 .await
128 .map(|cp| cp.unwrap_or(0))
129 }
130}
131
132pub(super) fn retry_config() -> RetryConfig {
135 RetryConfig {
136 max_retries: 0,
137 ..Default::default()
138 }
139}
140
#[cfg(test)]
pub(crate) mod tests {
    use axum::http::StatusCode;
    use object_store::ClientOptions;
    use object_store::http::HttpBuilder;
    use object_store::http::HttpStore;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::time::Duration;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::Request;
    use wiremock::Respond;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;
    use wiremock::matchers::path_regex;

    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;

    use super::*;

    /// Mounts `response` on `server` for every `GET` of a checkpoint blob (`/<n>.binpb.zst`).
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+\.binpb\.zst"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Serves checkpoint 0 at `/0.binpb.zst` with priority 1 (wiremock's highest).
    ///
    /// This takes precedence over mocks mounted by [`respond_with`], so the chain identifier can
    /// always be derived.
    pub(crate) async fn respond_with_chain_id(server: &MockServer) {
        Mock::given(method("GET"))
            .and(path("/0.binpb.zst"))
            .respond_with(status(StatusCode::OK).set_body_bytes(test_checkpoint_data(0)))
            .with_priority(1)
            .mount(server)
            .await;
    }

    /// The chain identifier derived from the checkpoint served by [`respond_with_chain_id`].
    pub(crate) fn expected_chain_id() -> ChainIdentifier {
        let bytes = test_checkpoint_data(0);
        let checkpoint = decode::checkpoint(&bytes).unwrap();
        (*checkpoint.summary.digest()).into()
    }

    /// An empty response with the given status code.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Builds an HTTP object store for `uri` from `options`, additionally allowing plain HTTP.
    fn test_store(uri: String, options: ClientOptions) -> Arc<HttpStore> {
        HttpBuilder::new()
            .with_url(uri)
            .with_client_options(options.with_allow_http(true))
            .build()
            .map(Arc::new)
            .unwrap()
    }

    /// An [`IngestionClient`] backed by the HTTP object store at `uri`.
    fn remote_test_client(uri: String) -> IngestionClient {
        let store = test_store(uri, ClientOptions::default());
        IngestionClient::with_store(store, test_ingestion_metrics()).unwrap()
    }

    /// Queries `latest_checkpoint_number` on a [`StoreIngestionClient`] whose watermark object is
    /// served with `watermark`.
    async fn test_latest_checkpoint_number(watermark: ResponseTemplate) -> anyhow::Result<u64> {
        let server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path(WATERMARK_PATH))
            .respond_with(watermark)
            .mount(&server)
            .await;

        let store = test_store(server.uri(), ClientOptions::default());
        let client = StoreIngestionClient::new(store, None);

        IngestionClientTrait::latest_checkpoint_number(&client).await
    }

    #[tokio::test]
    async fn test_latest_checkpoint_no_watermark() {
        assert_eq!(
            test_latest_checkpoint_number(status(StatusCode::NOT_FOUND))
                .await
                .unwrap(),
            0
        )
    }

    #[tokio::test]
    async fn test_latest_checkpoint_corrupt_watermark() {
        assert!(
            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string("<"))
                .await
                .is_err()
        )
    }

    #[tokio::test]
    async fn test_latest_checkpoint_from_watermark() {
        let body = serde_json::json!({"checkpoint_hi_inclusive": 1}).to_string();
        assert_eq!(
            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string(body))
                .await
                .unwrap(),
            1
        )
    }

    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.checkpoint(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// A request that fails inside the HTTP client (here, a redirect loop) is retried.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // The first request is redirected to another checkpoint's path...
                (1, _) => status(StatusCode::MOVED_PERMANENTLY)
                    .append_header("Location", "/999999.binpb.zst"),

                // ...which redirects to itself, so the request never completes. The HTTP client
                // presumably fails it once its redirect limit is hit.
                (_, "/999999.binpb.zst") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Any later request for the original checkpoint succeeds.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// Transient server errors (500, 408, 429) are retried until the server responds with 200.
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// A body that fails to decode (here, an empty one) is retried until a valid checkpoint is
    /// served.
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// A request that exceeds the client timeout is retried, and the retry succeeds.
    #[tokio::test]
    async fn retry_on_timeout() {
        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        respond_with(&server, move |_: &Request| {
            let response = status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42));
            // Hold the first response past the client's 2s timeout. `set_delay` waits
            // asynchronously inside the mock server. Sleeping in this responder would instead
            // block the server and stall the client's retry until the sleep finished.
            if times_clone.fetch_add(1, Ordering::Relaxed) == 0 {
                response.set_delay(Duration::from_secs(4))
            } else {
                response
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let options = ClientOptions::default().with_timeout(Duration::from_secs(2));
        let store = test_store(server.uri(), options);
        let ingestion_client =
            IngestionClient::with_store(store, test_ingestion_metrics()).unwrap();

        let envelope = ingestion_client.checkpoint(42).await.unwrap();
        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());

        // One timed-out attempt plus one successful retry.
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}