sui_indexer_alt_framework/ingestion/
mod.rs1#![allow(clippy::disallowed_methods)]
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use prometheus::Registry;
13use serde::Deserialize;
14use serde::Serialize;
15use sui_futures::service::Service;
16use tokio::sync::mpsc;
17
18use crate::ingestion::broadcaster::broadcaster;
19use crate::ingestion::error::Error;
20use crate::ingestion::error::Result;
21use crate::ingestion::ingestion_client::IngestionClient;
22use crate::ingestion::ingestion_client::IngestionClientArgs;
23use crate::ingestion::streaming_client::GrpcStreamingClient;
24use crate::ingestion::streaming_client::StreamingClientArgs;
25use crate::metrics::IngestionMetrics;
26use crate::types::full_checkpoint_content::Checkpoint;
27
28mod broadcaster;
29pub mod error;
30pub mod ingestion_client;
31mod local_client;
32pub mod remote_client;
33mod rpc_client;
34pub mod streaming_client;
35#[cfg(test)]
36mod test_utils;
37
/// Size limit for gRPC messages, in bytes: 128 MiB (`128 << 20`).
///
/// NOTE(review): the consumers of this constant live in sibling modules; confirm there whether
/// it caps encoding, decoding, or both.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 << 20;
39
40#[derive(clap::Args, Clone, Debug, Default)]
43pub struct ClientArgs {
44 #[clap(flatten)]
45 pub ingestion: IngestionClientArgs,
46
47 #[clap(flatten)]
48 pub streaming: StreamingClientArgs,
49}
50
51#[derive(Serialize, Deserialize, Debug, Clone)]
52pub struct IngestionConfig {
53 pub checkpoint_buffer_size: usize,
55
56 pub ingest_concurrency: usize,
58
59 pub retry_interval_ms: u64,
61
62 pub streaming_backoff_initial_batch_size: usize,
64
65 pub streaming_backoff_max_batch_size: usize,
67
68 pub streaming_connection_timeout_ms: u64,
70
71 pub streaming_statement_timeout_ms: u64,
73}
74
75pub struct IngestionService {
76 config: IngestionConfig,
77 ingestion_client: IngestionClient,
78 streaming_client: Option<GrpcStreamingClient>,
79 commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
80 commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
81 subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
82 metrics: Arc<IngestionMetrics>,
83}
84
85impl IngestionConfig {
86 pub fn retry_interval(&self) -> Duration {
87 Duration::from_millis(self.retry_interval_ms)
88 }
89
90 pub fn streaming_connection_timeout(&self) -> Duration {
91 Duration::from_millis(self.streaming_connection_timeout_ms)
92 }
93
94 pub fn streaming_statement_timeout(&self) -> Duration {
95 Duration::from_millis(self.streaming_statement_timeout_ms)
96 }
97}
98
99impl IngestionService {
100 pub fn new(
110 args: ClientArgs,
111 config: IngestionConfig,
112 metrics_prefix: Option<&str>,
113 registry: &Registry,
114 ) -> Result<Self> {
115 let metrics = IngestionMetrics::new(metrics_prefix, registry);
116 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
117 let streaming_client = args
118 .streaming
119 .streaming_url
120 .map(|uri| GrpcStreamingClient::new(uri, config.streaming_connection_timeout()));
121
122 let subscribers = Vec::new();
123 let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
124 Ok(Self {
125 config,
126 ingestion_client,
127 streaming_client,
128 commit_hi_tx,
129 commit_hi_rx,
130 subscribers,
131 metrics,
132 })
133 }
134
135 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
137 &self.ingestion_client
138 }
139
140 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
142 &self.metrics
143 }
144
145 pub fn subscribe(
155 &mut self,
156 ) -> (
157 mpsc::Receiver<Arc<Checkpoint>>,
158 mpsc::UnboundedSender<(&'static str, u64)>,
159 ) {
160 let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
161 self.subscribers.push(sender);
162 (receiver, self.commit_hi_tx.clone())
163 }
164
165 pub async fn run<R>(self, checkpoints: R, initial_commit_hi: Option<u64>) -> Result<Service>
180 where
181 R: std::ops::RangeBounds<u64> + Send + 'static,
182 {
183 let IngestionService {
184 config,
185 ingestion_client,
186 streaming_client,
187 commit_hi_tx: _,
188 commit_hi_rx,
189 subscribers,
190 metrics,
191 } = self;
192
193 if subscribers.is_empty() {
194 return Err(Error::NoSubscribers);
195 }
196
197 Ok(broadcaster(
198 checkpoints,
199 initial_commit_hi,
200 streaming_client,
201 config,
202 ingestion_client,
203 commit_hi_rx,
204 subscribers,
205 metrics,
206 ))
207 }
208}
209
210impl Default for IngestionConfig {
211 fn default() -> Self {
212 Self {
213 checkpoint_buffer_size: 5000,
214 ingest_concurrency: 200,
215 retry_interval_ms: 200,
216 streaming_backoff_initial_batch_size: 10, streaming_backoff_max_batch_size: 10000, streaming_connection_timeout_ms: 5000, streaming_statement_timeout_ms: 5000, }
221 }
222}
223
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use reqwest::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::remote_client::tests::respond_with;
    use crate::ingestion::remote_client::tests::status;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Build an [`IngestionService`] whose remote store is the server at `uri`, overriding only
    /// the checkpoint buffer size and ingest concurrency (everything else is the default). No
    /// streaming client is configured and metrics go to a throwaway registry.
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency,
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawn a task that receives up to `stop_after` checkpoints from `rx` (stopping early if
    /// the channel closes) and resolves to their sequence numbers in arrival order.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// Running a service that has no subscribers fails with `Error::NoSubscribers`.
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        let server = MockServer::start().await;
        // The response does not matter here: the service should refuse to start.
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// Shutting the service down lets a subscriber that would otherwise read forever finish:
    /// its receive loop only ends once the checkpoint channel is closed.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// Once the only subscriber is dropped, the service winds down on its own and `join`
    /// completes successfully.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        // NOTE(review): presumably dropping the `TaskGuard` aborts the subscriber task and with
        // it the receiver — confirm against `sui_futures::task::TaskGuard`.
        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// `404 Not Found` responses are retried rather than treated as fatal or skipped.
    ///
    /// The mock numbers its requests: 1-3 succeed, 4-5 return `NOT_FOUND`, 6 onwards succeed
    /// again. Each successful body carries the request number as its sequence number, so the
    /// subscriber's five checkpoints come from requests 1, 2, 3, 6 and 7.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Same scenario as `retry_on_not_found`, but the failing requests (4 and 5) return
    /// `408 Request Timeout`; they are retried as well.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5].into_iter().map(|i| [1, 2, 3, 6, 7][i - 1]).collect::<Vec<u64>>());
    }

    /// A slow subscriber holds the others back until it drains its buffer.
    ///
    /// Each subscriber channel holds 3 checkpoints. The laggard reads nothing until it is
    /// explicitly unblocked, while the other subscriber wants 5 checkpoints.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber only consumes a checkpoint when `unblock` is called on it.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // NOTE(review): presumably the broadcaster stalls once the laggard's 3-slot buffer is
        // full; pulling two checkpoints frees enough room for the other subscriber to reach its
        // quota of 5 — confirm against the broadcaster.
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}