sui_indexer_alt_framework/ingestion/remote_client.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use reqwest::{Client, StatusCode};
use std::time::Duration;
use tracing::{debug, error};
use url::Url;

use crate::ingestion::Result as IngestionResult;
use crate::ingestion::client::{FetchData, FetchError, FetchResult, IngestionClientTrait};

/// Default timeout for remote checkpoint fetches.
/// This prevents requests from hanging indefinitely due to network issues,
/// unresponsive servers, or other connection problems.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(120);

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
pub enum HttpError {
    #[error("HTTP error with status code: {0}")]
    Http(StatusCode),
}

fn status_code_to_error(code: StatusCode) -> anyhow::Error {
    HttpError::Http(code).into()
}

pub struct RemoteIngestionClient {
    url: Url,
    client: Client,
}

impl RemoteIngestionClient {
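    /// Construct a client that fetches checkpoints over HTTP from the store rooted at `url`,
    /// using `DEFAULT_REQUEST_TIMEOUT` for every request.
    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch (the URL below is a placeholder, not a real checkpoint
    /// store):
    ///
    /// ```ignore
    /// use url::Url;
    ///
    /// let client = RemoteIngestionClient::new(Url::parse("https://example.com/checkpoints")?)?;
    /// ```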
    pub fn new(url: Url) -> IngestionResult<Self> {
        Ok(Self {
            url,
            client: Client::builder().timeout(DEFAULT_REQUEST_TIMEOUT).build()?,
        })
    }

    #[cfg(test)]
    pub fn new_with_timeout(url: Url, timeout: Duration) -> IngestionResult<Self> {
        Ok(Self {
            url,
            client: Client::builder().timeout(timeout).build()?,
        })
    }

    /// Fetch metadata mapping epoch IDs to the sequence numbers of their last checkpoints.
    /// The response is a JSON-encoded array of checkpoint sequence numbers.
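    ///
    /// # Example
    ///
    /// A sketch of how a caller might consume this endpoint, assuming the JSON body
    /// deserializes as a `Vec<u64>` (illustrative only; `client` is an already constructed
    /// `RemoteIngestionClient`):
    ///
    /// ```ignore
    /// let response = client.end_of_epoch_checkpoints().await?;
    /// // Each element is the sequence number of the last checkpoint in the corresponding epoch.
    /// let last_checkpoints: Vec<u64> = response.json().await?;
    /// ```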
    pub async fn end_of_epoch_checkpoints(&self) -> reqwest::Result<reqwest::Response> {
        // SAFETY: The path being joined is statically known to be valid.
        let url = self
            .url
            .join("/epochs.json")
            .expect("Unexpected invalid URL");

        self.client.get(url).send().await
    }

    /// Fetch the bytes for a checkpoint by its sequence number.
    /// The response is the serialized representation of a checkpoint, as raw bytes.
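    ///
    /// # Example
    ///
    /// A sketch of fetching the raw bytes for checkpoint 42 (illustrative only; `client` is
    /// an already constructed `RemoteIngestionClient`):
    ///
    /// ```ignore
    /// let response = client.checkpoint(42).await?;
    /// // The body is the serialized checkpoint; deserialization happens further down the
    /// // ingestion pipeline.
    /// let bytes = response.bytes().await?;
    /// ```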
    pub async fn checkpoint(&self, checkpoint: u64) -> reqwest::Result<reqwest::Response> {
        // SAFETY: The path being joined is statically known to be valid.
        let url = self
            .url
            .join(&format!("/{checkpoint}.chk"))
            .expect("Unexpected invalid URL");

        self.client.get(url).send().await
    }
}

#[async_trait::async_trait]
impl IngestionClientTrait for RemoteIngestionClient {
    /// Fetch a checkpoint from the remote store.
    ///
    /// Transient errors include:
    ///
    /// - failures to issue a request (network errors, redirect issues, etc.),
    /// - request timeouts,
    /// - rate limiting,
    /// - server errors (5xx),
    /// - issues getting a full response.
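    ///
    /// # Example
    ///
    /// A sketch of how a caller might react to each outcome, assuming `FetchResult` is a
    /// `Result<FetchData, FetchError>` (illustrative only; in practice the shared ingestion
    /// client drives retries, and `remote` here stands for a `RemoteIngestionClient`):
    ///
    /// ```ignore
    /// match remote.fetch(42).await {
    ///     Ok(data) => { /* raw checkpoint bytes, handed off for deserialization */ }
    ///     Err(FetchError::NotFound) => { /* checkpoint 42 is not available (yet) */ }
    ///     Err(FetchError::Transient { reason, .. }) => { /* retry with back-off, tagged by `reason` */ }
    ///     Err(_) => { /* any other error variants */ }
    /// }
    /// ```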
    async fn fetch(&self, checkpoint: u64) -> FetchResult {
        let response = self
            .checkpoint(checkpoint)
            .await
            .map_err(|e| FetchError::Transient {
                reason: "request",
                error: e.into(),
            })?;

        match response.status() {
            code if code.is_success() => {
                // Failure to extract all the bytes from the payload, or to deserialize the
                // checkpoint from them is considered a transient error -- the store being
                // fetched from needs to be corrected, and ingestion will keep retrying it
                // until it is.
                response
                    .bytes()
                    .await
                    .map_err(|e| FetchError::Transient {
                        reason: "bytes",
                        error: e.into(),
                    })
                    .map(FetchData::Raw)
            }

            // Treat 404s as a special case so we can match on this error type.
            code @ StatusCode::NOT_FOUND => {
                debug!(checkpoint, %code, "Checkpoint not found");
                Err(FetchError::NotFound)
            }

            // Timeouts are a client error but they are usually transient.
            code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
                reason: "timeout",
                error: status_code_to_error(code),
            }),

            // Rate limiting is also a client error, but the backoff will eventually widen the
            // interval appropriately.
            code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
                reason: "too_many_requests",
                error: status_code_to_error(code),
            }),

            // Assume that if the server is facing difficulties, it will recover eventually.
            code if code.is_server_error() => Err(FetchError::Transient {
                reason: "server_error",
                error: status_code_to_error(code),
            }),

            // Still retry on other unsuccessful codes, but the reason is unclear.
            code => Err(FetchError::Transient {
                reason: "unknown",
                error: status_code_to_error(code),
            }),
        }
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::ingestion::client::IngestionClient;
    use crate::ingestion::error::Error;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_metrics;
    use axum::http::StatusCode;
    use std::sync::{
        Mutex,
        atomic::{AtomicUsize, Ordering},
    };
    use wiremock::{
        Mock, MockServer, Request, Respond, ResponseTemplate,
        matchers::{method, path_regex},
    };

    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            .and(path_regex(r"/\d+.chk"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    fn remote_test_client(uri: String) -> IngestionClient {
        IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_metrics()).unwrap()
    }

    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.fetch(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// Assume that failures to send the request to the remote store are due to temporary
    /// connectivity issues, and retry them.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // The first request will trigger a redirect to 0.chk no matter what the original
                // request was for -- triggering a request error.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),

                // Set up checkpoint 0 as an infinite redirect loop.
                (_, "/0.chk") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Subsequently, requests will succeed.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
    }

    /// Assume that certain errors will recover by themselves, and keep retrying with an
    /// exponential back-off. These errors include: 5xx (server) errors, 408 (timeout), and 429
    /// (rate limiting).
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
    }

    /// Treat deserialization failure as another kind of transient error -- all checkpoint data
    /// that is fetched should be valid (deserializable as a `CheckpointData`).
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
    }

    /// Test that timeout errors are retried as transient errors.
    /// The first request will time out; the second will succeed.
    #[tokio::test]
    async fn retry_on_timeout() {
        use std::sync::Arc;

        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        // The first request will delay longer than the timeout; the second will succeed immediately
        respond_with(&server, move |_: &Request| {
            match times_clone.fetch_add(1, Ordering::Relaxed) {
                0 => {
                    // Delay longer than our test timeout (2 seconds)
                    std::thread::sleep(std::time::Duration::from_secs(4));
                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
                }
                _ => {
                    // Respond immediately on retry attempts
                    status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
                }
            }
        })
        .await;

        // Create a client with a 2 second timeout for testing
        let ingestion_client = IngestionClient::new_remote_with_timeout(
            Url::parse(&server.uri()).unwrap(),
            Duration::from_secs(2),
            test_metrics(),
        )
        .unwrap();

        // This should time out once, then succeed on retry
        let checkpoint = ingestion_client.fetch(42).await.unwrap();
        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number);

        // Verify that the server received exactly 2 requests (1 timeout + 1 successful retry)
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
309}