sui_indexer_alt_framework/ingestion/
mod.rs1#![allow(clippy::disallowed_methods)]
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use prometheus::Registry;
13use serde::Deserialize;
14use serde::Serialize;
15use sui_futures::service::Service;
16use tokio::sync::mpsc;
17
18pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
19use crate::ingestion::broadcaster::broadcaster;
20use crate::ingestion::error::Error;
21use crate::ingestion::error::Result;
22use crate::ingestion::ingestion_client::IngestionClient;
23use crate::ingestion::ingestion_client::IngestionClientArgs;
24use crate::ingestion::streaming_client::GrpcStreamingClient;
25use crate::ingestion::streaming_client::StreamingClientArgs;
26use crate::metrics::IngestionMetrics;
27use crate::types::full_checkpoint_content::Checkpoint;
28
29mod broadcaster;
30pub(crate) mod decode;
31pub mod error;
32pub mod ingestion_client;
33mod rpc_client;
34pub mod store_client;
35pub mod streaming_client;
36#[cfg(test)]
37mod test_utils;
38
/// Size limit, in bytes, for a single gRPC message: 128 MiB.
///
/// NOTE(review): the call sites are outside this file; confirm which
/// encode/decode limits this constant is applied to.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * (1 << 20);
40
41#[derive(clap::Args, Clone, Debug, Default)]
44pub struct ClientArgs {
45 #[clap(flatten)]
46 pub ingestion: IngestionClientArgs,
47
48 #[clap(flatten)]
49 pub streaming: StreamingClientArgs,
50}
51
52#[derive(Serialize, Deserialize, Debug, Clone)]
53pub struct IngestionConfig {
54 pub checkpoint_buffer_size: usize,
56
57 pub ingest_concurrency: IngestConcurrencyConfig,
61
62 pub retry_interval_ms: u64,
64
65 pub streaming_backoff_initial_batch_size: usize,
67
68 pub streaming_backoff_max_batch_size: usize,
70
71 pub streaming_connection_timeout_ms: u64,
73
74 pub streaming_statement_timeout_ms: u64,
76}
77
78pub struct IngestionService {
79 config: IngestionConfig,
80 ingestion_client: IngestionClient,
81 streaming_client: Option<GrpcStreamingClient>,
82 commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
83 commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
84 subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
85 metrics: Arc<IngestionMetrics>,
86}
87
88impl IngestionConfig {
89 pub fn retry_interval(&self) -> Duration {
90 Duration::from_millis(self.retry_interval_ms)
91 }
92
93 pub fn streaming_connection_timeout(&self) -> Duration {
94 Duration::from_millis(self.streaming_connection_timeout_ms)
95 }
96
97 pub fn streaming_statement_timeout(&self) -> Duration {
98 Duration::from_millis(self.streaming_statement_timeout_ms)
99 }
100}
101
102impl IngestionService {
103 pub fn new(
113 args: ClientArgs,
114 config: IngestionConfig,
115 metrics_prefix: Option<&str>,
116 registry: &Registry,
117 ) -> Result<Self> {
118 let metrics = IngestionMetrics::new(metrics_prefix, registry);
119 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
120 let streaming_client = args
121 .streaming
122 .streaming_url
123 .map(|uri| GrpcStreamingClient::new(uri, config.streaming_connection_timeout()));
124
125 let subscribers = Vec::new();
126 let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
127 Ok(Self {
128 config,
129 ingestion_client,
130 streaming_client,
131 commit_hi_tx,
132 commit_hi_rx,
133 subscribers,
134 metrics,
135 })
136 }
137
138 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
140 &self.ingestion_client
141 }
142
143 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
145 &self.metrics
146 }
147
148 pub fn subscribe(
158 &mut self,
159 ) -> (
160 mpsc::Receiver<Arc<Checkpoint>>,
161 mpsc::UnboundedSender<(&'static str, u64)>,
162 ) {
163 let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
164 self.subscribers.push(sender);
165 (receiver, self.commit_hi_tx.clone())
166 }
167
168 pub async fn run<R>(
188 self,
189 checkpoints: R,
190 next_sequential_checkpoint: Option<u64>,
191 ) -> Result<Service>
192 where
193 R: std::ops::RangeBounds<u64> + Send + 'static,
194 {
195 let IngestionService {
196 config,
197 ingestion_client,
198 streaming_client,
199 commit_hi_tx: _,
200 commit_hi_rx,
201 subscribers,
202 metrics,
203 } = self;
204
205 if subscribers.is_empty() {
206 return Err(Error::NoSubscribers);
207 }
208
209 Ok(broadcaster(
210 checkpoints,
211 next_sequential_checkpoint,
212 streaming_client,
213 config,
214 ingestion_client,
215 commit_hi_rx,
216 subscribers,
217 metrics,
218 ))
219 }
220}
221
222impl Default for IngestionConfig {
223 fn default() -> Self {
224 Self {
225 checkpoint_buffer_size: 50,
226 ingest_concurrency: IngestConcurrencyConfig::Adaptive {
227 initial: 1,
228 min: 1,
229 max: 500,
230 dead_band: None,
231 },
232 retry_interval_ms: 200,
233 streaming_backoff_initial_batch_size: 10, streaming_backoff_max_batch_size: 10000, streaming_connection_timeout_ms: 5000, streaming_statement_timeout_ms: 5000, }
238 }
239}
240
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Build an `IngestionService` that reads from the remote store at `uri`,
    /// with the given per-subscriber buffer size and a fixed ingest
    /// concurrency. All other settings use their defaults.
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawn a task that reads up to `stop_after` checkpoints from `rx`,
    /// stopping early if the channel closes, and yields the sequence numbers
    /// it saw in order.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// Running the service without any subscriber is an error.
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        let server = MockServer::start().await;
        // The server response is irrelevant here: `run` fails before the
        // broadcaster is started.
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// Shutting the service down lets a subscriber that would otherwise read
    /// forever run to completion.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        // `usize::MAX` means the subscriber only stops once its channel closes.
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The service winds down on its own once its subscriber goes away.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        // NOTE(review): presumably dropping the guard stops the subscriber
        // task and with it the receiver; confirm against `TaskGuard`.
        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// `NOT_FOUND` responses are retried rather than surfaced.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        // Requests 1-3 succeed, 4-5 fail with NOT_FOUND, later ones succeed.
        // Each successful body is built from the request count.
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // The gap between 3 and 6 corresponds to the two failed requests.
        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Transient errors (here `REQUEST_TIMEOUT`) are retried as well.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        // Requests 1-3 succeed, 4-5 time out, later ones succeed.
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // The gap between 3 and 6 corresponds to the two failed requests.
        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// A subscriber that is slow to read holds back delivery to the others
    /// once its buffer is full.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        // Every request succeeds; the body is built from the request count.
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        // Each subscriber channel can buffer three checkpoints.
        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber only reads when `unblock` is called on it.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // NOTE(review): presumably two reads free the two buffer slots the
        // eager subscriber needs to reach five checkpoints; confirm against
        // the broadcaster.
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}