1use std::num::NonZeroUsize;
5use std::sync::Arc;
6use std::time::Duration;
7
8use prometheus::Registry;
9use serde::Deserialize;
10use serde::Serialize;
11use sui_futures::service::Service;
12use tokio::sync::mpsc;
13use tracing::warn;
14
15pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
16use crate::ingestion::broadcaster::broadcaster;
17use crate::ingestion::error::Error;
18use crate::ingestion::error::Result;
19use crate::ingestion::ingestion_client::CheckpointEnvelope;
20use crate::ingestion::ingestion_client::IngestionClient;
21use crate::ingestion::ingestion_client::IngestionClientArgs;
22use crate::ingestion::ingestion_client::retry_transient_with_slow_monitor;
23use crate::ingestion::streaming_client::CheckpointStreamingClient;
24use crate::ingestion::streaming_client::GrpcStreamingClient;
25use crate::ingestion::streaming_client::StreamingClientArgs;
26use crate::metrics::IngestionMetrics;
27
28pub type BoxedStreamingClient = Box<dyn CheckpointStreamingClient + Send + Sync>;
36
37mod broadcaster;
38mod byte_count;
39pub(crate) mod decode;
40pub mod error;
41pub mod ingestion_client;
42mod rpc_client;
43pub mod store_client;
44pub mod streaming_client;
45#[cfg(test)]
46mod test_utils;
47
/// Maximum gRPC message size, in bytes (128 MiB).
// NOTE(review): presumably applied as the encode/decode limit by the gRPC-based clients in this
// module — confirm at the use sites.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 << 20;
49
// Command-line arguments for building the clients used by `IngestionService::new`.
// Both argument groups are flattened, so their flags appear directly on the parent command.
// Plain `//` comments are deliberate here: clap's derive turns `///` doc comments into help
// text, and these comments should not alter the CLI's `--help` output.
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    // Arguments for the `IngestionClient` (the tests set `remote_store_url` here).
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    // Arguments for the optional streaming client. `IngestionService::new` only builds a
    // `GrpcStreamingClient` when `streaming_url` is set.
    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}
60
/// Tuning knobs for the ingestion service.
///
/// The `*_ms` fields are plain millisecond counts; the accessor methods on this type convert
/// them to [`Duration`]s. See the `Default` impl for baseline values.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Concurrency policy for ingestion (`Fixed` or `Adaptive`).
    // NOTE(review): presumably bounds the number of concurrent checkpoint fetches — confirm in
    // the broadcaster.
    pub ingest_concurrency: IngestConcurrencyConfig,

    /// Retry interval in milliseconds, exposed as a [`Duration`] by `retry_interval()`.
    // NOTE(review): presumably the delay between fetch retries — confirm at the call site.
    pub retry_interval_ms: u64,

    /// Initial batch size for the streaming backoff. Typed as `NonZeroUsize`, so a zero value
    /// is rejected at deserialization time.
    pub streaming_backoff_initial_batch_size: NonZeroUsize,

    /// Maximum batch size for the streaming backoff.
    // NOTE(review): nothing here enforces `max >= initial` (or `max > 0`); confirm the consumer
    // tolerates that, or validate on load.
    pub streaming_backoff_max_batch_size: usize,

    /// Streaming connection timeout in milliseconds; passed to `GrpcStreamingClient::new` via
    /// `streaming_connection_timeout()`.
    pub streaming_connection_timeout_ms: u64,

    /// Streaming statement timeout in milliseconds; passed to `GrpcStreamingClient::new` via
    /// `streaming_statement_timeout()`.
    pub streaming_statement_timeout_ms: u64,
}
83
/// Holds the clients used to fetch checkpoints and the channels of the subscribers that
/// [`IngestionService::run`] hands to the broadcaster.
///
/// Register subscribers with [`IngestionService::subscribe_bounded`], then consume the service
/// with [`IngestionService::run`].
// Field order is also drop order; keep it stable.
pub struct IngestionService {
    /// Configuration forwarded to the broadcaster by `run`.
    config: IngestionConfig,
    /// Checkpoint client; also the fallback source for `latest_checkpoint_number`.
    ingestion_client: IngestionClient,
    /// Optional streaming client; consulted first by `latest_checkpoint_number` when present.
    streaming_client: Option<BoxedStreamingClient>,
    /// One sender per subscriber registered through `subscribe_bounded`.
    subscribers: Vec<mpsc::Sender<Arc<CheckpointEnvelope>>>,
    /// Ingestion metrics, shared with the ingestion client.
    metrics: Arc<IngestionMetrics>,
}
91
92impl IngestionConfig {
93 pub fn retry_interval(&self) -> Duration {
94 Duration::from_millis(self.retry_interval_ms)
95 }
96
97 pub fn streaming_connection_timeout(&self) -> Duration {
98 Duration::from_millis(self.streaming_connection_timeout_ms)
99 }
100
101 pub fn streaming_statement_timeout(&self) -> Duration {
102 Duration::from_millis(self.streaming_statement_timeout_ms)
103 }
104}
105
106impl IngestionService {
107 pub fn new(
118 args: ClientArgs,
119 config: IngestionConfig,
120 metrics_prefix: Option<&str>,
121 registry: &Registry,
122 ) -> Result<Self> {
123 let metrics = IngestionMetrics::new(metrics_prefix, registry);
124 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
125 let streaming_client: Option<BoxedStreamingClient> =
126 args.streaming.streaming_url.map(|uri| {
127 Box::new(GrpcStreamingClient::new(
128 uri,
129 config.streaming_connection_timeout(),
130 config.streaming_statement_timeout(),
131 )) as BoxedStreamingClient
132 });
133 Ok(Self::from_clients(
134 ingestion_client,
135 streaming_client,
136 config,
137 metrics,
138 ))
139 }
140
141 pub fn with_clients(
156 ingestion_client: IngestionClient,
157 streaming_client: Option<BoxedStreamingClient>,
158 config: IngestionConfig,
159 metrics: Arc<IngestionMetrics>,
160 ) -> Self {
161 Self::from_clients(ingestion_client, streaming_client, config, metrics)
162 }
163
164 fn from_clients(
169 ingestion_client: IngestionClient,
170 streaming_client: Option<BoxedStreamingClient>,
171 config: IngestionConfig,
172 metrics: Arc<IngestionMetrics>,
173 ) -> Self {
174 Self {
175 config,
176 ingestion_client,
177 streaming_client,
178 subscribers: Vec::new(),
179 metrics,
180 }
181 }
182
183 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
185 &self.ingestion_client
186 }
187
188 pub async fn latest_checkpoint_number(&mut self) -> anyhow::Result<u64> {
191 if let Some(streaming_client) = self.streaming_client.as_deref_mut() {
192 match streaming_client.latest_checkpoint_number().await {
193 Ok(checkpoint_number) => return Ok(checkpoint_number),
194 Err(e) => {
195 warn!(
196 operation = "latest_checkpoint_number",
197 "Failed to get latest checkpoint number from streaming client: {e}"
198 );
199 }
200 }
201 }
202
203 let ingestion_client = self.ingestion_client.clone();
204 let future = move || {
205 let ingestion_client = ingestion_client.clone();
206 async move {
207 ingestion_client
208 .latest_checkpoint_number()
209 .await
210 .map_err(|e| backoff::Error::transient(Error::LatestCheckpointError(e)))
211 }
212 };
213
214 Ok(retry_transient_with_slow_monitor(
215 "latest_checkpoint_number",
216 future,
217 &self.metrics.ingested_latest_checkpoint_latency,
218 )
219 .await?)
220 }
221
222 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
224 &self.metrics
225 }
226
227 pub fn config(&self) -> &IngestionConfig {
229 &self.config
230 }
231
232 pub fn subscribe_bounded(&mut self, size: usize) -> mpsc::Receiver<Arc<CheckpointEnvelope>> {
239 let (tx, rx) = mpsc::channel(size);
240 self.subscribers.push(tx);
241 rx
242 }
243
244 pub async fn run<R>(self, checkpoints: R) -> Result<Service>
257 where
258 R: std::ops::RangeBounds<u64> + Send + 'static,
259 {
260 let IngestionService {
261 config,
262 ingestion_client,
263 streaming_client,
264 subscribers,
265 metrics,
266 } = self;
267
268 if subscribers.is_empty() {
269 return Err(Error::NoSubscribers);
270 }
271
272 Ok(broadcaster(
273 checkpoints,
274 streaming_client,
275 config,
276 ingestion_client,
277 subscribers,
278 metrics,
279 ))
280 }
281}
282
283impl Default for IngestionConfig {
284 fn default() -> Self {
285 Self {
286 ingest_concurrency: IngestConcurrencyConfig::Adaptive {
287 initial: 1,
288 min: 1,
289 max: 500,
290 dead_band: None,
291 },
292 retry_interval_ms: 200,
293 streaming_backoff_initial_batch_size: NonZeroUsize::new(10)
294 .expect("default initial streaming backoff is non-zero"), streaming_backoff_max_batch_size: 10000, streaming_connection_timeout_ms: 5000, streaming_statement_timeout_ms: 5000, }
299 }
300}
301
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::ingestion_client::tests::MockIngestionClient;
    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::respond_with_chain_id;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::streaming_client::test_utils::MockStreamingClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::IngestionMetrics;

    use super::*;

    /// Latest checkpoint reported by the mock ingestion client. It differs from every value the
    /// mock streaming clients report, so each test can tell which source answered.
    const FALLBACK: u64 = 99;

    /// An [`IngestionClient`] backed by a mock whose latest checkpoint is `latest_checkpoint`.
    fn mock_ingestion_client(latest_checkpoint: u64) -> IngestionClient {
        let metrics = IngestionMetrics::new(None, &Registry::new());
        let mock = MockIngestionClient {
            latest_checkpoint,
            ..Default::default()
        };
        IngestionClient::from_trait(Arc::new(mock), metrics)
    }

    /// An [`IngestionService`] reading from the remote store at `uri`, with a fixed ingest
    /// concurrency of `ingest_concurrency`.
    async fn test_ingestion(uri: String, ingest_concurrency: usize) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawns a task that collects the sequence numbers of up to `stop_after` checkpoints from
    /// `rx`, stopping early if the channel closes.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint_envelope) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint_envelope.checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// Asks a service built around the given clients for its latest checkpoint.
    ///
    /// This goes through the production [`IngestionService::latest_checkpoint_number`] rather
    /// than a copy of its streaming-then-fallback logic, so the `latest_checkpoint_number_*`
    /// tests below cover the real code path.
    async fn latest_checkpoint_number(
        streaming_client: Option<MockStreamingClient>,
        ingestion_client: IngestionClient,
    ) -> anyhow::Result<u64> {
        let metrics = IngestionMetrics::new(None, &Registry::new());
        let streaming_client =
            streaming_client.map(|client| Box::new(client) as BoxedStreamingClient);
        let mut service = IngestionService::with_clients(
            ingestion_client,
            streaming_client,
            IngestionConfig::default(),
            metrics,
        );
        service.latest_checkpoint_number().await
    }

    /// `run` refuses to start without at least one subscriber.
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1).await;

        let res = ingestion_service.run(0..).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// Shutting the service down closes the subscriber's channel, letting the subscriber finish.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0..).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The service winds down on its own once its only subscriber goes away.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0..).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// 404 responses are retried rather than ending ingestion.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        // NOTE(review): the mock serves data derived from its request counter, which presumably
        // explains why the sequence numbers skip the counter values that returned 404.
        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// Transient HTTP errors (408) are retried rather than ending ingestion.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// A slow subscriber holds back delivery to a fast one, up to its channel's buffer size.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
        let size = 3;

        // The laggard only receives when `unblock` is called.
        let mut laggard = ingestion_service.subscribe_bounded(size);
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointEnvelope>>) -> u64 {
            let checkpoint_envelope = laggard.recv().await.unwrap();
            checkpoint_envelope.checkpoint.summary.sequence_number
        }

        let rx = ingestion_service.subscribe_bounded(size);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        // Draining three checkpoints from the laggard leaves room for three more in its buffer.
        // Together that is enough for the fast subscriber to collect its six.
        assert_eq!(unblock(&mut laggard).await, 0);
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_no_streaming_client() {
        let client = mock_ingestion_client(FALLBACK);
        let result = latest_checkpoint_number(None, client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_from_stream() {
        let client = mock_ingestion_client(FALLBACK);
        let streaming = Some(MockStreamingClient::new([42], None));
        let result = latest_checkpoint_number(streaming, client).await;
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_stream_error_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut mock = MockStreamingClient::new(std::iter::empty::<u64>(), None);
        mock.insert_error();
        let result = latest_checkpoint_number(Some(mock), client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_empty_stream_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let streaming = Some(MockStreamingClient::new(std::iter::empty::<u64>(), None));
        let result = latest_checkpoint_number(streaming, client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_connection_failure_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let streaming = Some(
            MockStreamingClient::new(std::iter::empty::<u64>(), None).fail_connection_times(1),
        );
        let result = latest_checkpoint_number(streaming, client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    /// `NonZeroUsize` makes serde reject a zero initial streaming backoff batch size.
    #[test]
    fn reject_zero_initial_streaming_backoff_batch_size() {
        let mut config = serde_json::to_value(IngestionConfig::default()).unwrap();
        config["streaming_backoff_initial_batch_size"] = serde_json::json!(0);

        let error = serde_json::from_value::<IngestionConfig>(config).unwrap_err();
        assert!(error.to_string().contains("nonzero"));
    }

    /// A streaming client passed to `with_clients` is the one consulted for the latest
    /// checkpoint.
    #[tokio::test]
    async fn with_clients_uses_supplied_streaming_client() {
        const STREAM_LATEST: u64 = 42;

        let registry = Registry::new();
        let metrics = IngestionMetrics::new(None, &registry);
        let mut service = IngestionService::with_clients(
            IngestionClient::from_trait(
                Arc::new(MockIngestionClient {
                    latest_checkpoint: FALLBACK,
                    ..Default::default()
                }),
                metrics.clone(),
            ),
            Some(Box::new(MockStreamingClient::new([STREAM_LATEST], None))),
            IngestionConfig::default(),
            metrics,
        );

        let latest = service.latest_checkpoint_number().await.unwrap();
        assert_eq!(latest, STREAM_LATEST);
    }

    /// Without a streaming client, `with_clients` services answer from the ingestion client.
    #[tokio::test]
    async fn with_clients_falls_back_to_ingestion_when_no_streaming() {
        let registry = Registry::new();
        let metrics = IngestionMetrics::new(None, &registry);
        let mut service = IngestionService::with_clients(
            IngestionClient::from_trait(
                Arc::new(MockIngestionClient {
                    latest_checkpoint: FALLBACK,
                    ..Default::default()
                }),
                metrics.clone(),
            ),
            None,
            IngestionConfig::default(),
            metrics,
        );

        let latest = service.latest_checkpoint_number().await.unwrap();
        assert_eq!(latest, FALLBACK);
    }
}