sui_indexer_alt_framework/ingestion/
remote_client.rs1use std::time::Duration;
5
6use reqwest::Client;
7use reqwest::StatusCode;
8use tracing::debug;
9use tracing::error;
10use url::Url;
11
12use crate::ingestion::Result as IngestionResult;
13use crate::ingestion::ingestion_client::FetchData;
14use crate::ingestion::ingestion_client::FetchError;
15use crate::ingestion::ingestion_client::FetchResult;
16use crate::ingestion::ingestion_client::IngestionClientTrait;
17
/// Request timeout (two minutes) used by [`RemoteIngestionClient::new`] when the caller does
/// not supply one.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60);
22
23#[derive(thiserror::Error, Debug, Eq, PartialEq)]
24pub enum HttpError {
25 #[error("HTTP error with status code: {0}")]
26 Http(StatusCode),
27}
28
29fn status_code_to_error(code: StatusCode) -> anyhow::Error {
30 HttpError::Http(code).into()
31}
32
33pub struct RemoteIngestionClient {
34 url: Url,
35 client: Client,
36}
37
38impl RemoteIngestionClient {
39 pub fn new(url: Url) -> IngestionResult<Self> {
40 Ok(Self {
41 url,
42 client: Client::builder().timeout(DEFAULT_REQUEST_TIMEOUT).build()?,
43 })
44 }
45
46 pub fn new_with_timeout(url: Url, timeout: Duration) -> IngestionResult<Self> {
47 Ok(Self {
48 url,
49 client: Client::builder().timeout(timeout).build()?,
50 })
51 }
52
53 pub async fn end_of_epoch_checkpoints(&self) -> reqwest::Result<reqwest::Response> {
56 let url = self
58 .url
59 .join("/epochs.json")
60 .expect("Unexpected invalid URL");
61
62 self.client.get(url).send().await
63 }
64
65 pub async fn checkpoint(&self, checkpoint: u64) -> reqwest::Result<reqwest::Response> {
68 let url = self
70 .url
71 .join(&format!("/{checkpoint}.chk"))
72 .expect("Unexpected invalid URL");
73
74 self.client.get(url).send().await
75 }
76}
77
78#[async_trait::async_trait]
79impl IngestionClientTrait for RemoteIngestionClient {
80 async fn fetch(&self, checkpoint: u64) -> FetchResult {
90 let response = self
91 .checkpoint(checkpoint)
92 .await
93 .map_err(|e| FetchError::Transient {
94 reason: "request",
95 error: e.into(),
96 })?;
97
98 match response.status() {
99 code if code.is_success() => {
100 response
105 .bytes()
106 .await
107 .map_err(|e| FetchError::Transient {
108 reason: "bytes",
109 error: e.into(),
110 })
111 .map(FetchData::Raw)
112 }
113
114 code @ StatusCode::NOT_FOUND => {
116 debug!(checkpoint, %code, "Checkpoint not found");
117 Err(FetchError::NotFound)
118 }
119
120 code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
122 reason: "timeout",
123 error: status_code_to_error(code),
124 }),
125
126 code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
129 reason: "too_many_requests",
130 error: status_code_to_error(code),
131 }),
132
133 code if code.is_server_error() => Err(FetchError::Transient {
135 reason: "server_error",
136 error: status_code_to_error(code),
137 }),
138
139 code => Err(FetchError::Transient {
141 reason: "unknown",
142 error: status_code_to_error(code),
143 }),
144 }
145 }
146}
147
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;
    use axum::http::StatusCode;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use wiremock::Mock;
    use wiremock::MockServer;
    use wiremock::Request;
    use wiremock::Respond;
    use wiremock::ResponseTemplate;
    use wiremock::matchers::method;
    use wiremock::matchers::path_regex;

    /// Mounts `response` on `server` for every GET request whose path contains
    /// `/<digits>.chk`.
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            // The dot is escaped so that only a literal `.chk` suffix matches; an unescaped
            // `.` would accept any character in that position.
            .and(path_regex(r"/\d+\.chk"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Builds an empty response template with the given status code.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Builds a remote `IngestionClient` pointed at `uri`, using test metrics.
    fn remote_test_client(uri: String) -> IngestionClient {
        IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_ingestion_metrics()).unwrap()
    }

    /// A 404 from the server surfaces as `Error::NotFound` carrying the requested checkpoint.
    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.fetch(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// The fetch eventually succeeds after the first attempt runs into a redirect loop.
    ///
    /// The first response redirects to `/0.chk`, and `/0.chk` always redirects to itself;
    /// presumably the HTTP client gives up on that loop with a request error. Any later
    /// request for a different path receives the checkpoint.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // First request: send the client to `/0.chk`.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),

                // `/0.chk` redirects to itself.
                (_, "/0.chk") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // Any later request for another path succeeds.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number);
    }

    /// The fetch succeeds after the server answers 500, 408 and 429 in turn.
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number);
    }

    /// The fetch succeeds after two `200 OK` responses with an empty body, which presumably
    /// fail to deserialize as a checkpoint.
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number);
    }

    /// The fetch succeeds on a second attempt when the first response takes longer than the
    /// client's timeout, and the server sees exactly two requests.
    #[tokio::test]
    async fn retry_on_timeout() {
        use std::sync::Arc;

        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        respond_with(&server, move |_: &Request| {
            // Only the first request is held back, for longer than the 2s client timeout
            // configured below. The responder is a synchronous callback, hence the blocking
            // sleep.
            if times_clone.fetch_add(1, Ordering::Relaxed) == 0 {
                std::thread::sleep(Duration::from_secs(4));
            }
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
        })
        .await;

        let ingestion_client = IngestionClient::new_remote_with_timeout(
            Url::parse(&server.uri()).unwrap(),
            Duration::from_secs(2),
            test_ingestion_metrics(),
        )
        .unwrap();

        let checkpoint = ingestion_client.fetch(42).await.unwrap();
        assert_eq!(42, checkpoint.summary.sequence_number);

        // One delayed request plus one successful retry.
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}