sui_indexer_alt_framework/ingestion/remote_client.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use reqwest::{Client, StatusCode};
use std::time::Duration;
use tracing::{debug, error};
use url::Url;

use crate::ingestion::Result as IngestionResult;
use crate::ingestion::ingestion_client::{
    FetchData, FetchError, FetchResult, IngestionClientTrait,
};

/// Default timeout for remote checkpoint fetches.
/// This prevents requests from hanging indefinitely due to network issues,
/// unresponsive servers, or other connection problems.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
pub enum HttpError {
    #[error("HTTP error with status code: {0}")]
    Http(StatusCode),
}

fn status_code_to_error(code: StatusCode) -> anyhow::Error {
    HttpError::Http(code).into()
}

pub struct RemoteIngestionClient {
    url: Url,
    client: Client,
}

impl RemoteIngestionClient {
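    /// Construct a client that fetches checkpoints from the store rooted at `url`, using the
    /// default request timeout ([`DEFAULT_REQUEST_TIMEOUT`]).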
    pub fn new(url: Url) -> IngestionResult<Self> {
        Ok(Self {
            url,
            client: Client::builder().timeout(DEFAULT_REQUEST_TIMEOUT).build()?,
        })
    }

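    /// Construct a client that fetches checkpoints from the store rooted at `url`, using a
    /// caller-supplied request timeout instead of the default.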
    pub fn new_with_timeout(url: Url, timeout: Duration) -> IngestionResult<Self> {
        Ok(Self {
            url,
            client: Client::builder().timeout(timeout).build()?,
        })
    }

    /// Fetch metadata mapping epoch IDs to the sequence numbers of their last checkpoints.
    /// The response is a JSON-encoded array of checkpoint sequence numbers.
    pub async fn end_of_epoch_checkpoints(&self) -> reqwest::Result<reqwest::Response> {
        // SAFETY: The path being joined is statically known to be valid.
        let url = self
            .url
            .join("/epochs.json")
            .expect("Unexpected invalid URL");

        self.client.get(url).send().await
    }

    /// Fetch the bytes for a checkpoint by its sequence number.
    /// The response is the serialized representation of a checkpoint, as raw bytes.
    pub async fn checkpoint(&self, checkpoint: u64) -> reqwest::Result<reqwest::Response> {
        // SAFETY: The path being joined is statically known to be valid.
        let url = self
            .url
            .join(&format!("/{checkpoint}.chk"))
            .expect("Unexpected invalid URL");

        self.client.get(url).send().await
    }
}
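
// A minimal usage sketch (illustrative only; the store URL below is a placeholder, and in practice
// this client is usually driven through `IngestionClient` rather than called directly):
//
//     let client = RemoteIngestionClient::new(Url::parse("https://checkpoints.example.com")?)?;
//     let response = client.checkpoint(42).await?;
//     let bytes = response.bytes().await?;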

#[async_trait::async_trait]
impl IngestionClientTrait for RemoteIngestionClient {
    /// Fetch a checkpoint from the remote store.
    ///
    /// Transient errors include:
    ///
    /// - failures to issue a request (network errors, redirect issues, etc.),
    /// - request timeouts,
    /// - rate limiting,
    /// - server errors (5xx),
    /// - issues getting a full response.
    async fn fetch(&self, checkpoint: u64) -> FetchResult {
        let response = self
            .checkpoint(checkpoint)
            .await
            .map_err(|e| FetchError::Transient {
                reason: "request",
                error: e.into(),
            })?;

        match response.status() {
            code if code.is_success() => {
                // Failure to extract all the bytes from the payload, or to deserialize the
                // checkpoint from them, is considered a transient error -- the store being
                // fetched from needs to be corrected, and ingestion will keep retrying it
                // until it is.
                response
                    .bytes()
                    .await
                    .map_err(|e| FetchError::Transient {
                        reason: "bytes",
                        error: e.into(),
                    })
                    .map(FetchData::Raw)
            }

            // Treat 404s as a special case so we can match on this error type.
            code @ StatusCode::NOT_FOUND => {
                debug!(checkpoint, %code, "Checkpoint not found");
                Err(FetchError::NotFound)
            }

            // Timeouts are a client error but they are usually transient.
            code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
                reason: "timeout",
                error: status_code_to_error(code),
            }),

            // Rate limiting is also a client error, but the backoff will eventually widen the
            // interval appropriately.
            code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
                reason: "too_many_requests",
                error: status_code_to_error(code),
            }),

            // Assume that if the server is facing difficulties, it will recover eventually.
            code if code.is_server_error() => Err(FetchError::Transient {
                reason: "server_error",
                error: status_code_to_error(code),
            }),

            // Still retry on other unsuccessful codes, but the reason is unclear.
            code => Err(FetchError::Transient {
                reason: "unknown",
                error: status_code_to_error(code),
            }),
        }
    }
}
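
// A sketch of how a caller might branch on this result (assuming `FetchResult` is a `Result` over
// the `FetchData` / `FetchError` types used above); in practice the retry and back-off logic lives
// in `IngestionClient`, so this is illustrative only:
//
//     match remote.fetch(42).await {
//         Ok(FetchData::Raw(bytes)) => { /* serialized checkpoint bytes, ready to deserialize */ }
//         Ok(_) => { /* any other payload representation */ }
//         Err(FetchError::NotFound) => { /* checkpoint not available in the store yet */ }
//         Err(FetchError::Transient { reason, .. }) => { /* retry with back-off; `reason` labels the failure */ }
//     }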

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;
    use axum::http::StatusCode;
    use std::sync::{
        Mutex,
        atomic::{AtomicUsize, Ordering},
    };
    use wiremock::{
        Mock, MockServer, Request, Respond, ResponseTemplate,
        matchers::{method, path_regex},
    };

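    /// Mount a mock on `server` that matches GET requests for checkpoint files (paths of the
    /// form `/<seq>.chk`) and answers them with `response`.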
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+.chk"))
            .respond_with(response)
            .mount(server)
            .await;
    }

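    /// Build a `ResponseTemplate` carrying just the given status code.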
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

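    /// Construct an `IngestionClient` backed by the remote store at `uri`, using test metrics.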
    fn remote_test_client(uri: String) -> IngestionClient {
        IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_ingestion_metrics()).unwrap()
    }

    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.fetch(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// Assume that failures to send the request to the remote store are due to temporary
    /// connectivity issues, and retry them.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // The first request will trigger a redirect to 0.chk no matter what the original
                // request was for -- triggering a request error.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),

                // Set up checkpoint 0 as an infinite redirect loop.
                (_, "/0.chk") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Subsequently, requests will succeed.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Assume that certain errors will recover by themselves, and keep retrying with an
    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
    /// (rate limiting).
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
    /// that is fetched should be valid (deserializable as a `Checkpoint`).
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Test that timeout errors are retried as transient errors.
    /// The first request will time out; the second will succeed.
    #[tokio::test]
    async fn retry_on_timeout() {
        use std::sync::Arc;

        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        // First request will delay longer than the timeout, subsequent requests will succeed immediately
        respond_with(&server, move |_: &Request| {
            match times_clone.fetch_add(1, Ordering::Relaxed) {
                0 => {
                    // Delay longer than our test timeout (2 seconds)
                    std::thread::sleep(std::time::Duration::from_secs(4));
                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
                }
                _ => {
                    // Respond immediately on retry attempts
                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
                }
            }
        })
        .await;

        // Create a client with a 2-second timeout for testing
        let ingestion_client = IngestionClient::new_remote_with_timeout(
            Url::parse(&server.uri()).unwrap(),
            Duration::from_secs(2),
            test_ingestion_metrics(),
        )
        .unwrap();

        // This should time out once, then succeed on retry
        let checkpoint = ingestion_client.fetch(42).await.unwrap();
        assert_eq!(42, checkpoint.summary.sequence_number);

        // Verify that the server received exactly 2 requests (1 timeout + 1 successful retry)
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}