sui_indexer_alt_framework/ingestion/
store_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Context;
7use bytes::Bytes;
8use object_store::Error;
9use object_store::ObjectStore;
10use object_store::ObjectStoreExt;
11use object_store::RetryConfig;
12use object_store::path::Path as ObjectPath;
13use prometheus::IntCounter;
14use serde::de::DeserializeOwned;
15use sui_types::digests::ChainIdentifier;
16
17use crate::ingestion::decode;
18use crate::ingestion::ingestion_client::CheckpointError;
19use crate::ingestion::ingestion_client::CheckpointResult;
20use crate::ingestion::ingestion_client::IngestionClientTrait;
21use crate::types::full_checkpoint_content::Checkpoint;
22
/// Object-store key of the JSON watermark file read by this client.
///
/// Taken from `sui-indexer-alt-object-store` (presumably the writer of this file), so the two
/// definitions need to stay in sync.
pub(crate) const WATERMARK_PATH: &str = "_metadata/watermark/checkpoint_blob.json";
25
26pub struct StoreIngestionClient {
27    store: Arc<dyn ObjectStore>,
28    /// Counter incremented (in the [`IngestionClientTrait`] impl) by the size in bytes of each
29    /// fetched checkpoint payload. `None` for callers that only use this client for one-shot
30    /// metadata fetches (e.g. `end_of_epoch_checkpoints`) and don't need a metric.
31    total_ingested_bytes: Option<IntCounter>,
32}
33
34#[derive(serde::Deserialize, serde::Serialize)]
35pub(crate) struct ObjectStoreWatermark {
36    pub checkpoint_hi_inclusive: u64,
37}
38
39impl StoreIngestionClient {
40    pub fn new(store: Arc<dyn ObjectStore>, total_ingested_bytes: Option<IntCounter>) -> Self {
41        Self {
42            store,
43            total_ingested_bytes,
44        }
45    }
46
47    /// Fetch metadata mapping epoch IDs to the sequence numbers of their last checkpoints.
48    /// The response is a JSON-encoded array of checkpoint sequence numbers.
49    pub async fn end_of_epoch_checkpoints<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
50        let bytes = self.bytes(ObjectPath::from("epochs.json")).await?;
51        Ok(serde_json::from_slice(&bytes)?)
52    }
53
54    /// Fetch and decode checkpoint data by sequence number.
55    pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
56        let bytes = self.checkpoint_bytes(checkpoint).await?;
57        // zstd decompress + prost decode + proto -> Checkpoint is multi-ms of CPU
58        // work; offload to the blocking pool so it doesn't stall the reactor.
59        let decoded = tokio::task::spawn_blocking(move || decode::checkpoint(&bytes))
60            .await
61            .context("decode task panicked")??;
62        Ok(decoded)
63    }
64
65    async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
66        self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
67            .await
68    }
69
70    async fn bytes(&self, path: ObjectPath) -> object_store::Result<Bytes> {
71        let result = self.store.get(&path).await?;
72        result.bytes().await
73    }
74
75    async fn watermark_checkpoint_hi_inclusive(&self) -> anyhow::Result<Option<u64>> {
76        let bytes = match self.bytes(ObjectPath::from(WATERMARK_PATH)).await {
77            Ok(bytes) => bytes,
78            Err(Error::NotFound { .. }) => return Ok(None),
79            Err(e) => return Err(e).context(format!("error reading {WATERMARK_PATH}")),
80        };
81
82        let watermark: ObjectStoreWatermark =
83            serde_json::from_slice(&bytes).context(format!("error parsing {WATERMARK_PATH}"))?;
84
85        Ok(Some(watermark.checkpoint_hi_inclusive))
86    }
87}
88
89#[async_trait::async_trait]
90impl IngestionClientTrait for StoreIngestionClient {
91    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
92        let checkpoint = self.checkpoint(0).await?;
93        Ok((*checkpoint.summary.digest()).into())
94    }
95
96    /// Fetch a checkpoint from the remote store.
97    ///
98    /// Transient errors include:
99    ///
100    /// - failures to issue a request, (network errors, redirect issues, etc)
101    /// - request timeouts,
102    /// - rate limiting,
103    /// - server errors (5xx),
104    /// - issues getting a full response.
105    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
106        let bytes = self
107            .checkpoint_bytes(checkpoint)
108            .await
109            .map_err(|e| match e {
110                Error::NotFound { .. } => CheckpointError::NotFound,
111                e => CheckpointError::Fetch(e.into()),
112            })?;
113
114        if let Some(counter) = &self.total_ingested_bytes {
115            counter.inc_by(bytes.len() as u64);
116        }
117        // zstd decompress + prost decode + proto -> Checkpoint is multi-ms of CPU
118        // work; offload to the blocking pool so it doesn't stall the reactor.
119        tokio::task::spawn_blocking(move || decode::checkpoint(&bytes))
120            .await
121            .map_err(|e| CheckpointError::Fetch(anyhow::anyhow!("decode task panicked: {e}")))?
122            .map_err(CheckpointError::Decode)
123    }
124
125    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
126        self.watermark_checkpoint_hi_inclusive()
127            .await
128            .map(|cp| cp.unwrap_or(0))
129    }
130}
131
132/// Disable object_store's internal retries so that transient errors (429s, 5xx) propagate
133/// immediately to the framework's own retry logic.
134pub(super) fn retry_config() -> RetryConfig {
135    RetryConfig {
136        max_retries: 0,
137        ..Default::default()
138    }
139}
140
#[cfg(test)]
pub(crate) mod tests {
    use axum::http::StatusCode;
    use object_store::ClientOptions;
    use object_store::http::HttpBuilder;
    use object_store::http::HttpStore;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::time::Duration;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::Request;
    use wiremock::Respond;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path;
    use wiremock::matchers::path_regex;

    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;

    use super::*;

    /// Mount `response` for every `GET /{n}.binpb.zst` checkpoint request on `server`.
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+\.binpb\.zst"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Mount a high-priority mock for checkpoint 0 used by `StoreIngestionClient::chain_id()`.
    pub(crate) async fn respond_with_chain_id(server: &MockServer) {
        Mock::given(method("GET"))
            .and(path("/0.binpb.zst"))
            .respond_with(status(StatusCode::OK).set_body_bytes(test_checkpoint_data(0)))
            .with_priority(1)
            .mount(server)
            .await;
    }

    /// Returns the expected chain_id produced by `StoreIngestionClient::chain_id()` when
    /// `respond_with_chain_id` is mounted.
    pub(crate) fn expected_chain_id() -> ChainIdentifier {
        let bytes = test_checkpoint_data(0);
        let checkpoint = decode::checkpoint(&bytes).unwrap();
        (*checkpoint.summary.digest()).into()
    }

    /// Shorthand for an empty response with status `code`.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Build an HTTP object store pointed at `uri` (a [`MockServer`] in these tests).
    /// Plain HTTP is enabled on top of `options`.
    fn http_store(uri: String, options: ClientOptions) -> Arc<HttpStore> {
        HttpBuilder::new()
            .with_url(uri)
            .with_client_options(options.with_allow_http(true))
            .build()
            .map(Arc::new)
            .unwrap()
    }

    /// Full `IngestionClient` (with the framework's retry logic) backed by a store at `uri`.
    fn remote_test_client(uri: String) -> IngestionClient {
        let store = http_store(uri, ClientOptions::default());
        IngestionClient::with_store(store, test_ingestion_metrics()).unwrap()
    }

    /// Serve `watermark` at [`WATERMARK_PATH`] and query `latest_checkpoint_number` through a
    /// bare `StoreIngestionClient` (no retries).
    async fn test_latest_checkpoint_number(watermark: ResponseTemplate) -> anyhow::Result<u64> {
        let server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path(WATERMARK_PATH))
            .respond_with(watermark)
            .mount(&server)
            .await;

        let store = http_store(server.uri(), ClientOptions::default());
        let client = StoreIngestionClient::new(store, None);

        IngestionClientTrait::latest_checkpoint_number(&client).await
    }

    #[tokio::test]
    async fn test_latest_checkpoint_no_watermark() {
        assert_eq!(
            test_latest_checkpoint_number(status(StatusCode::NOT_FOUND))
                .await
                .unwrap(),
            0
        )
    }

    #[tokio::test]
    async fn test_latest_checkpoint_corrupt_watermark() {
        assert!(
            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string("<"))
                .await
                .is_err()
        )
    }

    #[tokio::test]
    async fn test_latest_checkpoint_from_watermark() {
        let body = serde_json::json!({"checkpoint_hi_inclusive": 1}).to_string();
        assert_eq!(
            test_latest_checkpoint_number(status(StatusCode::OK).set_body_string(body))
                .await
                .unwrap(),
            1
        )
    }

    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.checkpoint(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// Assume that failures to send the request to the remote store are due to temporary
    /// connectivity issues, and retry them.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // The first request will trigger a redirect to 999999.binpb.zst no matter what
                // the original request was for -- triggering a request error.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY)
                    .append_header("Location", "/999999.binpb.zst"),

                // Set-up checkpoint 999999 as an infinite redirect loop.
                (_, "/999999.binpb.zst") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Subsequently, requests will succeed.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// Assume that certain errors will recover by themselves, and keep retrying with an
    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
    /// (rate limiting).
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
    /// that is fetched should be valid (deserializable as a `Checkpoint`).
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let client = remote_test_client(server.uri());
        let envelope = client.checkpoint(42).await.unwrap();

        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());
    }

    /// Test that timeout errors are retried as transient errors.
    /// The first request will timeout, the second will succeed.
    #[tokio::test]
    async fn retry_on_timeout() {
        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        // The first request is answered only after a delay longer than the client timeout.
        // Every later request is answered immediately.
        respond_with(&server, move |_: &Request| {
            let response = status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42));
            if times_clone.fetch_add(1, Ordering::Relaxed) == 0 {
                // Delay longer than our test timeout (2 seconds). `set_delay` makes the mock
                // server wait asynchronously. A `std::thread::sleep` here would block the
                // server's executor and hold up the retry until the sleep finished.
                response.set_delay(Duration::from_secs(4))
            } else {
                // Respond immediately on retry attempts.
                response
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let store = http_store(
            server.uri(),
            ClientOptions::default().with_timeout(Duration::from_secs(2)),
        );
        let ingestion_client =
            IngestionClient::with_store(store, test_ingestion_metrics()).unwrap();

        // This should timeout once, then succeed on retry
        let envelope = ingestion_client.checkpoint(42).await.unwrap();
        assert_eq!(42, envelope.checkpoint.summary.sequence_number);
        assert_eq!(envelope.chain_id, expected_chain_id());

        // Verify that the server received exactly 2 requests (1 timeout + 1 successful retry)
        // The chain_id request for checkpoint 0 is handled by a separate mock.
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}