// sui_indexer_alt_framework/ingestion/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4// Allow use of `unbounded_channel` in `ingestion` -- it is used by the regulator task to receive
5// feedback. Traffic through this task should be minimal, but if a bound is applied to it and that
6// bound is hit, the indexer could deadlock.
7#![allow(clippy::disallowed_methods)]
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use prometheus::Registry;
13use serde::Deserialize;
14use serde::Serialize;
15use sui_futures::service::Service;
16use tokio::sync::mpsc;
17
18pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
19use crate::ingestion::broadcaster::broadcaster;
20use crate::ingestion::error::Error;
21use crate::ingestion::error::Result;
22use crate::ingestion::ingestion_client::IngestionClient;
23use crate::ingestion::ingestion_client::IngestionClientArgs;
24use crate::ingestion::streaming_client::GrpcStreamingClient;
25use crate::ingestion::streaming_client::StreamingClientArgs;
26use crate::metrics::IngestionMetrics;
27use crate::types::full_checkpoint_content::Checkpoint;
28
// Task that fans fetched checkpoints out to subscriber channels.
mod broadcaster;
// Shared decoding helpers for checkpoint payloads.
pub(crate) mod decode;
pub mod error;
pub mod ingestion_client;
mod rpc_client;
pub mod store_client;
pub mod streaming_client;
#[cfg(test)]
mod test_utils;

/// Upper bound (128 MiB) on the size of a single gRPC message accepted by this module's clients.
/// NOTE(review): presumably sized to accommodate large checkpoint payloads -- confirm against the
/// gRPC client configuration that consumes this constant.
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;
40
/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    /// Arguments configuring the [IngestionClient] used to fetch individual checkpoints.
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    /// Arguments configuring the optional gRPC streaming client (including the streaming URL --
    /// streaming is disabled when no URL is supplied).
    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}
51
/// Configuration for the ingestion service: buffer sizes, concurrency control, and retry /
/// streaming timeouts. (De)serializable so it can be embedded in a larger configuration; see the
/// `Default` impl for the default values.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
    pub checkpoint_buffer_size: usize,

    /// Concurrency control for checkpoint ingestion. A plain integer gives fixed concurrency;
    /// an object with `initial`, `min`, and `max` fields enables adaptive concurrency that adjusts
    /// based on subscriber channel fill fraction.
    pub ingest_concurrency: IngestConcurrencyConfig,

    /// Polling interval to retry fetching checkpoints that do not exist, in milliseconds.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection failure.
    pub streaming_backoff_initial_batch_size: usize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming connection failures.
    pub streaming_backoff_max_batch_size: usize,

    /// Timeout for streaming connection in milliseconds.
    pub streaming_connection_timeout_ms: u64,

    /// Timeout for streaming statement (peek/next) operations in milliseconds.
    pub streaming_statement_timeout_ms: u64,
}
77
/// Service responsible for fetching checkpoints and disseminating them to subscribers. Created
/// with [IngestionService::new], extended via [IngestionService::subscribe], and started with
/// [IngestionService::run].
pub struct IngestionService {
    /// Sizes and time limits governing ingestion behavior.
    config: IngestionConfig,
    /// Client used to fetch individual checkpoints.
    ingestion_client: IngestionClient,
    /// Optional gRPC streaming client -- only present when a streaming URL was configured.
    streaming_client: Option<GrpcStreamingClient>,
    /// Sender cloned out to each subscriber so it can report (label, commit high watermark)
    /// pairs back to the service.
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    /// Receiving end of the commit watermark feedback channel, handed to the broadcaster task.
    commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
    /// One bounded checkpoint channel per subscriber, used to disseminate fetched checkpoints.
    subscribers: Vec<mpsc::Sender<Arc<Checkpoint>>>,
    /// Metrics tracking ingestion progress.
    metrics: Arc<IngestionMetrics>,
}
87
88impl IngestionConfig {
89    pub fn retry_interval(&self) -> Duration {
90        Duration::from_millis(self.retry_interval_ms)
91    }
92
93    pub fn streaming_connection_timeout(&self) -> Duration {
94        Duration::from_millis(self.streaming_connection_timeout_ms)
95    }
96
97    pub fn streaming_statement_timeout(&self) -> Duration {
98        Duration::from_millis(self.streaming_statement_timeout_ms)
99    }
100}
101
102impl IngestionService {
103    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
104    /// disseminating them to subscribers.
105    ///
106    /// - `args` specifies where to fetch checkpoints from.
107    /// - `config` specifies the various sizes and time limits for ingestion.
108    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
109    ///
110    /// After initialization, subscribers can be added using [Self::subscribe], and the service is
111    /// started with [Self::run], given a range of checkpoints to fetch (potentially unbounded).
112    pub fn new(
113        args: ClientArgs,
114        config: IngestionConfig,
115        metrics_prefix: Option<&str>,
116        registry: &Registry,
117    ) -> Result<Self> {
118        let metrics = IngestionMetrics::new(metrics_prefix, registry);
119        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
120        let streaming_client = args
121            .streaming
122            .streaming_url
123            .map(|uri| GrpcStreamingClient::new(uri, config.streaming_connection_timeout()));
124
125        let subscribers = Vec::new();
126        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
127        Ok(Self {
128            config,
129            ingestion_client,
130            streaming_client,
131            commit_hi_tx,
132            commit_hi_rx,
133            subscribers,
134            metrics,
135        })
136    }
137
138    /// The ingestion client this service uses to fetch checkpoints.
139    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
140        &self.ingestion_client
141    }
142
143    /// Access to the ingestion metrics.
144    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
145        &self.metrics
146    }
147
148    /// Add a new subscription to the ingestion service. Note that the service is susceptible to
149    /// the "slow receiver" problem: If one receiver is slower to process checkpoints than the
150    /// checkpoint ingestion rate, it will eventually hold up all receivers.
151    ///
152    /// The ingestion service can optionally receive checkpoint high values from its
153    /// subscribers. If a subscriber provides a commit_hi, the ingestion service will commit to not
154    /// run ahead of the commit_hi by more than the config's buffer_size.
155    ///
156    /// Returns the channel to receive checkpoints from and the channel to send commit_hi values to.
157    pub fn subscribe(
158        &mut self,
159    ) -> (
160        mpsc::Receiver<Arc<Checkpoint>>,
161        mpsc::UnboundedSender<(&'static str, u64)>,
162    ) {
163        let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
164        self.subscribers.push(sender);
165        (receiver, self.commit_hi_tx.clone())
166    }
167
168    /// Start the ingestion service as a background task, consuming it in the process.
169    ///
170    /// Checkpoints are fetched concurrently from the `checkpoints` iterator, and pushed to
171    /// subscribers' channels (potentially out-of-order). Subscribers can communicate with the
172    /// ingestion service via their channels in the following ways:
173    ///
174    /// - If a subscriber is slow to accept checkpoints from the channel, it will provide
175    ///   back-pressure as this channel has a fixed buffer size (configured when the ingestion
176    ///   service is initialized).
177    /// - If the ingestion service is run with `next_sequential_checkpoint` set, subscribers must
178    ///   also update the ingestion service with the latest checkpoint they have completely
179    ///   processed, and the ingestion service will use this to limit how far ahead it fetches
180    ///   checkpoints.
181    /// - If a subscriber closes either of these channels, the ingestion service will interpret
182    ///   that as a signal to shutdown as well.
183    ///
184    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do
185    /// not exist yet. These will be retried repeatedly on a fixed `retry_interval` until they
186    /// become available.
187    pub async fn run<R>(
188        self,
189        checkpoints: R,
190        next_sequential_checkpoint: Option<u64>,
191    ) -> Result<Service>
192    where
193        R: std::ops::RangeBounds<u64> + Send + 'static,
194    {
195        let IngestionService {
196            config,
197            ingestion_client,
198            streaming_client,
199            commit_hi_tx: _,
200            commit_hi_rx,
201            subscribers,
202            metrics,
203        } = self;
204
205        if subscribers.is_empty() {
206            return Err(Error::NoSubscribers);
207        }
208
209        Ok(broadcaster(
210            checkpoints,
211            next_sequential_checkpoint,
212            streaming_client,
213            config,
214            ingestion_client,
215            commit_hi_rx,
216            subscribers,
217            metrics,
218        ))
219    }
220}
221
impl Default for IngestionConfig {
    fn default() -> Self {
        Self {
            // Each subscriber's channel buffers up to 50 checkpoints.
            checkpoint_buffer_size: 50,
            // Start with a single concurrent fetch; adapt between 1 and 500 based on subscriber
            // channel fill fraction (see the field docs on `ingest_concurrency`).
            ingest_concurrency: IngestConcurrencyConfig::Adaptive {
                initial: 1,
                min: 1,
                max: 500,
                dead_band: None,
            },
            // Poll every 200ms for checkpoints that do not exist yet.
            retry_interval_ms: 200,
            streaming_backoff_initial_batch_size: 10, // 10 checkpoints, ~ 2 seconds
            streaming_backoff_max_batch_size: 10000,  // 10000 checkpoints, ~ 40 minutes
            streaming_connection_timeout_ms: 5000,    // 5 seconds
            streaming_statement_timeout_ms: 5000,     // 5 seconds
        }
    }
}
240
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Build an `IngestionService` that fetches from the remote store at `uri`, with the given
    /// per-subscriber buffer size and a fixed ingest concurrency. Panics on setup failure (test
    /// helper).
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawn a subscriber task that drains `rx`, recording the sequence number of each received
    /// checkpoint. Stops after `stop_after` checkpoints, or earlier if the channel closes, and
    /// yields the recorded sequence numbers.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<Checkpoint>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
    /// checkpoints).
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        // The mock server will repeatedly return 404, so if the service does try to fetch a
        // checkpoint, it will be stuck repeatedly retrying.
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// The subscriber has no effective limit, and the mock server will always return checkpoint
    /// information, but the ingestion service can still be stopped by shutting it down.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        // Explicit shutdown closes the subscriber's channel, which ends its recv loop.
        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        // Dropping the subscriber closes its channel; the service should notice and wind down.
        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// The service will retry fetching a checkpoint that does not exist, in this test, the 4th
    /// checkpoint will return 404 a couple of times, before eventually succeeding.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        // Counts requests so responses can vary by attempt: requests 4 and 5 are 404s.
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::NOT_FOUND),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // Requests 4 and 5 failed and were retried, so the subscriber sees 6 and 7 next.
        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// Similar to the previous test, but now it's a transient error that causes the retry.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        // Counts requests so responses can vary by attempt: requests 4 and 5 time out.
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 6, 7]);
    }

    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;

        // Buffer size of 3: the laggard's channel fills after three undelivered checkpoints.
        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber will take its sweet time processing checkpoints.
        let (mut laggard, _) = ingestion_service.subscribe();
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<Checkpoint>>) -> u64 {
            let checkpoint = laggard.recv().await.unwrap();
            checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(5, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull two
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota.
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }
}