sui_indexer_alt_framework/ingestion/
mod.rs1use std::sync::Arc;
5use std::time::Duration;
6
7use prometheus::Registry;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::mpsc;
12use tracing::warn;
13
14pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
15use crate::ingestion::broadcaster::broadcaster;
16use crate::ingestion::error::Error;
17use crate::ingestion::error::Result;
18use crate::ingestion::ingestion_client::CheckpointEnvelope;
19use crate::ingestion::ingestion_client::IngestionClient;
20use crate::ingestion::ingestion_client::IngestionClientArgs;
21use crate::ingestion::ingestion_client::retry_transient_with_slow_monitor;
22use crate::ingestion::streaming_client::CheckpointStreamingClient;
23use crate::ingestion::streaming_client::GrpcStreamingClient;
24use crate::ingestion::streaming_client::StreamingClientArgs;
25use crate::metrics::IngestionMetrics;
26
27mod broadcaster;
28mod byte_count;
29pub(crate) mod decode;
30pub mod error;
31pub mod ingestion_client;
32mod rpc_client;
33pub mod store_client;
34pub mod streaming_client;
35#[cfg(test)]
36mod test_utils;
37
/// Maximum gRPC message size, in bytes (128 MiB).
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 << 20;
39
40#[derive(clap::Args, Clone, Debug, Default)]
43pub struct ClientArgs {
44 #[clap(flatten)]
45 pub ingestion: IngestionClientArgs,
46
47 #[clap(flatten)]
48 pub streaming: StreamingClientArgs,
49}
50
51#[derive(Serialize, Deserialize, Debug, Clone)]
52pub struct IngestionConfig {
53 pub ingest_concurrency: IngestConcurrencyConfig,
57
58 pub retry_interval_ms: u64,
60
61 pub streaming_backoff_initial_batch_size: usize,
63
64 pub streaming_backoff_max_batch_size: usize,
66
67 pub streaming_connection_timeout_ms: u64,
69
70 pub streaming_statement_timeout_ms: u64,
72}
73
74pub struct IngestionService {
75 config: IngestionConfig,
76 ingestion_client: IngestionClient,
77 streaming_client: Option<GrpcStreamingClient>,
78 subscribers: Vec<mpsc::Sender<Arc<CheckpointEnvelope>>>,
79 metrics: Arc<IngestionMetrics>,
80}
81
82impl IngestionConfig {
83 pub fn retry_interval(&self) -> Duration {
84 Duration::from_millis(self.retry_interval_ms)
85 }
86
87 pub fn streaming_connection_timeout(&self) -> Duration {
88 Duration::from_millis(self.streaming_connection_timeout_ms)
89 }
90
91 pub fn streaming_statement_timeout(&self) -> Duration {
92 Duration::from_millis(self.streaming_statement_timeout_ms)
93 }
94}
95
96impl IngestionService {
97 pub fn new(
108 args: ClientArgs,
109 config: IngestionConfig,
110 metrics_prefix: Option<&str>,
111 registry: &Registry,
112 ) -> Result<Self> {
113 let metrics = IngestionMetrics::new(metrics_prefix, registry);
114 let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
115 let streaming_client = args.streaming.streaming_url.map(|uri| {
116 GrpcStreamingClient::new(
117 uri,
118 config.streaming_connection_timeout(),
119 config.streaming_statement_timeout(),
120 )
121 });
122
123 Ok(Self {
124 config,
125 ingestion_client,
126 streaming_client,
127 subscribers: Vec::new(),
128 metrics,
129 })
130 }
131
132 pub(crate) fn ingestion_client(&self) -> &IngestionClient {
134 &self.ingestion_client
135 }
136
137 pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
140 let streaming_client = self.streaming_client.clone();
141 let ingestion_client = self.ingestion_client.clone();
142 let future = move || {
143 let mut streaming_client = streaming_client.clone();
144 let ingestion_client = ingestion_client.clone();
145 async move {
146 latest_checkpoint_number(&mut streaming_client, &ingestion_client)
147 .await
148 .map_err(|e| backoff::Error::transient(Error::LatestCheckpointError(e)))
149 }
150 };
151
152 Ok(retry_transient_with_slow_monitor(
153 "latest_checkpoint_number",
154 future,
155 &self.metrics.ingested_latest_checkpoint_latency,
156 )
157 .await?)
158 }
159
160 pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
162 &self.metrics
163 }
164
165 pub fn config(&self) -> &IngestionConfig {
167 &self.config
168 }
169
170 pub fn subscribe_bounded(&mut self, size: usize) -> mpsc::Receiver<Arc<CheckpointEnvelope>> {
177 let (tx, rx) = mpsc::channel(size);
178 self.subscribers.push(tx);
179 rx
180 }
181
182 pub async fn run<R>(self, checkpoints: R) -> Result<Service>
195 where
196 R: std::ops::RangeBounds<u64> + Send + 'static,
197 {
198 let IngestionService {
199 config,
200 ingestion_client,
201 streaming_client,
202 subscribers,
203 metrics,
204 } = self;
205
206 if subscribers.is_empty() {
207 return Err(Error::NoSubscribers);
208 }
209
210 Ok(broadcaster(
211 checkpoints,
212 streaming_client,
213 config,
214 ingestion_client,
215 subscribers,
216 metrics,
217 ))
218 }
219}
220
221impl Default for IngestionConfig {
222 fn default() -> Self {
223 Self {
224 ingest_concurrency: IngestConcurrencyConfig::Adaptive {
225 initial: 1,
226 min: 1,
227 max: 500,
228 dead_band: None,
229 },
230 retry_interval_ms: 200,
231 streaming_backoff_initial_batch_size: 10, streaming_backoff_max_batch_size: 10000, streaming_connection_timeout_ms: 5000, streaming_statement_timeout_ms: 5000, }
236 }
237}
238
239async fn latest_checkpoint_number(
240 streaming_client: &mut Option<impl CheckpointStreamingClient + Send>,
241 ingestion_client: &IngestionClient,
242) -> anyhow::Result<u64> {
243 if let Some(streaming_client) = streaming_client.as_mut() {
244 match streaming_client.latest_checkpoint_number().await {
245 Ok(checkpoint_number) => return Ok(checkpoint_number),
246 Err(e) => {
247 warn!(
248 operation = "latest_checkpoint_number",
249 "Failed to get latest checkpoint number from streaming client: {e}"
250 );
251 }
252 }
253 }
254
255 ingestion_client.latest_checkpoint_number().await
256}
257
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::ingestion_client::tests::MockIngestionClient;
    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::respond_with_chain_id;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::streaming_client::test_utils::MockStreamingClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::IngestionMetrics;

    use super::*;

    /// Latest checkpoint reported by the mock ingestion client in the `latest_checkpoint_number`
    /// tests. A result equal to this means the ingestion-client fallback was used.
    const FALLBACK: u64 = 99;

    /// Builds an [`IngestionClient`] over a [`MockIngestionClient`] whose latest checkpoint is
    /// `latest_checkpoint`.
    fn mock_ingestion_client(latest_checkpoint: u64) -> IngestionClient {
        let metrics = IngestionMetrics::new(None, &Registry::new());
        let mock = MockIngestionClient {
            latest_checkpoint,
            ..Default::default()
        };
        IngestionClient::new_impl(Arc::new(mock), metrics)
    }

    /// Builds an [`IngestionService`] that reads from the remote store at `uri`. Ingest
    /// concurrency is fixed at `ingest_concurrency`; all other settings are defaults.
    async fn test_ingestion(uri: String, ingest_concurrency: usize) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawns a task that reads up to `stop_after` checkpoints from `rx`, stopping early if the
    /// channel closes. The task resolves to the sequence numbers it received, in order.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint_envelope) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint_envelope.checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    #[tokio::test]
    /// Running a service with no registered subscribers is an error.
    async fn fail_on_no_subscribers() {
        let server = MockServer::start().await;
        // The store would only answer 404. `run` is expected to reject the service before
        // fetching anything.
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1).await;

        let res = ingestion_service.run(0..).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    #[tokio::test]
    /// Shutting the service down closes subscriber channels, which lets a subscriber that would
    /// otherwise read forever finish.
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0..).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    #[tokio::test]
    /// Dropping the only subscriber lets the service finish on its own, so `join` returns.
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0..).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    #[tokio::test]
    /// Store responses with 404 Not Found are retried rather than ending ingestion.
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    #[tokio::test]
    /// Store responses with 408 Request Timeout are retried as transient errors.
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    #[tokio::test]
    /// Exercises back-pressure from a slow subscriber (`laggard`) when every channel is bounded
    /// to `size`. The laggard is drained three times, and the other subscriber must still end up
    /// with checkpoints 0 through 5.
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
        let size = 3;

        let mut laggard = ingestion_service.subscribe_bounded(size);

        // Receives one checkpoint from the laggard, freeing a slot in its channel, and returns
        // that checkpoint's sequence number.
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointEnvelope>>) -> u64 {
            let checkpoint_envelope = laggard.recv().await.unwrap();
            checkpoint_envelope.checkpoint.summary.sequence_number
        }

        let rx = ingestion_service.subscribe_bounded(size);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        // The laggard only starts reading after ingestion is running. It must see checkpoints
        // in order.
        assert_eq!(unblock(&mut laggard).await, 0);
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    /// Without a streaming client, the ingestion client answers.
    async fn latest_checkpoint_number_no_streaming_client() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming: Option<MockStreamingClient> = None;
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    /// A healthy streaming client takes precedence over the ingestion client.
    async fn latest_checkpoint_number_from_stream() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming = Some(MockStreamingClient::new([42], None));
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    /// A streaming error falls back to the ingestion client.
    async fn latest_checkpoint_number_stream_error_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut mock = MockStreamingClient::new(std::iter::empty::<u64>(), None);
        mock.insert_error();
        let mut streaming = Some(mock);
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    /// A stream with no checkpoints falls back to the ingestion client.
    async fn latest_checkpoint_number_empty_stream_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming = Some(MockStreamingClient::new(std::iter::empty::<u64>(), None));
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    /// A failed streaming connection falls back to the ingestion client.
    async fn latest_checkpoint_number_connection_failure_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming = Some(
            MockStreamingClient::new(std::iter::empty::<u64>(), None).fail_connection_times(1),
        );
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }
}