sui_indexer_alt_framework/ingestion/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use prometheus::Registry;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::mpsc;
12use tracing::warn;
13
14pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
15use crate::ingestion::broadcaster::broadcaster;
16use crate::ingestion::error::Error;
17use crate::ingestion::error::Result;
18use crate::ingestion::ingestion_client::CheckpointEnvelope;
19use crate::ingestion::ingestion_client::IngestionClient;
20use crate::ingestion::ingestion_client::IngestionClientArgs;
21use crate::ingestion::ingestion_client::retry_transient_with_slow_monitor;
22use crate::ingestion::streaming_client::CheckpointStreamingClient;
23use crate::ingestion::streaming_client::GrpcStreamingClient;
24use crate::ingestion::streaming_client::StreamingClientArgs;
25use crate::metrics::IngestionMetrics;
26
27mod broadcaster;
28mod byte_count;
29pub(crate) mod decode;
30pub mod error;
31pub mod ingestion_client;
32mod rpc_client;
33pub mod store_client;
34pub mod streaming_client;
35#[cfg(test)]
36mod test_utils;
37
/// Upper bound on the size of a single gRPC message: 128 MiB. Presumably applied when
/// configuring the gRPC clients in this module — confirm at use sites.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;
39
/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    /// Arguments for the checkpoint-fetching (ingestion) client. `flatten` merges its flags
    /// directly into the top-level CLI rather than nesting them.
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    /// Arguments for the optional checkpoint streaming client (also flattened into the CLI).
    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}
50
/// Configuration for checkpoint ingestion: concurrency control, retry cadence, streaming
/// fallback batch sizes, and streaming timeouts. Serializable so it can be embedded in a
/// larger configuration file; defaults are provided by the [Default] impl in this module.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Concurrency control for checkpoint ingestion. A plain integer gives fixed concurrency;
    /// an object with `initial`, `min`, and `max` fields enables adaptive concurrency that adjusts
    /// based on subscriber channel fill fraction.
    pub ingest_concurrency: IngestConcurrencyConfig,

    /// Polling interval to retry fetching checkpoints that do not exist, in milliseconds.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection failure.
    pub streaming_backoff_initial_batch_size: usize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming connection failures.
    pub streaming_backoff_max_batch_size: usize,

    /// Timeout for streaming connection in milliseconds.
    pub streaming_connection_timeout_ms: u64,

    /// Timeout for streaming statement (peek/next) operations in milliseconds.
    pub streaming_statement_timeout_ms: u64,
}
73
/// Service responsible for fetching checkpoints and disseminating them to subscribers.
/// Constructed with [IngestionService::new], extended with [IngestionService::subscribe_bounded],
/// and started (consuming it) with [IngestionService::run].
pub struct IngestionService {
    /// Sizes and time limits governing ingestion (see [IngestionConfig]).
    config: IngestionConfig,
    /// Client used to fetch individual checkpoints.
    ingestion_client: IngestionClient,
    /// Optional gRPC streaming client; `None` when no streaming URL was configured.
    streaming_client: Option<GrpcStreamingClient>,
    /// One bounded channel sender per subscriber; checkpoints are fanned out to all of them.
    subscribers: Vec<mpsc::Sender<Arc<CheckpointEnvelope>>>,
    /// Metrics tracking ingestion progress and latency.
    metrics: Arc<IngestionMetrics>,
}
81
82impl IngestionConfig {
83    pub fn retry_interval(&self) -> Duration {
84        Duration::from_millis(self.retry_interval_ms)
85    }
86
87    pub fn streaming_connection_timeout(&self) -> Duration {
88        Duration::from_millis(self.streaming_connection_timeout_ms)
89    }
90
91    pub fn streaming_statement_timeout(&self) -> Duration {
92        Duration::from_millis(self.streaming_statement_timeout_ms)
93    }
94}
95
impl IngestionService {
    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
    /// disseminating them to subscribers.
    ///
    /// - `args` specifies where to fetch checkpoints from.
    /// - `config` specifies the various sizes and time limits for ingestion.
    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
    ///
    /// After initialization, subscribers can be added using [Self::subscribe_bounded], and the
    /// service is started with [Self::run], given a range of checkpoints to fetch (potentially
    /// unbounded).
    pub fn new(
        args: ClientArgs,
        config: IngestionConfig,
        metrics_prefix: Option<&str>,
        registry: &Registry,
    ) -> Result<Self> {
        let metrics = IngestionMetrics::new(metrics_prefix, registry);
        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
        // The streaming client is optional: it is only constructed when a streaming URL was
        // supplied in the arguments.
        let streaming_client = args.streaming.streaming_url.map(|uri| {
            GrpcStreamingClient::new(
                uri,
                config.streaming_connection_timeout(),
                config.streaming_statement_timeout(),
            )
        });

        Ok(Self {
            config,
            ingestion_client,
            streaming_client,
            // Subscribers are registered later, via `subscribe_bounded`.
            subscribers: Vec::new(),
            metrics,
        })
    }

    /// The ingestion client this service uses to fetch checkpoints.
    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
        &self.ingestion_client
    }

    /// Return the latest checkpoint number known to the ingestion service, preferably via the
    /// streaming client, and failing that via the ingestion client.
    pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
        // Clone the clients out of `self` so the retry closure below can own them. The closure
        // clones them again per attempt, because each invocation needs its own copies.
        let streaming_client = self.streaming_client.clone();
        let ingestion_client = self.ingestion_client.clone();
        let future = move || {
            let mut streaming_client = streaming_client.clone();
            let ingestion_client = ingestion_client.clone();
            async move {
                // Failures are marked transient so the retry wrapper keeps trying.
                latest_checkpoint_number(&mut streaming_client, &ingestion_client)
                    .await
                    .map_err(|e| backoff::Error::transient(Error::LatestCheckpointError(e)))
            }
        };

        Ok(retry_transient_with_slow_monitor(
            "latest_checkpoint_number",
            future,
            &self.metrics.ingested_latest_checkpoint_latency,
        )
        .await?)
    }

    /// Access to the ingestion metrics.
    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
        &self.metrics
    }

    /// The ingestion configuration this service was built with.
    pub fn config(&self) -> &IngestionConfig {
        &self.config
    }

    /// Add a new subscription backed by a bounded `mpsc` channel of the given capacity. The
    /// channel itself is the backpressure signal: when this consumer falls behind, the channel
    /// fills and the adaptive ingestion controller cuts fetch concurrency. Send blocks when the
    /// channel is full.
    ///
    /// Callers typically pass `pipeline::IngestionConfig::subscriber_channel_size()`.
    pub fn subscribe_bounded(&mut self, size: usize) -> mpsc::Receiver<Arc<CheckpointEnvelope>> {
        let (tx, rx) = mpsc::channel(size);
        self.subscribers.push(tx);
        rx
    }

    /// Start the ingestion service as a background task, consuming it in the process.
    ///
    /// Checkpoints are fetched concurrently from the `checkpoints` iterator and pushed to
    /// subscribers' channels (potentially out-of-order). Each subscriber's bounded channel
    /// acts as the backpressure signal: when it fills, the adaptive ingestion controller
    /// throttles fetch concurrency. The slowest subscriber gates ingestion for everyone.
    ///
    /// If a subscriber closes its channel, the ingestion service shuts down as well.
    ///
    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints
    /// that do not exist yet. These are retried on a fixed `retry_interval` until they become
    /// available.
    pub async fn run<R>(self, checkpoints: R) -> Result<Service>
    where
        R: std::ops::RangeBounds<u64> + Send + 'static,
    {
        // Destructure `self` so the parts can be moved into the broadcaster task independently.
        let IngestionService {
            config,
            ingestion_client,
            streaming_client,
            subscribers,
            metrics,
        } = self;

        // Fail fast before fetching anything: with no subscribers, fetched checkpoints would
        // have nowhere to go.
        if subscribers.is_empty() {
            return Err(Error::NoSubscribers);
        }

        Ok(broadcaster(
            checkpoints,
            streaming_client,
            config,
            ingestion_client,
            subscribers,
            metrics,
        ))
    }
}
220
impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            // Start at minimal concurrency and let the adaptive controller scale up to 500
            // based on subscriber channel fill fraction (see the field docs on the struct).
            ingest_concurrency: IngestConcurrencyConfig::Adaptive {
                initial: 1,
                min: 1,
                max: 500,
                dead_band: None,
            },
            retry_interval_ms: 200,
            streaming_backoff_initial_batch_size: 10, // 10 checkpoints, ~ 2 seconds
            streaming_backoff_max_batch_size: 10000,  // 10000 checkpoints, ~ 40 minutes
            streaming_connection_timeout_ms: 5000,    // 5 seconds
            streaming_statement_timeout_ms: 5000,     // 5 seconds
        }
    }
}
238
239async fn latest_checkpoint_number(
240    streaming_client: &mut Option<impl CheckpointStreamingClient + Send>,
241    ingestion_client: &IngestionClient,
242) -> anyhow::Result<u64> {
243    if let Some(streaming_client) = streaming_client.as_mut() {
244        match streaming_client.latest_checkpoint_number().await {
245            Ok(checkpoint_number) => return Ok(checkpoint_number),
246            Err(e) => {
247                warn!(
248                    operation = "latest_checkpoint_number",
249                    "Failed to get latest checkpoint number from streaming client: {e}"
250                );
251            }
252        }
253    }
254
255    ingestion_client.latest_checkpoint_number().await
256}
257
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::ingestion_client::tests::MockIngestionClient;
    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::respond_with_chain_id;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::streaming_client::test_utils::MockStreamingClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::IngestionMetrics;

    use super::*;

    // Latest checkpoint number reported by the mock ingestion client; tests use it to detect
    // that the fallback (rather than the streaming client) produced the answer.
    const FALLBACK: u64 = 99;

    /// Build an [IngestionClient] backed by a mock that reports `latest_checkpoint` as the
    /// latest checkpoint number.
    fn mock_ingestion_client(latest_checkpoint: u64) -> IngestionClient {
        let metrics = IngestionMetrics::new(None, &Registry::new());
        let mock = MockIngestionClient {
            latest_checkpoint,
            ..Default::default()
        };
        IngestionClient::new_impl(Arc::new(mock), metrics)
    }

    /// Build an [IngestionService] that fetches checkpoints from `uri` with a fixed
    /// `ingest_concurrency`, using default settings otherwise.
    async fn test_ingestion(uri: String, ingest_concurrency: usize) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawn a subscriber task that receives up to `stop_after` checkpoints from `rx` (or until
    /// the channel closes) and returns the sequence numbers it saw.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint_envelope) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint_envelope.checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
    /// checkpoints).
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        // The mock server will repeatedly return 404, so if the service does try to fetch a
        // checkpoint, it will be stuck repeatedly retrying.
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1).await;

        let res = ingestion_service.run(0..).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// The subscriber has no effective limit, and the mock server will always return checkpoint
    /// information, but the ingestion service can still be stopped by shutting it down.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0..).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0..).await.unwrap();

        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// The service will retry fetching a checkpoint that does not exist, in this test, the 4th
    /// checkpoint will return 404 a couple of times, before eventually succeeding.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        // Counts requests across the mock's closure invocations; the response depends on how
        // many requests have been seen so far.
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// Similar to the previous test, but now it's a transient error that causes the retry.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;

        let rx = ingestion_service.subscribe_bounded(1);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1).await;
        let size = 3;

        // This subscriber will take its sweet time processing checkpoints.
        let mut laggard = ingestion_service.subscribe_bounded(size);
        // Receive one checkpoint from the laggard's channel, returning its sequence number.
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointEnvelope>>) -> u64 {
            let checkpoint_envelope = laggard.recv().await.unwrap();
            checkpoint_envelope.checkpoint.summary.sequence_number
        }

        let rx = ingestion_service.subscribe_bounded(size);
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0..).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull two
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota. Checkpoint 0 is served by the chain_id mock.
        assert_eq!(unblock(&mut laggard).await, 0);
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_no_streaming_client() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming: Option<MockStreamingClient> = None;
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_from_stream() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming = Some(MockStreamingClient::new([42], None));
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_stream_error_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut mock = MockStreamingClient::new(std::iter::empty::<u64>(), None);
        mock.insert_error();
        let mut streaming = Some(mock);
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_empty_stream_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming = Some(MockStreamingClient::new(std::iter::empty::<u64>(), None));
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }

    #[tokio::test]
    async fn latest_checkpoint_number_connection_failure_falls_back() {
        let client = mock_ingestion_client(FALLBACK);
        let mut streaming = Some(
            MockStreamingClient::new(std::iter::empty::<u64>(), None).fail_connection_times(1),
        );
        let result = latest_checkpoint_number(&mut streaming, &client).await;
        assert_eq!(result.unwrap(), FALLBACK);
    }
}