sui_indexer_alt_framework/ingestion/remote_client.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use reqwest::Client;
use reqwest::StatusCode;
use tracing::debug;
use url::Url;

use crate::ingestion::Result as IngestionResult;
use crate::ingestion::ingestion_client::FetchData;
use crate::ingestion::ingestion_client::FetchError;
use crate::ingestion::ingestion_client::FetchResult;
use crate::ingestion::ingestion_client::IngestionClientTrait;

/// Default timeout for remote checkpoint fetches.
/// This prevents requests from hanging indefinitely due to network issues,
/// unresponsive servers, or other connection problems.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
pub enum HttpError {
    #[error("HTTP error with status code: {0}")]
    Http(StatusCode),
}

fn status_code_to_error(code: StatusCode) -> anyhow::Error {
    HttpError::Http(code).into()
}

pub struct RemoteIngestionClient {
    url: Url,
    client: Client,
}

impl RemoteIngestionClient {
    /// Create a client that fetches checkpoints from the store at `url`, using
    /// [`DEFAULT_REQUEST_TIMEOUT`] for each request.
    pub fn new(url: Url) -> IngestionResult<Self> {
        Ok(Self {
            url,
            client: Client::builder().timeout(DEFAULT_REQUEST_TIMEOUT).build()?,
        })
    }

    /// Like [`Self::new`], but with a caller-supplied request timeout.
    pub fn new_with_timeout(url: Url, timeout: Duration) -> IngestionResult<Self> {
        Ok(Self {
            url,
            client: Client::builder().timeout(timeout).build()?,
        })
    }
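
    // A minimal construction sketch; the URL below is illustrative, not a real
    // endpoint:
    //
    //     let url = Url::parse("https://checkpoints.example.com")?;
    //     let client = RemoteIngestionClient::new(url.clone())?;
    //     let patient = RemoteIngestionClient::new_with_timeout(url, Duration::from_secs(30))?;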

    /// Fetch metadata mapping epoch IDs to the sequence numbers of their last checkpoints.
    /// The response is a JSON-encoded array of checkpoint sequence numbers.
    pub async fn end_of_epoch_checkpoints(&self) -> reqwest::Result<reqwest::Response> {
        // SAFETY: The path being joined is statically known to be valid.
        let url = self
            .url
            .join("/epochs.json")
            .expect("Unexpected invalid URL");

        self.client.get(url).send().await
    }
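
    // A minimal sketch of decoding that response. This assumes reqwest's `json`
    // feature is enabled and that the array elements deserialize as `u64`
    // sequence numbers:
    //
    //     let response = client.end_of_epoch_checkpoints().await?;
    //     let last_checkpoints: Vec<u64> = response.json().await?;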

    /// Fetch the bytes for a checkpoint by its sequence number.
    /// The response is the serialized representation of a checkpoint, as raw bytes.
    pub async fn checkpoint(&self, checkpoint: u64) -> reqwest::Result<reqwest::Response> {
        // SAFETY: The path being joined is statically known to be valid.
        let url = self
            .url
            .join(&format!("/{checkpoint}.chk"))
            .expect("Unexpected invalid URL");

        self.client.get(url).send().await
    }
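
    // A minimal sketch of pulling the raw payload out of that response,
    // surfacing non-2xx statuses as errors:
    //
    //     let response = client.checkpoint(42).await?;
    //     let bytes = response.error_for_status()?.bytes().await?;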
}

#[async_trait::async_trait]
impl IngestionClientTrait for RemoteIngestionClient {
    /// Fetch a checkpoint from the remote store.
    ///
    /// Transient errors include:
    ///
    /// - failures to issue a request (network errors, redirect issues, etc.),
    /// - request timeouts,
    /// - rate limiting,
    /// - server errors (5xx),
    /// - issues getting a full response.
    async fn fetch(&self, checkpoint: u64) -> FetchResult {
        let response = self
            .checkpoint(checkpoint)
            .await
            .map_err(|e| FetchError::Transient {
                reason: "request",
                error: e.into(),
            })?;

        match response.status() {
            code if code.is_success() => {
                // Failure to extract all the bytes from the payload, or to deserialize the
                // checkpoint from them, is considered a transient error -- the store being
                // fetched from needs to be corrected, and ingestion will keep retrying it
                // until it is.
                response
                    .bytes()
                    .await
                    .map_err(|e| FetchError::Transient {
                        reason: "bytes",
                        error: e.into(),
                    })
                    .map(FetchData::Raw)
            }

            // Treat 404s as a special case so we can match on this error type.
            code @ StatusCode::NOT_FOUND => {
                debug!(checkpoint, %code, "Checkpoint not found");
                Err(FetchError::NotFound)
            }

            // Timeouts are a client error, but they are usually transient.
            code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
                reason: "timeout",
                error: status_code_to_error(code),
            }),

            // Rate limiting is also a client error, but the backoff will eventually widen the
            // retry interval appropriately.
            code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
                reason: "too_many_requests",
                error: status_code_to_error(code),
            }),

            // Assume that if the server is facing difficulties, it will recover eventually.
            code if code.is_server_error() => Err(FetchError::Transient {
                reason: "server_error",
                error: status_code_to_error(code),
            }),

            // Still retry on other unsuccessful codes, even though the reason is unclear.
            code => Err(FetchError::Transient {
                reason: "unknown",
                error: status_code_to_error(code),
            }),
        }
    }
}
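
// A minimal sketch of how a caller might act on this classification. The
// helpers named here are hypothetical, and in practice the wrapping
// `IngestionClient` (exercised in the tests below) performs these retries
// itself:
//
//     match client.fetch(42).await {
//         Ok(data) => handle_checkpoint(data),
//         Err(FetchError::NotFound) => wait_and_poll_again(),
//         Err(FetchError::Transient { reason, .. }) => retry_with_backoff(reason),
//     }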

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;
    use axum::http::StatusCode;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::Request;
    use wiremock::Respond;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path_regex;

    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+\.chk"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    fn remote_test_client(uri: String) -> IngestionClient {
        IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_ingestion_metrics()).unwrap()
    }

    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.fetch(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// Assume that failures to send the request to the remote store are due to temporary
    /// connectivity issues, and retry them.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // The first request is redirected to 0.chk, no matter what the original
                // request was for. Checkpoint 0 is set up below as an infinite redirect
                // loop, so following this redirect triggers a request error.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),

                // Set up checkpoint 0 as an infinite redirect loop.
                (_, "/0.chk") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Subsequent requests will succeed.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Assume that certain errors will recover by themselves, and keep retrying with an
    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
    /// (rate limiting).
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
    /// that is fetched should be valid (deserializable as a `Checkpoint`).
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Test that timeout errors are retried as transient errors.
    /// The first request will time out, the second will succeed.
    #[tokio::test]
    async fn retry_on_timeout() {
        use std::sync::Arc;

        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        // The first request delays for longer than the timeout; subsequent requests
        // succeed immediately.
        respond_with(&server, move |_: &Request| {
            match times_clone.fetch_add(1, Ordering::Relaxed) {
                0 => {
                    // Delay longer than our test timeout (2 seconds).
                    std::thread::sleep(std::time::Duration::from_secs(4));
                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
                }
                _ => {
                    // Respond immediately on retry attempts.
                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
                }
            }
        })
        .await;

        // Create a client with a 2 second timeout for testing.
        let ingestion_client = IngestionClient::new_remote_with_timeout(
            Url::parse(&server.uri()).unwrap(),
            Duration::from_secs(2),
            test_ingestion_metrics(),
        )
        .unwrap();

        // This should time out once, then succeed on retry.
        let checkpoint = ingestion_client.fetch(42).await.unwrap();
        assert_eq!(42, checkpoint.summary.sequence_number);

        // Verify that the server received exactly two requests (one timeout, plus one
        // successful retry).
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}