sui_indexer_alt_framework/ingestion/
store_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use bytes::Bytes;
7use object_store::Error as ObjectStoreError;
8use object_store::ObjectStore;
9use object_store::ObjectStoreExt;
10use object_store::RetryConfig;
11use object_store::path::Path as ObjectPath;
12use serde::de::DeserializeOwned;
13use tracing::debug;
14use tracing::error;
15
16use crate::ingestion::decode;
17use crate::ingestion::ingestion_client::CheckpointData;
18use crate::ingestion::ingestion_client::CheckpointError;
19use crate::ingestion::ingestion_client::CheckpointResult;
20use crate::ingestion::ingestion_client::IngestionClientTrait;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23/// Disable object_store's internal retries so that transient errors (429s, 5xx) propagate
24/// immediately to the framework's own retry logic.
25pub(super) fn retry_config() -> RetryConfig {
26    RetryConfig {
27        max_retries: 0,
28        ..Default::default()
29    }
30}
31
32pub struct StoreIngestionClient {
33    store: Arc<dyn ObjectStore>,
34}
35
36impl StoreIngestionClient {
37    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
38        Self { store }
39    }
40
41    /// Fetch metadata mapping epoch IDs to the sequence numbers of their last checkpoints.
42    /// The response is a JSON-encoded array of checkpoint sequence numbers.
43    pub async fn end_of_epoch_checkpoints<T: DeserializeOwned>(&self) -> anyhow::Result<T> {
44        let bytes = self.bytes(ObjectPath::from("epochs.json")).await?;
45        Ok(serde_json::from_slice(&bytes)?)
46    }
47
48    /// Fetch and decode checkpoint data by sequence number.
49    pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
50        let bytes = self.checkpoint_bytes(checkpoint).await?;
51        Ok(decode::checkpoint(&bytes)?)
52    }
53
54    async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
55        self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
56            .await
57    }
58
59    async fn bytes(&self, path: ObjectPath) -> object_store::Result<Bytes> {
60        let result = self.store.get(&path).await?;
61        result.bytes().await
62    }
63}
64
65#[async_trait::async_trait]
66impl IngestionClientTrait for StoreIngestionClient {
67    /// Fetch a checkpoint from the remote store.
68    ///
69    /// Transient errors include:
70    ///
71    /// - failures to issue a request, (network errors, redirect issues, etc)
72    /// - request timeouts,
73    /// - rate limiting,
74    /// - server errors (5xx),
75    /// - issues getting a full response.
76    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
77        match self.checkpoint_bytes(checkpoint).await {
78            Ok(bytes) => Ok(CheckpointData::Raw(bytes)),
79            Err(ObjectStoreError::NotFound { .. }) => {
80                debug!(checkpoint, "Checkpoint not found");
81                Err(CheckpointError::NotFound)
82            }
83            Err(error) => {
84                error!(checkpoint, "Failed to fetch checkpoint: {error}");
85                Err(CheckpointError::Transient {
86                    reason: "object_store",
87                    error: error.into(),
88                })
89            }
90        }
91    }
92}
93
94#[cfg(test)]
95pub(crate) mod tests {
96    use axum::http::StatusCode;
97    use object_store::ClientOptions;
98    use object_store::http::HttpBuilder;
99    use std::sync::Mutex;
100    use std::sync::atomic::AtomicUsize;
101    use std::sync::atomic::Ordering;
102    use std::time::Duration;
103    use wiremock::Mock;
104    use wiremock::MockServer;
105    use wiremock::Request;
106    use wiremock::Respond;
107    use wiremock::ResponseTemplate;
108    use wiremock::matchers::method;
109    use wiremock::matchers::path_regex;
110
111    use crate::ingestion::error::Error;
112    use crate::ingestion::ingestion_client::IngestionClient;
113    use crate::ingestion::test_utils::test_checkpoint_data;
114    use crate::metrics::tests::test_ingestion_metrics;
115
116    use super::*;
117
118    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
119        Mock::given(method("GET"))
120            .and(path_regex(r"/\d+\.binpb\.zst"))
121            .respond_with(response)
122            .mount(server)
123            .await;
124    }
125
126    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
127        ResponseTemplate::new(code.as_u16())
128    }
129
130    fn remote_test_client(uri: String) -> IngestionClient {
131        let store = HttpBuilder::new()
132            .with_url(uri)
133            .with_client_options(ClientOptions::default().with_allow_http(true))
134            .build()
135            .map(Arc::new)
136            .unwrap();
137        IngestionClient::with_store(store, test_ingestion_metrics()).unwrap()
138    }
139
140    #[tokio::test]
141    async fn fail_on_not_found() {
142        let server = MockServer::start().await;
143        respond_with(&server, status(StatusCode::NOT_FOUND)).await;
144
145        let client = remote_test_client(server.uri());
146        let error = client.checkpoint(42).await.unwrap_err();
147
148        assert!(matches!(error, Error::NotFound(42)));
149    }
150
151    /// Assume that failures to send the request to the remote store are due to temporary
152    /// connectivity issues, and retry them.
153    #[tokio::test]
154    async fn retry_on_request_error() {
155        let server = MockServer::start().await;
156
157        let times: Mutex<u64> = Mutex::new(0);
158        respond_with(&server, move |r: &Request| {
159            let mut times = times.lock().unwrap();
160            *times += 1;
161            match (*times, r.url.path()) {
162                // The first request will trigger a redirect to 0.binpb.zst no matter what the
163                // original request was for -- triggering a request error.
164                (1, _) => {
165                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.binpb.zst")
166                }
167
168                // Set-up checkpoint 0 as an infinite redirect loop.
169                (_, "/0.binpb.zst") => {
170                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
171                }
172
173                // Subsequently, requests will succeed.
174                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
175            }
176        })
177        .await;
178
179        let client = remote_test_client(server.uri());
180        let checkpoint = client.checkpoint(42).await.unwrap();
181
182        assert_eq!(42, checkpoint.summary.sequence_number)
183    }
184
185    /// Assume that certain errors will recover by themselves, and keep retrying with an
186    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
187    /// (rate limiting).
188    #[tokio::test]
189    async fn retry_on_transient_server_error() {
190        let server = MockServer::start().await;
191        let times: Mutex<u64> = Mutex::new(0);
192        respond_with(&server, move |_: &Request| {
193            let mut times = times.lock().unwrap();
194            *times += 1;
195            match *times {
196                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
197                2 => status(StatusCode::REQUEST_TIMEOUT),
198                3 => status(StatusCode::TOO_MANY_REQUESTS),
199                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
200            }
201        })
202        .await;
203
204        let client = remote_test_client(server.uri());
205        let checkpoint = client.checkpoint(42).await.unwrap();
206
207        assert_eq!(42, checkpoint.summary.sequence_number)
208    }
209
210    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
211    /// that is fetched should be valid (deserializable as a `Checkpoint`).
212    #[tokio::test]
213    async fn retry_on_deserialization_error() {
214        let server = MockServer::start().await;
215        let times: Mutex<u64> = Mutex::new(0);
216        respond_with(&server, move |_: &Request| {
217            let mut times = times.lock().unwrap();
218            *times += 1;
219            if *times < 3 {
220                status(StatusCode::OK).set_body_bytes(vec![])
221            } else {
222                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
223            }
224        })
225        .await;
226
227        let client = remote_test_client(server.uri());
228        let checkpoint = client.checkpoint(42).await.unwrap();
229
230        assert_eq!(42, checkpoint.summary.sequence_number)
231    }
232
233    /// Test that timeout errors are retried as transient errors.
234    /// The first request will timeout, the second will succeed.
235    #[tokio::test]
236    async fn retry_on_timeout() {
237        let server = MockServer::start().await;
238        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
239        let times_clone = times.clone();
240
241        // First request will delay longer than timeout, second will succeed immediately
242        respond_with(&server, move |_: &Request| {
243            match times_clone.fetch_add(1, Ordering::Relaxed) {
244                0 => {
245                    // Delay longer than our test timeout (2 seconds)
246                    std::thread::sleep(std::time::Duration::from_secs(4));
247                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
248                }
249                _ => {
250                    // Respond immediately on retry attempts
251                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
252                }
253            }
254        })
255        .await;
256
257        let options = ClientOptions::default()
258            .with_allow_http(true)
259            .with_timeout(Duration::from_secs(2));
260        let store = HttpBuilder::new()
261            .with_url(server.uri())
262            .with_client_options(options)
263            .build()
264            .map(Arc::new)
265            .unwrap();
266        let ingestion_client =
267            IngestionClient::with_store(store, test_ingestion_metrics()).unwrap();
268
269        // This should timeout once, then succeed on retry
270        let checkpoint = ingestion_client.checkpoint(42).await.unwrap();
271        assert_eq!(42, checkpoint.summary.sequence_number);
272
273        // Verify that the server received exactly 2 requests (1 timeout + 1 successful retry)
274        let final_count = times.load(Ordering::Relaxed);
275        assert_eq!(final_count, 2);
276    }
277}