sui_indexer_alt_framework/ingestion/
store_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Context;
7use bytes::Bytes;
8use object_store::Error;
9use object_store::ObjectStore;
10use object_store::ObjectStoreExt;
11use object_store::RetryConfig;
12use object_store::path::Path as ObjectPath;
13use prometheus::IntCounter;
14use serde::de::DeserializeOwned;
15use sui_types::digests::ChainIdentifier;
16
17use crate::ingestion::decode;
18use crate::ingestion::ingestion_client::CheckpointError;
19use crate::ingestion::ingestion_client::CheckpointResult;
20use crate::ingestion::ingestion_client::IngestionClientTrait;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23/// Disable object_store's internal retries so that transient errors (429s, 5xx) propagate
24/// immediately to the framework's own retry logic.
25pub(super) fn retry_config() -> RetryConfig {
26    RetryConfig {
27        max_retries: 0,
28        ..Default::default()
29    }
30}
31
32pub struct StoreIngestionClient {
33    store: Arc<dyn ObjectStore>,
34    /// Counter incremented (in the [`IngestionClientTrait`] impl) by the size in bytes of each
35    /// fetched checkpoint payload. `None` for callers that only use this client for one-shot
36    /// metadata fetches (e.g. `end_of_epoch_checkpoints`) and don't need a metric.
37    total_ingested_bytes: Option<IntCounter>,
38}
39
/// Path, inside the object store, of the JSON watermark object (the value comes from
/// `sui-indexer-alt-object-store`).
pub(crate) const WATERMARK_PATH: &str = "_metadata/watermark/checkpoint_blob.json";
42
43#[derive(serde::Deserialize, serde::Serialize)]
44pub(crate) struct ObjectStoreWatermark {
45    pub checkpoint_hi_inclusive: u64,
46}
47
48impl StoreIngestionClient {
49    pub fn new(store: Arc<dyn ObjectStore>, total_ingested_bytes: Option<IntCounter>) -> Self {
50        Self {
51            store,
52            total_ingested_bytes,
53        }
54    }
55
56    /// Fetch metadata mapping epoch IDs to the sequence numbers of their last checkpoints.
57    /// The response is a JSON-encoded array of checkpoint sequence numbers.
58    pub async fn end_of_epoch_checkpoints<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
59        let bytes = self.bytes(ObjectPath::from("epochs.json")).await?;
60        Ok(serde_json::from_slice(&bytes)?)
61    }
62
63    /// Fetch and decode checkpoint data by sequence number.
64    pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
65        let bytes = self.checkpoint_bytes(checkpoint).await?;
66        Ok(decode::checkpoint(&bytes)?)
67    }
68
69    async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
70        self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
71            .await
72    }
73
74    async fn bytes(&self, path: ObjectPath) -> object_store::Result<Bytes> {
75        let result = self.store.get(&path).await?;
76        result.bytes().await
77    }
78
79    async fn watermark_checkpoint_hi_inclusive(&self) -> anyhow::Result<Option<u64>> {
80        let bytes = match self.bytes(ObjectPath::from(WATERMARK_PATH)).await {
81            Ok(bytes) => bytes,
82            Err(Error::NotFound { .. }) => return Ok(None),
83            Err(e) => return Err(e).context(format!("error reading {WATERMARK_PATH}")),
84        };
85        let watermark: ObjectStoreWatermark =
86            serde_json::from_slice(&bytes).context(format!("error parsing {WATERMARK_PATH}"))?;
87        Ok(Some(watermark.checkpoint_hi_inclusive))
88    }
89}
90
91#[async_trait::async_trait]
92impl IngestionClientTrait for StoreIngestionClient {
93    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
94        let checkpoint = self.checkpoint(0).await?;
95        Ok((*checkpoint.summary.digest()).into())
96    }
97
98    /// Fetch a checkpoint from the remote store.
99    ///
100    /// Transient errors include:
101    ///
102    /// - failures to issue a request, (network errors, redirect issues, etc)
103    /// - request timeouts,
104    /// - rate limiting,
105    /// - server errors (5xx),
106    /// - issues getting a full response.
107    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
108        let bytes = self
109            .checkpoint_bytes(checkpoint)
110            .await
111            .map_err(|e| match e {
112                Error::NotFound { .. } => CheckpointError::NotFound,
113                e => CheckpointError::Fetch(e.into()),
114            })?;
115
116        if let Some(counter) = &self.total_ingested_bytes {
117            counter.inc_by(bytes.len() as u64);
118        }
119        decode::checkpoint(&bytes).map_err(CheckpointError::Decode)
120    }
121
122    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
123        self.watermark_checkpoint_hi_inclusive()
124            .await
125            .map(|cp| cp.unwrap_or(0))
126    }
127}
128
129#[cfg(test)]
130pub(crate) mod tests {
131    use axum::http::StatusCode;
132    use object_store::ClientOptions;
133    use object_store::http::HttpBuilder;
134    use std::sync::Mutex;
135    use std::sync::atomic::AtomicUsize;
136    use std::sync::atomic::Ordering;
137    use std::time::Duration;
138    use wiremock::Mock;
139    use wiremock::MockServer;
140    use wiremock::Request;
141    use wiremock::Respond;
142    use wiremock::ResponseTemplate;
143    use wiremock::matchers::method;
144    use wiremock::matchers::path;
145    use wiremock::matchers::path_regex;
146
147    use crate::ingestion::error::Error;
148    use crate::ingestion::ingestion_client::IngestionClient;
149    use crate::ingestion::test_utils::test_checkpoint_data;
150    use crate::metrics::tests::test_ingestion_metrics;
151
152    use super::*;
153
154    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
155        Mock::given(method("GET"))
156            .and(path_regex(r"/\d+\.binpb\.zst"))
157            .respond_with(response)
158            .mount(server)
159            .await;
160    }
161
162    /// Mount a high-priority mock for checkpoint 0 used by `StoreIngestionClient::chain_id()`.
163    pub(crate) async fn respond_with_chain_id(server: &MockServer) {
164        Mock::given(method("GET"))
165            .and(path("/0.binpb.zst"))
166            .respond_with(status(StatusCode::OK).set_body_bytes(test_checkpoint_data(0)))
167            .with_priority(1)
168            .mount(server)
169            .await;
170    }
171
172    /// Returns the expected chain_id produced by `StoreIngestionClient::chain_id()` when
173    /// `respond_with_chain_id` is mounted.
174    pub(crate) fn expected_chain_id() -> ChainIdentifier {
175        let bytes = test_checkpoint_data(0);
176        let checkpoint = decode::checkpoint(&bytes).unwrap();
177        (*checkpoint.summary.digest()).into()
178    }
179
180    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
181        ResponseTemplate::new(code.as_u16())
182    }
183
184    fn remote_test_client(uri: String) -> IngestionClient {
185        let store = HttpBuilder::new()
186            .with_url(uri)
187            .with_client_options(ClientOptions::default().with_allow_http(true))
188            .build()
189            .map(Arc::new)
190            .unwrap();
191        IngestionClient::with_store(store, test_ingestion_metrics()).unwrap()
192    }
193
194    async fn test_latest_checkpoint_number(watermark: ResponseTemplate) -> anyhow::Result<u64> {
195        let server = MockServer::start().await;
196
197        Mock::given(method("GET"))
198            .and(path(WATERMARK_PATH))
199            .respond_with(watermark)
200            .mount(&server)
201            .await;
202
203        let store = HttpBuilder::new()
204            .with_url(server.uri())
205            .with_client_options(ClientOptions::default().with_allow_http(true))
206            .build()
207            .map(Arc::new)
208            .unwrap();
209        let client = StoreIngestionClient::new(store, None);
210
211        IngestionClientTrait::latest_checkpoint_number(&client).await
212    }
213
214    #[tokio::test]
215    async fn test_latest_checkpoint_no_watermark() {
216        assert_eq!(
217            test_latest_checkpoint_number(status(StatusCode::NOT_FOUND))
218                .await
219                .unwrap(),
220            0
221        )
222    }
223
224    #[tokio::test]
225    async fn test_latest_checkpoint_corrupt_watermark() {
226        assert!(
227            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string("<"))
228                .await
229                .is_err()
230        )
231    }
232
233    #[tokio::test]
234    async fn test_latest_checkpoint_from_watermark() {
235        let body = serde_json::json!({"checkpoint_hi_inclusive": 1}).to_string();
236        assert_eq!(
237            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string(body))
238                .await
239                .unwrap(),
240            1
241        )
242    }
243
244    #[tokio::test]
245    async fn fail_on_not_found() {
246        let server = MockServer::start().await;
247        respond_with(&server, status(StatusCode::NOT_FOUND)).await;
248
249        let client = remote_test_client(server.uri());
250        let error = client.checkpoint(42).await.unwrap_err();
251
252        assert!(matches!(error, Error::NotFound(42)));
253    }
254
255    /// Assume that failures to send the request to the remote store are due to temporary
256    /// connectivity issues, and retry them.
257    #[tokio::test]
258    async fn retry_on_request_error() {
259        let server = MockServer::start().await;
260
261        let times: Mutex<u64> = Mutex::new(0);
262        respond_with(&server, move |r: &Request| {
263            let mut times = times.lock().unwrap();
264            *times += 1;
265            match (*times, r.url.path()) {
266                // The first request will trigger a redirect to 999999.binpb.zst no matter what
267                // the original request was for -- triggering a request error.
268                (1, _) => status(StatusCode::MOVED_PERMANENTLY)
269                    .append_header("Location", "/999999.binpb.zst"),
270
271                // Set-up checkpoint 999999 as an infinite redirect loop.
272                (_, "/999999.binpb.zst") => {
273                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
274                }
275
276                // Subsequently, requests will succeed.
277                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
278            }
279        })
280        .await;
281        respond_with_chain_id(&server).await;
282
283        let client = remote_test_client(server.uri());
284        let envelope = client.checkpoint(42).await.unwrap();
285
286        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
287        assert_eq!(envelope.chain_id, expected_chain_id());
288    }
289
290    /// Assume that certain errors will recover by themselves, and keep retrying with an
291    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
292    /// (rate limiting).
293    #[tokio::test]
294    async fn retry_on_transient_server_error() {
295        let server = MockServer::start().await;
296        let times: Mutex<u64> = Mutex::new(0);
297        respond_with(&server, move |_: &Request| {
298            let mut times = times.lock().unwrap();
299            *times += 1;
300            match *times {
301                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
302                2 => status(StatusCode::REQUEST_TIMEOUT),
303                3 => status(StatusCode::TOO_MANY_REQUESTS),
304                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
305            }
306        })
307        .await;
308        respond_with_chain_id(&server).await;
309
310        let client = remote_test_client(server.uri());
311        let envelope = client.checkpoint(42).await.unwrap();
312
313        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
314        assert_eq!(envelope.chain_id, expected_chain_id());
315    }
316
317    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
318    /// that is fetched should be valid (deserializable as a `Checkpoint`).
319    #[tokio::test]
320    async fn retry_on_deserialization_error() {
321        let server = MockServer::start().await;
322        let times: Mutex<u64> = Mutex::new(0);
323        respond_with(&server, move |_: &Request| {
324            let mut times = times.lock().unwrap();
325            *times += 1;
326            if *times < 3 {
327                status(StatusCode::OK).set_body_bytes(vec![])
328            } else {
329                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
330            }
331        })
332        .await;
333        respond_with_chain_id(&server).await;
334
335        let client = remote_test_client(server.uri());
336        let envelope = client.checkpoint(42).await.unwrap();
337
338        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
339        assert_eq!(envelope.chain_id, expected_chain_id());
340    }
341
342    /// Test that timeout errors are retried as transient errors.
343    /// The first request will timeout, the second will succeed.
344    #[tokio::test]
345    async fn retry_on_timeout() {
346        let server = MockServer::start().await;
347        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
348        let times_clone = times.clone();
349
350        // First request will delay longer than timeout, second will succeed immediately
351        respond_with(&server, move |_: &Request| {
352            match times_clone.fetch_add(1, Ordering::Relaxed) {
353                0 => {
354                    // Delay longer than our test timeout (2 seconds)
355                    std::thread::sleep(Duration::from_secs(4));
356                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
357                }
358                _ => {
359                    // Respond immediately on retry attempts
360                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
361                }
362            }
363        })
364        .await;
365        respond_with_chain_id(&server).await;
366
367        let options = ClientOptions::default()
368            .with_allow_http(true)
369            .with_timeout(Duration::from_secs(2));
370        let store = HttpBuilder::new()
371            .with_url(server.uri())
372            .with_client_options(options)
373            .build()
374            .map(Arc::new)
375            .unwrap();
376        let ingestion_client =
377            IngestionClient::with_store(store, test_ingestion_metrics()).unwrap();
378
379        // This should timeout once, then succeed on retry
380        let envelope = ingestion_client.checkpoint(42).await.unwrap();
381        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
382        assert_eq!(envelope.chain_id, expected_chain_id());
383
384        // Verify that the server received exactly 2 requests (1 timeout + 1 successful retry)
385        // The chain_id request for checkpoint 0 is handled by a separate mock.
386        let final_count = times.load(Ordering::Relaxed);
387        assert_eq!(final_count, 2);
388    }
389}