sui_indexer_alt_framework/ingestion/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4// Allow use of `unbounded_channel` in `ingestion` -- it is used by the regulator task to receive
5// feedback. Traffic through this task should be minimal, but if a bound is applied to it and that
6// bound is hit, the indexer could deadlock.
7#![allow(clippy::disallowed_methods)]
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use prometheus::Registry;
13use serde::Deserialize;
14use serde::Serialize;
15use sui_futures::service::Service;
16use tokio::sync::mpsc;
17
18pub use crate::config::ConcurrencyConfig as IngestConcurrencyConfig;
19use crate::ingestion::broadcaster::broadcaster;
20use crate::ingestion::error::Error;
21use crate::ingestion::error::Result;
22use crate::ingestion::ingestion_client::CheckpointEnvelope;
23use crate::ingestion::ingestion_client::IngestionClient;
24use crate::ingestion::ingestion_client::IngestionClientArgs;
25use crate::ingestion::streaming_client::GrpcStreamingClient;
26use crate::ingestion::streaming_client::StreamingClientArgs;
27use crate::metrics::IngestionMetrics;
28
29mod broadcaster;
30mod byte_count;
31pub(crate) mod decode;
32pub mod error;
33pub mod ingestion_client;
34mod rpc_client;
35pub mod store_client;
36pub mod streaming_client;
37#[cfg(test)]
38mod test_utils;
39
/// Maximum size of a single gRPC message: 128 MiB. Shared by the gRPC-based clients in this
/// module (checkpoints can be large, so this is well above the tonic default).
pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;
41
/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
#[derive(clap::Args, Clone, Debug, Default)]
pub struct ClientArgs {
    /// Arguments for the fetch-based ingestion client, flattened into this struct's CLI surface.
    #[clap(flatten)]
    pub ingestion: IngestionClientArgs,

    /// Arguments for the gRPC streaming client, flattened into this struct's CLI surface.
    #[clap(flatten)]
    pub streaming: StreamingClientArgs,
}
52
/// Configuration for the ingestion service. Durations are stored as raw milliseconds so the
/// config can be serialized and deserialized; typed [Duration] accessors are provided on
/// `impl IngestionConfig`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IngestionConfig {
    /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
    pub checkpoint_buffer_size: usize,

    /// Concurrency control for checkpoint ingestion. A plain integer gives fixed concurrency;
    /// an object with `initial`, `min`, and `max` fields enables adaptive concurrency that adjusts
    /// based on subscriber channel fill fraction.
    pub ingest_concurrency: IngestConcurrencyConfig,

    /// Polling interval to retry fetching checkpoints that do not exist, in milliseconds.
    pub retry_interval_ms: u64,

    /// Initial number of checkpoints to process using ingestion after a streaming connection failure.
    pub streaming_backoff_initial_batch_size: usize,

    /// Maximum number of checkpoints to process using ingestion after repeated streaming connection failures.
    pub streaming_backoff_max_batch_size: usize,

    /// Timeout for streaming connection in milliseconds.
    pub streaming_connection_timeout_ms: u64,

    /// Timeout for streaming statement (peek/next) operations in milliseconds.
    pub streaming_statement_timeout_ms: u64,
}
78
/// Service responsible for fetching checkpoints and disseminating them to subscribers. Built
/// with [IngestionService::new], populated via [IngestionService::subscribe], and started with
/// [IngestionService::run].
pub struct IngestionService {
    /// Sizes and time limits governing ingestion.
    config: IngestionConfig,
    /// Client used to fetch checkpoints.
    ingestion_client: IngestionClient,
    /// Optional gRPC streaming source; `None` when no streaming URL was configured.
    streaming_client: Option<GrpcStreamingClient>,
    /// Sender cloned out to each subscriber so it can report `(label, commit_hi)` progress.
    /// NOTE(review): the `&'static str` looks like a subscriber/pipeline name — confirm against
    /// the broadcaster's handling.
    commit_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>,
    /// Receiving end of subscriber progress reports, handed to the broadcaster in `run`.
    commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
    /// One bounded channel per subscriber (capacity `checkpoint_buffer_size`), used to fan out
    /// fetched checkpoints.
    subscribers: Vec<mpsc::Sender<Arc<CheckpointEnvelope>>>,
    /// Metrics registered for this service.
    metrics: Arc<IngestionMetrics>,
}
88
impl IngestionConfig {
    /// `retry_interval_ms` as a typed [Duration].
    pub fn retry_interval(&self) -> Duration {
        Duration::from_millis(self.retry_interval_ms)
    }

    /// `streaming_connection_timeout_ms` as a typed [Duration].
    pub fn streaming_connection_timeout(&self) -> Duration {
        Duration::from_millis(self.streaming_connection_timeout_ms)
    }

    /// `streaming_statement_timeout_ms` as a typed [Duration].
    pub fn streaming_statement_timeout(&self) -> Duration {
        Duration::from_millis(self.streaming_statement_timeout_ms)
    }
}
102
103impl IngestionService {
104    /// Create a new instance of the ingestion service, responsible for fetching checkpoints and
105    /// disseminating them to subscribers.
106    ///
107    /// - `args` specifies where to fetch checkpoints from.
108    /// - `config` specifies the various sizes and time limits for ingestion.
109    /// - `metrics_prefix` and `registry` are used to set up metrics for the service.
110    ///
111    /// After initialization, subscribers can be added using [Self::subscribe], and the service is
112    /// started with [Self::run], given a range of checkpoints to fetch (potentially unbounded).
113    pub fn new(
114        args: ClientArgs,
115        config: IngestionConfig,
116        metrics_prefix: Option<&str>,
117        registry: &Registry,
118    ) -> Result<Self> {
119        let metrics = IngestionMetrics::new(metrics_prefix, registry);
120        let ingestion_client = IngestionClient::new(args.ingestion, metrics.clone())?;
121        let streaming_client = args.streaming.streaming_url.map(|uri| {
122            GrpcStreamingClient::new(
123                uri,
124                config.streaming_connection_timeout(),
125                config.streaming_statement_timeout(),
126            )
127        });
128
129        let subscribers = Vec::new();
130        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();
131        Ok(Self {
132            config,
133            ingestion_client,
134            streaming_client,
135            commit_hi_tx,
136            commit_hi_rx,
137            subscribers,
138            metrics,
139        })
140    }
141
142    /// The ingestion client this service uses to fetch checkpoints.
143    pub(crate) fn ingestion_client(&self) -> &IngestionClient {
144        &self.ingestion_client
145    }
146
147    /// Access to the ingestion metrics.
148    pub(crate) fn metrics(&self) -> &Arc<IngestionMetrics> {
149        &self.metrics
150    }
151
152    /// Add a new subscription to the ingestion service. Note that the service is susceptible to
153    /// the "slow receiver" problem: If one receiver is slower to process checkpoints than the
154    /// checkpoint ingestion rate, it will eventually hold up all receivers.
155    ///
156    /// The ingestion service can optionally receive checkpoint high values from its
157    /// subscribers. If a subscriber provides a commit_hi, the ingestion service will commit to not
158    /// run ahead of the commit_hi by more than the config's buffer_size.
159    ///
160    /// Returns the channel to receive checkpoints from and the channel to send commit_hi values to.
161    pub fn subscribe(
162        &mut self,
163    ) -> (
164        mpsc::Receiver<Arc<CheckpointEnvelope>>,
165        mpsc::UnboundedSender<(&'static str, u64)>,
166    ) {
167        let (sender, receiver) = mpsc::channel(self.config.checkpoint_buffer_size);
168        self.subscribers.push(sender);
169        (receiver, self.commit_hi_tx.clone())
170    }
171
172    /// Start the ingestion service as a background task, consuming it in the process.
173    ///
174    /// Checkpoints are fetched concurrently from the `checkpoints` iterator, and pushed to
175    /// subscribers' channels (potentially out-of-order). Subscribers can communicate with the
176    /// ingestion service via their channels in the following ways:
177    ///
178    /// - If a subscriber is slow to accept checkpoints from the channel, it will provide
179    ///   back-pressure as this channel has a fixed buffer size (configured when the ingestion
180    ///   service is initialized).
181    /// - If the ingestion service is run with `next_sequential_checkpoint` set, subscribers must
182    ///   also update the ingestion service with the latest checkpoint they have completely
183    ///   processed, and the ingestion service will use this to limit how far ahead it fetches
184    ///   checkpoints.
185    /// - If a subscriber closes either of these channels, the ingestion service will interpret
186    ///   that as a signal to shutdown as well.
187    ///
188    /// If ingestion reaches the leading edge of the network, it will encounter checkpoints that do
189    /// not exist yet. These will be retried repeatedly on a fixed `retry_interval` until they
190    /// become available.
191    pub async fn run<R>(
192        self,
193        checkpoints: R,
194        next_sequential_checkpoint: Option<u64>,
195    ) -> Result<Service>
196    where
197        R: std::ops::RangeBounds<u64> + Send + 'static,
198    {
199        let IngestionService {
200            config,
201            ingestion_client,
202            streaming_client,
203            commit_hi_tx: _,
204            commit_hi_rx,
205            subscribers,
206            metrics,
207        } = self;
208
209        if subscribers.is_empty() {
210            return Err(Error::NoSubscribers);
211        }
212
213        Ok(broadcaster(
214            checkpoints,
215            next_sequential_checkpoint,
216            streaming_client,
217            config,
218            ingestion_client,
219            commit_hi_rx,
220            subscribers,
221            metrics,
222        ))
223    }
224}
225
226impl Default for IngestionConfig {
227    fn default() -> Self {
228        Self {
229            checkpoint_buffer_size: 50,
230            ingest_concurrency: IngestConcurrencyConfig::Adaptive {
231                initial: 1,
232                min: 1,
233                max: 500,
234                dead_band: None,
235            },
236            retry_interval_ms: 200,
237            streaming_backoff_initial_batch_size: 10, // 10 checkpoints, ~ 2 seconds
238            streaming_backoff_max_batch_size: 10000,  // 10000 checkpoints, ~ 40 minutes
239            streaming_connection_timeout_ms: 5000,    // 5 seconds
240            streaming_statement_timeout_ms: 5000,     // 5 seconds
241        }
242    }
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use axum::http::StatusCode;
    use sui_futures::task::TaskGuard;
    use url::Url;
    use wiremock::MockServer;
    use wiremock::Request;

    use crate::ingestion::store_client::tests::respond_with;
    use crate::ingestion::store_client::tests::respond_with_chain_id;
    use crate::ingestion::store_client::tests::status;
    use crate::ingestion::test_utils::test_checkpoint_data;

    use super::*;

    /// Build an [IngestionService] that fetches checkpoints from the mock server at `uri`, with
    /// the given subscriber buffer size and a fixed ingest concurrency.
    async fn test_ingestion(
        uri: String,
        checkpoint_buffer_size: usize,
        ingest_concurrency: usize,
    ) -> IngestionService {
        let registry = Registry::new();
        IngestionService::new(
            ClientArgs {
                ingestion: IngestionClientArgs {
                    remote_store_url: Some(Url::parse(&uri).unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            },
            IngestionConfig {
                checkpoint_buffer_size,
                ingest_concurrency: IngestConcurrencyConfig::Fixed {
                    value: ingest_concurrency,
                },
                ..Default::default()
            },
            None,
            &registry,
        )
        .unwrap()
    }

    /// Spawn a subscriber task that receives up to `stop_after` checkpoints from `rx` and
    /// returns the sequence numbers it saw, stopping early if the channel closes.
    async fn test_subscriber(
        stop_after: usize,
        mut rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
    ) -> TaskGuard<Vec<u64>> {
        TaskGuard::new(tokio::spawn(async move {
            let mut seqs = vec![];
            for _ in 0..stop_after {
                let Some(checkpoint_envelope) = rx.recv().await else {
                    break;
                };

                seqs.push(checkpoint_envelope.checkpoint.summary.sequence_number);
            }

            seqs
        }))
    }

    /// If the ingestion service has no subscribers, it will fail fast (before fetching any
    /// checkpoints).
    #[tokio::test]
    async fn fail_on_no_subscribers() {
        // The mock server will repeatedly return 404, so if the service does try to fetch a
        // checkpoint, it will be stuck repeatedly retrying.
        let server = MockServer::start().await;
        respond_with(&server, status(StatusCode::NOT_FOUND)).await;

        let ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        // No `subscribe` call was made, so `run` must return the `NoSubscribers` error.
        let res = ingestion_service.run(0.., None).await;
        assert!(matches!(res, Err(Error::NoSubscribers)));
    }

    /// The subscriber has no effective limit, and the mock server will always return checkpoint
    /// information, but the ingestion service can still be stopped by shutting it down.
    #[tokio::test]
    async fn shutdown() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        // The subscriber would read forever (usize::MAX checkpoints); the explicit shutdown
        // below is what ends the run.
        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(usize::MAX, rx).await;
        let svc = ingestion_service.run(0.., None).await.unwrap();

        svc.shutdown().await.unwrap();
        subscriber.await.unwrap();
    }

    /// The subscriber will stop after receiving a single checkpoint, and this will trigger the
    /// ingestion service to stop as well, even if there are more checkpoints to fetch.
    #[tokio::test]
    async fn shutdown_on_subscriber_drop() {
        let server = MockServer::start().await;
        respond_with(
            &server,
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(42)),
        )
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(1, rx).await;
        let mut svc = ingestion_service.run(0.., None).await.unwrap();

        // Dropping the subscriber closes its channel; the service should notice and wind down,
        // so `join` completes without an explicit shutdown.
        drop(subscriber);
        svc.join().await.unwrap();
    }

    /// The service will retry fetching a checkpoint that does not exist, in this test, the 4th
    /// checkpoint will return 404 a couple of times, before eventually succeeding.
    #[tokio::test]
    async fn retry_on_not_found() {
        let server = MockServer::start().await;
        // Counts requests; the response is keyed on the request number, not the checkpoint
        // requested, so the 404 retries "consume" counts 4 and 5.
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                // Requests 1-3 (exclusive range pattern): serve checkpoints 1, 2, 3.
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                // Requests 4 and 5: report the checkpoint as missing, forcing retries.
                4..6 => status(StatusCode::NOT_FOUND),
                // Request 6 onwards: succeed again, serving checkpoint 6, 7, ...
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // Checkpoint 0 is served by the chain_id mock; 4 and 5 never appear because the retried
        // requests advanced the counter past them.
        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// Similar to the previous test, but now it's a transient error that causes the retry.
    #[tokio::test]
    async fn retry_on_transient_error() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            match *times {
                // Requests 1-3: serve checkpoints 1, 2, 3.
                1..4 => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
                // Requests 4 and 5: transient failure (timeout) instead of 404.
                4..6 => status(StatusCode::REQUEST_TIMEOUT),
                // Request 6 onwards: succeed again.
                _ => status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times)),
            }
        })
        .await;
        respond_with_chain_id(&server).await;

        let mut ingestion_service = test_ingestion(server.uri(), 1, 1).await;

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 6, 7]);
    }

    /// One subscriber is going to stop processing checkpoints, so even though the service can keep
    /// fetching checkpoints, it will stop short because of the slow receiver. Other subscribers
    /// can keep processing checkpoints that were buffered for the slow one.
    #[tokio::test]
    async fn back_pressure_and_buffering() {
        let server = MockServer::start().await;
        let times: Mutex<u64> = Mutex::new(0);
        respond_with(&server, move |_: &Request| {
            let mut times = times.lock().unwrap();
            *times += 1;
            status(StatusCode::OK).set_body_bytes(test_checkpoint_data(*times))
        })
        .await;
        respond_with_chain_id(&server).await;

        // Buffer size 3: each subscriber's channel holds at most 3 checkpoints.
        let mut ingestion_service = test_ingestion(server.uri(), 3, 1).await;

        // This subscriber will take its sweet time processing checkpoints.
        let (mut laggard, _) = ingestion_service.subscribe();
        // Receive one checkpoint from the laggard's channel and return its sequence number.
        async fn unblock(laggard: &mut mpsc::Receiver<Arc<CheckpointEnvelope>>) -> u64 {
            let checkpoint_envelope = laggard.recv().await.unwrap();
            checkpoint_envelope.checkpoint.summary.sequence_number
        }

        let (rx, _) = ingestion_service.subscribe();
        let subscriber = test_subscriber(6, rx).await;
        let _svc = ingestion_service.run(0.., None).await.unwrap();

        // At this point, the service will have been able to pass 3 checkpoints to the non-lagging
        // subscriber, while the laggard's buffer fills up. Now the laggard will pull two
        // checkpoints, which will allow the rest of the pipeline to progress enough for the live
        // subscriber to receive its quota. Checkpoint 0 is served by the chain_id mock.
        assert_eq!(unblock(&mut laggard).await, 0);
        assert_eq!(unblock(&mut laggard).await, 1);
        assert_eq!(unblock(&mut laggard).await, 2);

        let seqs = subscriber.await.unwrap();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }
}