sui_indexer_alt_framework/ingestion/
mod.rs1#![allow(clippy::disallowed_methods)]
8
9use std::{sync::Arc, time::Duration};
10
11use prometheus::Registry;
12use serde::{Deserialize, Serialize};
13use sui_futures::service::Service;
14use tokio::sync::mpsc;
15
16use crate::ingestion::broadcaster::broadcaster;
17use crate::ingestion::error::{Error, Result};
18use crate::ingestion::ingestion_client::{IngestionClient, IngestionClientArgs};
19use crate::ingestion::streaming_client::{GrpcStreamingClient, StreamingClientArgs};
20use crate::metrics::IngestionMetrics;
21use crate::types::full_checkpoint_content::Checkpoint;
22
23mod broadcaster;
24pub mod error;
25pub mod ingestion_client;
26mod local_client;
27pub mod remote_client;
28mod rpc_client;
29pub mod streaming_client;
30#[cfg(test)]
31mod test_utils;
32
/// Upper bound on the size of a single gRPC message, in bytes: 128 MiB.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 << 20;
34
// Command-line arguments for the clients that feed the ingestion service.
//
// Both argument groups are flattened into the parent `clap` command, so their flags sit at the
// same level as the parent's own flags. (Plain `//` comments are used here so that no
// derive-generated help text changes.)
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    // Arguments for the checkpoint ingestion client; consumed by `IngestionClient::new`.
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    // Arguments for the optional gRPC streaming client. `IngestionService::new` only builds a
    // streaming client when `streaming.streaming_url` is set.
    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}
45
/// Tuning knobs for the ingestion service.
///
/// Every `*_ms` field is a duration in milliseconds; the accessor methods on this type convert
/// them to [`Duration`]s. Default values are defined by the `Default` impl.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Capacity of the bounded checkpoint channel created for each subscriber by
    /// `IngestionService::subscribe`.
    pub checkpoint_buffer_size: usize,

    /// Concurrency limit for checkpoint ingestion.
    /// NOTE(review): presumably the maximum number of checkpoints fetched in parallel by the
    /// broadcaster — confirm against `broadcaster`.
    pub ingest_concurrency: usize,

    /// Delay between retries, in milliseconds (see `retry_interval`).
    /// NOTE(review): consumed outside this file — confirm which failures it applies to.
    pub retry_interval_ms: u64,

    /// Initial batch size used by the streaming backoff.
    /// NOTE(review): semantics are defined outside this file — confirm.
    pub streaming_backoff_initial_batch_size: usize,

    /// Largest batch size the streaming backoff may reach.
    /// NOTE(review): semantics are defined outside this file — confirm.
    pub streaming_backoff_max_batch_size: usize,

    /// Connection timeout for the gRPC streaming client, in milliseconds. `IngestionService::new`
    /// passes it to `GrpcStreamingClient::new`.
    pub streaming_connection_timeout_ms: u64,

    /// Streaming statement timeout, in milliseconds (see `streaming_statement_timeout`).
    /// NOTE(review): not applied anywhere in this file — confirm where it is used.
    pub streaming_statement_timeout_ms: u64,
}
69
/// Fetches checkpoints and fans them out to every registered subscriber.
///
/// Build one with [`IngestionService::new`], register consumers with
/// [`IngestionService::subscribe`], then start it with [`IngestionService::run`] (which consumes
/// the service).
pub struct IngestionService {
    // Tuning parameters; handed to the broadcaster by `run`.
    config: IngestionConfig,
    // Client the checkpoints are fetched through.
    ingestion_client: IngestionClient,
    // `Some` only when a streaming URL was supplied in `ClientArgs`.
    streaming_client: Option<GrpcStreamingClient>,
    // Cloned and handed to each subscriber by `subscribe`.
    // NOTE(review): the `(&'static str, u64)` payload looks like a (pipeline name, commit
    // watermark) pair — confirm against the broadcaster.
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    // Receiving end of the channel above; handed to the broadcaster by `run`.
    commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
    // Sending half of each subscriber's bounded checkpoint channel.
    subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
    // Created in `new` from the metrics prefix and registry.
    metrics: Arc<IngestionMetrics>,
}
79
80impl IngestionConfig {
81 pub fn retry_interval(&self) -> Duration {
82 Duration::from_millis(self.retry_interval_ms)
83 }
84
85 pub fn streaming_connection_timeout(&self) -> Duration {
86 Duration::from_millis(self.streaming_connection_timeout_ms)
87 }
88
89 pub fn streaming_statement_timeout(&self) -> Duration {
90 Duration::from_millis(self.streaming_statement_timeout_ms)
91 }
92}
93
94impl IngestionService {
95 pub fn new(
105 args: ClientArgs,
106 config: IngestionConfig,
107 metrics_prefix: Option<&str>,
108 registry: &Registry,
109 ) -> Result<Self> {
110 let metrics = IngestionMetrics::new(metrics_prefix, registry);
111 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
112 let streaming_client = args
113 .streaming
114 .streaming_url
115 .map(|uri| GrpcStreamingClient::new(uri, config.streaming_connection_timeout()));
116
117 let subscribers = Vec::new();
118 let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
119 Ok(Self {
120 config,
121 ingestion_client,
122 streaming_client,
123 commit_hi_tx,
124 commit_hi_rx,
125 subscribers,
126 metrics,
127 })
128 }
129
130 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
132 &self.ingestion_client
133 }
134
135 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
137 &self.metrics
138 }
139
140 pub fn subscribe(
150 &mut self,
151 ) -> (
152 mpsc::Receiver<Arc<Checkpoint>>,
153 mpsc::UnboundedSender<(&'static str, u64)>,
154 ) {
155 let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
156 self.subscribers.push(sender);
157 (receiver, self.commit_hi_tx.clone())
158 }
159
160 pub async fn run<R>(self, checkpoints: R, initial_commit_hi: Option<u64>) -> Result<Service>
175 where
176 R: std::ops::RangeBounds<u64> + Send + 'static,
177 {
178 let IngestionService {
179 config,
180 ingestion_client,
181 streaming_client,
182 commit_hi_tx: _,
183 commit_hi_rx,
184 subscribers,
185 metrics,
186 } = self;
187
188 if subscribers.is_empty() {
189 return Err(Error::NoSubscribers);
190 }
191
192 Ok(broadcaster(
193 checkpoints,
194 initial_commit_hi,
195 streaming_client,
196 config,
197 ingestion_client,
198 commit_hi_rx,
199 subscribers,
200 metrics,
201 ))
202 }
203}
204
205impl Default for IngestionConfig {
206 fn default() -> Self {
207 Self {
208 checkpoint_buffer_size: 5000,
209 ingest_concurrency: 200,
210 retry_interval_ms: 200,
211 streaming_backoff_initial_batch_size: 10, streaming_backoff_max_batch_size: 10000, streaming_connection_timeout_ms: 5000, streaming_statement_timeout_ms: 5000, }
216 }
217}
218
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use reqwest::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::{MockServer, Request};

    use crate::ingestion::remote_client::tests::{respond_with, status};
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Builds an `IngestionService` reading from the remote store at `uri`.
    ///
    /// Uses the given subscriber buffer size and ingest concurrency; every other setting is a
    /// default.
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency,
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawns a task that records the sequence numbers of up to `stop_after` checkpoints received
    /// on `rx`, stopping early if the channel closes.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// `run` refuses to start when nobody has subscribed.
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// Shutting the service down lets a subscriber that asked for an unbounded number of
    /// checkpoints finish.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The service winds down on its own once its only subscriber goes away.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// Requests 4 and 5 return 404.
    ///
    /// The subscriber still receives five checkpoints, the last two coming from the responses
    /// served after those failures.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Same as `retry_on_not_found`, but requests 4 and 5 fail with 408 Request Timeout.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// A slow subscriber with a 3-checkpoint buffer is read from manually, two checkpoints at a
    /// time.
    ///
    /// Meanwhile a second subscriber expects the first five checkpoints in order.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        let (mut laggard, _) = ingestion_service.subscribe();

        // Receives a single checkpoint from the laggard, freeing one slot in its buffer.
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}