sui_indexer_alt_framework/ingestion/
remote_client.rs1use reqwest::{Client, StatusCode};
5use std::time::Duration;
6use tracing::{debug, error};
7use url::Url;
8
9use crate::ingestion::Result as IngestionResult;
10use crate::ingestion::client::{FetchData, FetchError, FetchResult, IngestionClientTrait};
11
/// Default timeout applied to every HTTP request made by [`RemoteIngestionClient`] (two minutes).
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60);
16
17#[derive(thiserror::Error, Debug, Eq, PartialEq)]
18pub enum HttpError {
19 #[error("HTTP error with status code: {0}")]
20 Http(StatusCode),
21}
22
23fn status_code_to_error(code: StatusCode) -> anyhow::Error {
24 HttpError::Http(code).into()
25}
26
27pub struct RemoteIngestionClient {
28 url: Url,
29 client: Client,
30}
31
32impl RemoteIngestionClient {
33 pub fn new(url: Url) -> IngestionResult<Self> {
34 Ok(Self {
35 url,
36 client: Client::builder().timeout(DEFAULT_REQUEST_TIMEOUT).build()?,
37 })
38 }
39
40 #[cfg(test)]
41 pub fn new_with_timeout(url: Url, timeout: Duration) -> IngestionResult<Self> {
42 Ok(Self {
43 url,
44 client: Client::builder().timeout(timeout).build()?,
45 })
46 }
47
48 pub async fn end_of_epoch_checkpoints(&self) -> reqwest::Result<reqwest::Response> {
51 let url = self
53 .url
54 .join("/epochs.json")
55 .expect("Unexpected invalid URL");
56
57 self.client.get(url).send().await
58 }
59
60 pub async fn checkpoint(&self, checkpoint: u64) -> reqwest::Result<reqwest::Response> {
63 let url = self
65 .url
66 .join(&format!("/{checkpoint}.chk"))
67 .expect("Unexpected invalid URL");
68
69 self.client.get(url).send().await
70 }
71}
72
73#[async_trait::async_trait]
74impl IngestionClientTrait for RemoteIngestionClient {
75 async fn fetch(&self, checkpoint: u64) -> FetchResult {
85 let response = self
86 .checkpoint(checkpoint)
87 .await
88 .map_err(|e| FetchError::Transient {
89 reason: "request",
90 error: e.into(),
91 })?;
92
93 match response.status() {
94 code if code.is_success() => {
95 response
100 .bytes()
101 .await
102 .map_err(|e| FetchError::Transient {
103 reason: "bytes",
104 error: e.into(),
105 })
106 .map(FetchData::Raw)
107 }
108
109 code @ StatusCode::NOT_FOUND => {
111 debug!(checkpoint, %code, "Checkpoint not found");
112 Err(FetchError::NotFound)
113 }
114
115 code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
117 reason: "timeout",
118 error: status_code_to_error(code),
119 }),
120
121 code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
124 reason: "too_many_requests",
125 error: status_code_to_error(code),
126 }),
127
128 code if code.is_server_error() => Err(FetchError::Transient {
130 reason: "server_error",
131 error: status_code_to_error(code),
132 }),
133
134 code => Err(FetchError::Transient {
136 reason: "unknown",
137 error: status_code_to_error(code),
138 }),
139 }
140 }
141}
142
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::ingestion::client::IngestionClient;
    use crate::ingestion::error::Error;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_metrics;
    use axum::http::StatusCode;
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicUsize, Ordering},
    };
    use wiremock::{
        Mock, MockServer, Request, Respond, ResponseTemplate,
        matchers::{method, path_regex},
    };

    /// Mounts `response` on `server` for every `GET` request whose path ends in
    /// `/<digits>.chk`.
    pub(crate) async fn respond_with(server: &MockServer, response: impl Respond + 'static) {
        Mock::given(method("GET"))
            // Escape the dot so it only matches a literal `.` before the extension.
            .and(path_regex(r"/\d+\.chk$"))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Builds an empty response template with status `code`.
    pub(crate) fn status(code: StatusCode) -> ResponseTemplate {
        ResponseTemplate::new(code.as_u16())
    }

    /// Creates an [`IngestionClient`] that fetches from the remote store at `uri`.
    fn remote_test_client(uri: String) -> IngestionClient {
        IngestionClient::new_remote(Url::parse(&uri).unwrap(), test_metrics()).unwrap()
    }

    /// A 404 response makes `fetch` fail with `Error::NotFound` for the requested checkpoint.
    #[tokio::test]
    async fn fail_on_not_found() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let client = remote_test_client(server.uri());
        let error = client.fetch(42).await.unwrap_err();

        assert!(matches!(error, Error::NotFound(42)));
    }

    /// A request that fails before yielding a response is retried, and the retry succeeds.
    ///
    /// The first request is redirected to `/0.chk`, which redirects to itself forever. The HTTP
    /// client eventually gives up on that redirect loop with an error.
    #[tokio::test]
    async fn retry_on_request_error() {
        let server = MockServer::start().await;

        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |r: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match (*times, r.url.path()) {
                // First request: bounce to a path that loops.
                (1, _) => status(StatusCode::MOVED_PERMANENTLY).append_header("Location", "/0.chk"),

                // `/0.chk` redirects to its own URL, producing a redirect loop.
                (_, "/0.chk") => {
                    status(StatusCode::MOVED_PERMANENTLY).append_header("Location", r.url.as_str())
                }

                // The retried request for the real checkpoint succeeds.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
    }

    /// 500, 408 and 429 responses are retried until the checkpoint is finally served.
    #[tokio::test]
    async fn retry_on_transient_server_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1 => status(StatusCode::INTERNAL_SERVER_ERROR),
                2 => status(StatusCode::REQUEST_TIMEOUT),
                3 => status(StatusCode::TOO_MANY_REQUESTS),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
    }

    /// Successful responses whose (empty) body cannot be decoded as a checkpoint are retried.
    #[tokio::test]
    async fn retry_on_deserialization_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            if *times < 3 {
                status(StatusCode::OK).set_body_bytes(vec![])
            } else {
                status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42))
            }
        })
        .await;

        let client = remote_test_client(server.uri());
        let checkpoint = client.fetch(42).await.unwrap();

        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number)
    }

    /// A response slower than the client's request timeout is abandoned, and the retry is served.
    #[tokio::test]
    async fn retry_on_timeout() {
        let server = MockServer::start().await;
        let times = Arc::new(AtomicUsize::new(0));
        let times_clone = times.clone();

        respond_with(&server, move |_: &Request| {
            let response = status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42));
            if times_clone.fetch_add(1, Ordering::Relaxed) == 0 {
                // Delay the first response past the client's 2s timeout. `set_delay` waits
                // asynchronously. `std::thread::sleep` here would instead block whichever thread
                // the mock server invokes this responder on.
                response.set_delay(Duration::from_secs(4))
            } else {
                response
            }
        })
        .await;

        let ingestion_client = IngestionClient::new_remote_with_timeout(
            Url::parse(&server.uri()).unwrap(),
            Duration::from_secs(2),
            test_metrics(),
        )
        .unwrap();

        let checkpoint = ingestion_client.fetch(42).await.unwrap();
        assert_eq!(42, checkpoint.checkpoint_summary.sequence_number);

        // Exactly one timed-out attempt followed by one successful retry.
        let final_count = times.load(Ordering::Relaxed);
        assert_eq!(final_count, 2);
    }
}