sui_indexer_alt_framework/ingestion/
remote_client.rs1use reqwest::{Client, StatusCode};
5use std::time::Duration;
6use tracing::{debug, error};
7use url::Url;
8
9use crate::ingestion::Result as IngestionResult;
10use crate::ingestion::ingestion_client::{
11 FetchData, FetchError, FetchResult, IngestionClientTrait,
12};
13
/// Per-request timeout used by `RemoteIngestionClient::new` (two minutes).
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60);
18
19#[derive(thiserror::Error, Debug, Eq, PartialEq)]
20pub enum HttpError {
21 #[error("HTTP error with status code: {0}")]
22 Http(StatusCode),
23}
24
25fn status_code_to_error(code: StatusCode) -> anyhow::Error {
26 HttpError::Http(code).into()
27}
28
29pub struct RemoteIngestionClient {
30 url: Url,
31 client: Client,
32}
33
34impl RemoteIngestionClient {
35 pub fn new(url: Url) -> IngestionResult<Self> {
36 Ok(Self {
37 url,
38 client: Client::builder().timeout(DEFAULT_REQUEST_TIMEOUT).build()?,
39 })
40 }
41
42 pub fn new_with_timeout(url: Url, timeout: Duration) -> IngestionResult<Self> {
43 Ok(Self {
44 url,
45 client: Client::builder().timeout(timeout).build()?,
46 })
47 }
48
49 pub async fn end_of_epoch_checkpoints(&self) -> reqwest::Result<reqwest::Response> {
52 let url = self
54 .url
55 .join("/epochs.json")
56 .expect("Unexpected invalid URL");
57
58 self.client.get(url).send().await
59 }
60
61 pub async fn checkpoint(&self, checkpoint: u64) -> reqwest::Result<reqwest::Response> {
64 let url = self
66 .url
67 .join(&format!("/{checkpoint}.chk"))
68 .expect("Unexpected invalid URL");
69
70 self.client.get(url).send().await
71 }
72}
73
74#[async_trait::async_trait]
75impl IngestionClientTrait for RemoteIngestionClient {
76 async fn fetch(&self, checkpoint: u64) -> FetchResult {
86 let response = self
87 .checkpoint(checkpoint)
88 .await
89 .map_err(|e| FetchError::Transient {
90 reason: "request",
91 error: e.into(),
92 })?;
93
94 match response.status() {
95 code if code.is_success() => {
96 response
101 .bytes()
102 .await
103 .map_err(|e| FetchError::Transient {
104 reason: "bytes",
105 error: e.into(),
106 })
107 .map(FetchData::Raw)
108 }
109
110 code @ StatusCode::NOT_FOUND => {
112 debug!(checkpoint, %code, "Checkpoint not found");
113 Err(FetchError::NotFound)
114 }
115
116 code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
118 reason: "timeout",
119 error: status_code_to_error(code),
120 }),
121
122 code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
125 reason: "too_many_requests",
126 error: status_code_to_error(code),
127 }),
128
129 code if code.is_server_error() => Err(FetchError::Transient {
131 reason: "server_error",
132 error: status_code_to_error(code),
133 }),
134
135 code => Err(FetchError::Transient {
137 reason: "unknown",
138 error: status_code_to_error(code),
139 }),
140 }
141 }
142}
143
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::ingestion::error::Error;
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;
    use axum::http::StatusCode;
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicUsize, Ordering},
    };
    use wiremock::{
        Mock, MockServer, Request, Respond, ResponseTemplate,
        matchers::{method, path_regex},
    };

    /// Mounts `response` on `server` for every `GET` of a `<digits>.chk` checkpoint path.
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            // The `.` is escaped so only a literal `.chk` suffix matches.
            .and(path_regex(r"/\d+\.chk"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Builds an empty mock response carrying `code`.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Builds an [`IngestionClient`] that talks to the mock server at `uri`.
    fn remote_test_client(uri: String) -> IngestionClient {
        IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_ingestion_metrics()).unwrap()
    }

    /// A 404 is surfaced to the caller as `NotFound` for the requested checkpoint.
    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.fetch(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// A redirect loop makes the HTTP request itself fail; that failure must be retried until a
    /// later attempt succeeds.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // First attempt: redirect to a path that then redirects to itself forever.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),

                (_, "/0.chk") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// 500, 408 and 429 responses are all retried before the eventual success.
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// Successful responses whose body cannot be decoded are retried.
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.summary.sequence_number)
    }

    /// A response slower than the client's timeout is abandoned and retried; the prompt second
    /// response succeeds, so exactly two requests reach the server.
    #[tokio::test]
    async fn retry_on_timeout() {
        let server = MockServer::start().await;
        let times: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        respond_with(&server, move |_: &Request| {
            let response = status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42));
            if times_clone.fetch_add(1, Ordering::Relaxed) == 0 {
                // Delay asynchronously rather than blocking the mock server's thread, so the
                // retry can be answered while the first response is still pending.
                response.set_delay(Duration::from_secs(4))
            } else {
                response
            }
        })
        .await;

        let ingestion_client = IngestionClient::new_remote_with_timeout(
            Url::parse(&server.uri()).unwrap(),
            Duration::from_secs(2),
            test_ingestion_metrics(),
        )
        .unwrap();

        let checkpoint = ingestion_client.fetch(42).await.unwrap();
        assert_eq!(42, checkpoint.summary.sequence_number);

        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}